Skip to content

Commit

Permalink
fix: boot node connections
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-eiger committed Oct 3, 2023
1 parent c879099 commit 08012d0
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 37 deletions.
8 changes: 6 additions & 2 deletions examples/ping-pong/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn main() {
None,
Some(true),
// Example ENR address
Some(vec![String::from("enr:-JK4QBcfVXu2YDeSKdjF2xE5EDM5f5E_1Akpkv_yw_byn1adESxDXVLVjapjDvS_ujx6MgWDu9hqO_Az_CbKLJ8azbMBgmlkgnY0gmlwhAVOUWOJc2VjcDI1NmsxoQOUZIqKLk5xkiH0RAFaMGrziGeGxypJ03kOod1-7Pum3oN0Y3CCfJyDdWRwgiMohXdha3UyDQ")]),
Some(vec![String::from("enr:-P-4QJI8tS1WTdIQxq_yIrD05oIIW1Xg-tm_qfP0CHfJGnp9dfr6ttQJmHwTNxGEl4Le8Q7YHcmi-kXTtphxFysS11oBgmlkgnY0gmlwhLymh5GKbXVsdGlhZGRyc7hgAC02KG5vZGUtMDEuZG8tYW1zMy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGdl8ALzYobm9kZS0wMS5kby1hbXMzLndha3V2Mi5wcm9kLnN0YXR1c2ltLm5ldAYfQN4DiXNlY3AyNTZrMaEDbl1X_zJIw3EAJGtmHMVn4Z2xhpSoUaP5ElsHKCv7hlWDdGNwgnZfg3VkcIIjKIV3YWt1Mg8")]),
None,
)
.await
Expand Down Expand Up @@ -193,6 +193,8 @@ async fn main_loop(agent: &GraphcastAgent, running: Arc<AtomicBool>) {
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
} else {
info!("Ping message sent successfully")
};
// agent.send_message(msg).await;
} else {
Expand All @@ -217,7 +219,9 @@ async fn main_loop(agent: &GraphcastAgent, running: Arc<AtomicBool>) {
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
};
} else {
info!("Pong message sent successfully")
}
};
}

Expand Down
9 changes: 1 addition & 8 deletions src/graphcast_agent/message_typing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,7 @@ impl<
.map_err(WakuHandlingError::RetrievePeersError)
.unwrap_or_default()
.iter()
.filter(|&peer| {
// Filter out local peer_id to prevent self dial
peer.peer_id().as_str()
!= node_handle
.peer_id()
.expect("Failed to find local node's peer id")
.as_str()
})
.filter(|&peer| peer.connected())
.map(|peer: &WakuPeerData| {
// Filter subscribe to all other peers
node_handle
Expand Down
2 changes: 2 additions & 0 deletions src/graphcast_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ pub fn register_handler(
}
}
};

trace!("Registering handler");
waku_set_event_callback(handle_async);
Ok(())
}
Expand Down
56 changes: 29 additions & 27 deletions src/graphcast_agent/waku_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,6 @@ pub fn filter_peer_subscriptions(
.peers()
.map_err(WakuHandlingError::RetrievePeersError)?
.iter()
.filter(|&peer| {
// Filter out local peer_id to prevent self dial
peer.peer_id().as_str()
!= node_handle
.peer_id()
.expect("Failed to find local node's peer id")
.as_str()
})
.map(|peer: &WakuPeerData| {
// subscribe to all other peers
let filter_res = node_handle.filter_subscribe(
Expand Down Expand Up @@ -166,7 +158,7 @@ fn node_config(
"PANIC" => WakuLogLevel::Panic,
_ => WakuLogLevel::Warn,
},
Err(_) => WakuLogLevel::Error,
Err(_) => WakuLogLevel::Panic,
};

let gossipsub_params = GossipSubParams {
Expand Down Expand Up @@ -496,25 +488,35 @@ pub fn peers_data(

/// Check for peer connectivity, try to reconnect if there are disconnected peers
pub fn network_check(node_handle: &WakuNodeHandle<Running>) -> Result<(), WakuHandlingError> {
peers_data(node_handle)?
.iter()
// Get unconnected peers and try to reconnect
.filter(|&peer| !peer.connected())
.map(|peer: &WakuPeerData| {
debug!(
peer = tracing::field::debug(&peer),
"Disconnected peer data"
);
node_handle.connect_peer_with_id(peer.peer_id(), None)
})
.for_each(|res| {
if let Err(e) = res {
debug!(
error = tracing::field::debug(&e),
"Could not connect to peer"
);
let peers = peers_data(node_handle)?;

for peer in peers.iter() {
let contains_filter = peer
.protocols()
.iter()
.any(|p| p == "/vac/waku/filter/2.0.0-beta1");
let contains_lightpush = peer
.protocols()
.iter()
.any(|p| p == "/vac/waku/lightpush/2.0.0-beta1");
let contains_relay = peer
.protocols()
.iter()
.any(|p| p == "/vac/waku/relay/2.0.0");

if contains_filter && contains_lightpush && contains_relay {
if !peer.connected() {
if let Err(e) = node_handle.connect_peer_with_id(peer.peer_id(), None) {
debug!(
error = tracing::field::debug(&e),
"Could not connect to peer"
);
}
}
});
} else {
node_handle.disconnect_peer_with_id(peer.peer_id()).unwrap();
}
}
Ok(())
}

Expand Down

0 comments on commit 08012d0

Please sign in to comment.