From 3fb83b86725fcc12dcbc703190b46ea0d895cfa2 Mon Sep 17 00:00:00 2001 From: ufarooqstatus Date: Mon, 19 Feb 2024 14:21:39 +0500 Subject: [PATCH] Remove peers from relay (ToSendPeers) list, from which message or idontwant is received. We delay this action till sendMsg to increase such peers --- libp2p/protocols/pubsub/gossipsub.nim | 5 +++- libp2p/protocols/pubsub/pubsub.nim | 11 ++++---- libp2p/protocols/pubsub/pubsubpeer.nim | 36 +++++++++++++++++++------- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 59ad4376da..06175964bc 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -381,7 +381,7 @@ proc validateAndRelay(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false) + g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, validMsgId = msgId) trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue @@ -497,6 +497,9 @@ method rpcHandler*(g: GossipSub, libp2p_gossipsub_duplicate.inc() + if msg.data.len > msgId.len * 10: #Dont relay to the peers from which we already received (We just do it for large messages) + peer.heDontWants[^1].incl(msgId) + # onto the next message continue diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 0cf736797e..1adb1774a1 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -138,7 +138,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} = libp2p_pubsub_peers.set(p.peers.len.int64) -proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} = +proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, id: MessageId = @[]) {.raises: [].} = ## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network. ## ## Parameters: @@ -150,13 +150,14 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.rai ## priority messages have been sent. trace "sending pubsub message to peer", peer, msg = shortLog(msg) - asyncSpawn peer.send(msg, p.anonymize, isHighPriority) + asyncSpawn peer.send(msg, p.anonymize, isHighPriority, id) proc broadcast*( p: PubSub, sendPeers: auto, # Iteratble[PubSubPeer] msg: RPCMsg, - isHighPriority: bool) {.raises: [].} = + isHighPriority: bool, + validMsgId: MessageId = @[]) {.raises: [].} = ## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network. ## ## Parameters: @@ -211,12 +212,12 @@ proc broadcast*( if anyIt(sendPeers, it.hasObservers): for peer in sendPeers: - p.send(peer, msg, isHighPriority) + p.send(peer, msg, isHighPriority, validMsgId) else: # Fast path that only encodes message once let encoded = encodeRpcMsg(msg, p.anonymize) for peer in sendPeers: - asyncSpawn peer.sendEncoded(encoded, isHighPriority) + asyncSpawn peer.sendEncoded(encoded, isHighPriority, validMsgId) proc sendSubs*(p: PubSub, peer: PubSubPeer, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index d74ef133f2..eef6b34a75 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -21,6 +21,8 @@ import rpc/[messages, message, protobuf], ../../protobuf/minprotobuf, ../../utility +#import gossipsub/libp2p_gossipsub_staggerDontWantSave + export peerid, connection, deques logScope: @@ -52,11 +54,15 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} + MessageWithId = object + message: seq[byte] + msgId: MessageId + RpcMessageQueue* = ref object # Tracks async tasks for sending high-priority peer-published messages. sendPriorityQueue: Deque[Future[void]] # Queue for lower-priority messages, like "IWANT" replies and relay messages. - nonPriorityQueue: AsyncQueue[seq[byte]] + nonPriorityQueue: AsyncQueue[MessageWithId] # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] @@ -95,6 +101,10 @@ when defined(libp2p_agents_metrics): #so we have to read the parents short agent.. p.sendConn.getWrapped().shortAgent +proc newMessageWithId(msg: seq[byte], id: MessageId): MessageWithId = + result.message = msg + result.msgId = id + proc getAgent*(peer: PubSubPeer): string = return when defined(libp2p_agents_metrics): @@ -256,7 +266,7 @@ proc clearSendPriorityQueue(p: PubSubPeer) = libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) discard p.rpcmessagequeue.sendPriorityQueue.popFirst() -proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = +proc sendMsg(p: PubSubPeer, msg: seq[byte], msgId: MessageId) {.async.} = if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will # complete this even if the sendConn setup failed @@ -269,6 +279,12 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = trace "sending encoded msgs to peer", conn, encoded = shortLog(msg) + if msgId.len > 0: + for dontWants in p.heDontWants: + if msgId in dontWants: + #libp2p_gossipsub_staggerDontWantSave.inc() + trace "Skipped sending msg/dontwant received from peer", conn, encoded = shortLog(msg) + return try: await conn.writeLp(msg) trace "sent pubsub message to remote", conn @@ -281,7 +297,7 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = await conn.close() # This will clean up the send connection -proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} = +proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} = ## Asynchronously sends an encoded message to a specified `PubSubPeer`. ## ## Parameters: @@ -302,13 +318,13 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} if isHighPriority: p.clearSendPriorityQueue() - let f = p.sendMsg(msg) + let f = p.sendMsg(msg, validMsgId) if not f.finished: p.rpcmessagequeue.sendPriorityQueue.addLast(f) when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) + await p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithId(msg, validMsgId)) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) trace "message queued", p, msg = shortLog(msg) @@ -348,7 +364,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: else: trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg) -proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} = +proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} = ## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization. ## ## Parameters: @@ -377,11 +393,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {. if encoded.len > p.maxMessageSize and msg.messages.len > 1: for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize): - await p.sendEncoded(encodedSplitMsg, isHighPriority) + await p.sendEncoded(encodedSplitMsg, isHighPriority, validMsgId) else: # If the message size is within limits, send it as is trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) - await p.sendEncoded(encoded, isHighPriority) + await p.sendEncoded(encoded, isHighPriority, validMsgId) proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = for sentIHave in p.sentIHaves.mitems(): @@ -403,7 +419,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - await p.sendMsg(msg) + await p.sendMsg(msg.message, msg.msgId) proc startSendNonPriorityTask(p: PubSubPeer) = debug "starting sendNonPriorityTask", p @@ -424,7 +440,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = proc new(T: typedesc[RpcMessageQueue]): T = return T( sendPriorityQueue: initDeque[Future[void]](), - nonPriorityQueue: newAsyncQueue[seq[byte]](), + nonPriorityQueue: newAsyncQueue[MessageWithId](), ) proc new*(