From f8946b8263488378e2ac5ae268121ab189638d55 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Fri, 27 Sep 2024 19:35:18 +0300 Subject: [PATCH] fix: rejecting excess relay connections (#3065) --- waku/node/peer_manager/peer_manager.nim | 107 +++++++++++------------- 1 file changed, 50 insertions(+), 57 deletions(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 12c6a084f7..6e5a9c6976 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -399,6 +399,24 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = asyncSpawn(pm.switch.disconnect(peerId)) pm.peerStore.delete(peerId) +proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = + ## Returns the peerIds of physical connections (in and out) + ## containing at least one stream with the given protocol. + + var inPeers: seq[PeerId] + var outPeers: seq[PeerId] + + for peerId, muxers in pm.switch.connManager.getConnections(): + for peerConn in muxers: + let streams = peerConn.getStreams() + if streams.anyIt(it.protocol == protocol): + if peerConn.connection.transportDir == Direction.In: + inPeers.add(peerId) + elif peerConn.connection.transportDir == Direction.Out: + outPeers.add(peerId) + + return (inPeers, outPeers) + # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined: @@ -412,6 +430,17 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = direction = if event.initiator: Outbound else: Inbound connectedness = Connected + ## Check max allowed in-relay peers + let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0] + if inRelayPeers.len > pm.inRelayPeersTarget and + pm.peerStore.hasPeer(peerId, WakuRelayCodec): + debug "disconnecting relay peer because reached max num in-relay peers", + peerId = peerId, + inRelayPeers = inRelayPeers.len, + inRelayPeersTarget = pm.inRelayPeersTarget + await pm.switch.disconnect(peerId) + + ## Apply max ip colocation limit if (let ip = pm.getPeerIp(peerId); ip.isSome()): pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) @@ -494,7 +523,7 @@ proc new*( error "Max backoff time can't be over 1 week", maxBackoff = backoff raise newException(Defect, "Max backoff time can't be over 1 week") - let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10) + let outRelayPeersTarget = maxRelayPeersValue div 3 let pm = PeerManager( switch: switch, @@ -560,46 +589,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str pm.addPeer(remotePeerInfo) -proc reconnectPeers*( - pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0) -) {.async.} = - ## Reconnect to peers registered for this protocol. This will update connectedness. - ## Especially useful to resume connections from persistent storage after a restart. - - trace "Reconnecting peers", proto = proto - - # Proto is not persisted, we need to iterate over all peers. - for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): - # Check that the peer can be connected - if peerInfo.connectedness == CannotConnect: - error "Not reconnecting to unreachable or non-existing peer", - peerId = peerInfo.peerId - continue - - # Respect optional backoff period where applicable. - let - # TODO: Add method to peerStore (eg isBackoffExpired()) - disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert - currentTime = Moment.init(getTime().toUnix, Second) - # Current time comparable to persisted value - backoffTime = disconnectTime + backoff - currentTime - # Consider time elapsed since last disconnect - - trace "Respecting backoff", - backoff = backoff, - disconnectTime = disconnectTime, - currentTime = currentTime, - backoffTime = backoffTime - - # TODO: This blocks the whole function. Try to connect to another peer in the meantime. - if backoffTime > ZeroDuration: - trace "Backing off before reconnect...", - peerId = peerInfo.peerId, backoffTime = backoffTime - # We disconnected recently and still need to wait for a backoff period before connecting - await sleepAsync(backoffTime) - - discard await pm.connectRelay(peerInfo) - #################### # Dialer interface # #################### @@ -685,23 +674,29 @@ proc connectToNodes*( # later. await sleepAsync(chronos.seconds(5)) -proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = - ## Returns the peerIds of physical connections (in and out) - ## containing at least one stream with the given protocol. +proc reconnectPeers*( + pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0) +) {.async.} = + ## Reconnect to peers registered for this protocol. This will update connectedness. + ## Especially useful to resume connections from persistent storage after a restart. - var inPeers: seq[PeerId] - var outPeers: seq[PeerId] + debug "Reconnecting peers", proto = proto - for peerId, muxers in pm.switch.connManager.getConnections(): - for peerConn in muxers: - let streams = peerConn.getStreams() - if streams.anyIt(it.protocol == protocol): - if peerConn.connection.transportDir == Direction.In: - inPeers.add(peerId) - elif peerConn.connection.transportDir == Direction.Out: - outPeers.add(peerId) + # Proto is not persisted, we need to iterate over all peers. + for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): + # Check that the peer can be connected + if peerInfo.connectedness == CannotConnect: + error "Not reconnecting to unreachable or non-existing peer", + peerId = peerInfo.peerId + continue - return (inPeers, outPeers) + if backoffTime > ZeroDuration: + debug "Backing off before reconnect", + peerId = peerInfo.peerId, backoffTime = backoffTime + # We disconnected recently and still need to wait for a backoff period before connecting + await sleepAsync(backoffTime) + + await pm.connectToNodes(@[peerInfo]) proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = var @@ -730,9 +725,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = proc connectToRelayPeers*(pm: PeerManager) {.async.} = var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) - let maxConnections = pm.switch.connManager.inSema.size let totalRelayPeers = inRelayPeers.len + outRelayPeers.len - let inPeersTarget = maxConnections - pm.outRelayPeersTarget if inRelayPeers.len > pm.inRelayPeersTarget: await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)