From 7d477732ad3ca98a57230b7f654a187b09ac401f Mon Sep 17 00:00:00 2001
From: ufarooqstatus
Date: Mon, 20 May 2024 20:00:30 +0500
Subject: [PATCH] Staggered message sending with semaphores, with late
 elimination of peers (even during transmissions) based on IDONTWANTs

---
 libp2p/protocols/pubsub/gossipsub.nim  |  34 +++++++--
 libp2p/protocols/pubsub/pubsub.nim     |  21 ++++--
 libp2p/protocols/pubsub/pubsubpeer.nim | 100 ++++++++++++++++++-------
 3 files changed, 116 insertions(+), 39 deletions(-)

diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 1a8be68a37..8c7ee8c487 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -418,10 +418,19 @@ proc validateAndRelay(g: GossipSub,
           break
 
     toSendPeers.excl(seenPeers)
 
+    # We first send to outbound peers to avoid peers sending the same message to each other
+    var outboundPeers: seq[PubSubPeer]
+    for mpeer in toSendPeers:
+      if mpeer.outbound():
+        outboundPeers.add(mpeer)
+    if outboundPeers.len > 0:
+      g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId))
+      toSendPeers.excl(outboundPeers.toHashSet)
+
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
-    trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, some(saltedId))
+    trace "forwarded message to peers", peers = toSendPeers.len + outboundPeers.len, msgId, peer
     if g.knownTopics.contains(topic):
       libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
 
@@ -529,6 +538,11 @@ method rpcHandler*(g: GossipSub,
 
         libp2p_gossipsub_duplicate.inc()
 
+        # Don't relay to peers from which we already received this message
+        # (we only do this for large messages)
+        if msg.data.len > msgId.len * 10:
+          peer.heDontWants[^1].incl(msgIdSalted)
+
         # onto the next message
         continue
 
@@ -700,15 +714,23 @@ method publish*(g: GossipSub,
 
   g.mcache.put(msgId, msg)
 
+  # We first send to outbound peers
+  var outboundPeers: seq[PubSubPeer]
+  for mpeer in peers:
+    if mpeer.outbound():
+      outboundPeers.add(mpeer)
+  if outboundPeers.len > 0:
+    g.broadcast(outboundPeers, RPCMsg(messages: @[msg]), isHighPriority = true)
+    peers.excl(outboundPeers.toHashSet)
   g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
 
   if g.knownTopics.contains(topic):
-    libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
+    libp2p_pubsub_messages_published.inc((peers.len + outboundPeers.len).int64, labelValues = [topic])
   else:
-    libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])
+    libp2p_pubsub_messages_published.inc((peers.len + outboundPeers.len).int64, labelValues = ["generic"])
 
-  trace "Published message to peers", peers=peers.len
-  return peers.len
+  trace "Published message to peers", peers = peers.len + outboundPeers.len
+  return (peers.len + outboundPeers.len)
 
 proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
   if id notin g.peers:
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 9834b369ee..bfadaa3584 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -28,7 +28,8 @@ import ./errors as pubsub_errors,
        ../../peerid,
        ../../peerinfo,
        ../../errors,
-       ../../utility
+       ../../utility,
+       ../../utils/semaphore
 
 import stew/results
 export results
@@ -79,6 +80,9 @@ declarePublicCounter(libp2p_pubsub_received_ihave, "pubsub broadcast ihave", lab
 declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", labels = ["topic"])
 declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])
 
+const
+  DefaultMaxSimultaneousTx* = 2
+
 type
   InitializationError* = object of LPError
 
@@ -127,6 +131,7 @@ type
     rng*: ref HmacDrbgContext
 
     knownTopics*: HashSet[string]
+    semTxLimit: AsyncSemaphore
 
 method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
   ## handle peer disconnects
@@ -137,7 +142,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
 
-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
   ## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
   ##
   ## Parameters:
@@ -149,13 +154,14 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.rai
   ##   priority messages have been sent.
 
   trace "sending pubsub message to peer", peer, msg = shortLog(msg)
-  peer.send(msg, p.anonymize, isHighPriority)
+  peer.send(msg, p.anonymize, isHighPriority, saltedId)
 
 proc broadcast*(
   p: PubSub,
   sendPeers: auto, # Iteratble[PubSubPeer]
   msg: RPCMsg,
-  isHighPriority: bool) {.raises: [].} =
+  isHighPriority: bool,
+  sid: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
   ## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
   ##
   ## Parameters:
@@ -210,12 +216,12 @@ proc broadcast*(
 
   if anyIt(sendPeers, it.hasObservers):
     for peer in sendPeers:
-      p.send(peer, msg, isHighPriority)
+      p.send(peer, msg, isHighPriority, sid)
   else:
     # Fast path that only encodes message once
     let encoded = encodeRpcMsg(msg, p.anonymize)
     for peer in sendPeers:
-      asyncSpawn peer.sendEncoded(encoded, isHighPriority)
+      asyncSpawn peer.sendEncoded(encoded, isHighPriority, sid)
 
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
@@ -310,7 +316,7 @@ method getOrCreatePeer*(
       p.onPubSubPeerEvent(peer, event)
 
   # create new pubsub peer
-  let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
+  let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, addr p.semTxLimit)
   debug "created new pubsub peer", peerId
 
   p.peers[peerId] = pubSubPeer
@@ -509,6 +515,7 @@ method initPubSub*(p: PubSub)
   p.observers = new(seq[PubSubObserver])
   if p.msgIdProvider == nil:
     p.msgIdProvider = defaultMsgIdProvider
+  p.semTxLimit = newAsyncSemaphore(DefaultMaxSimultaneousTx)
 
 method addValidator*(p: PubSub,
                      topic: varargs[string],
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index d369a32dbb..6459e1c334 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -19,7 +19,9 @@ import rpc/[messages, message, protobuf],
        ../../stream/connection,
        ../../crypto/crypto,
        ../../protobuf/minprotobuf,
-       ../../utility
+       ../../utility,
+       ../../utils/semaphore
+import atomics
 
 export peerid, connection, deques
 
@@ -37,6 +39,12 @@ when defined(pubsubpeer_queue_metrics):
   declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit,
     "number of peers disconnected due to over non-prio queue capacity")
 
+var
+  libp2p_gossipsub_staggerDontWantSave2: Atomic[int]
+  libp2p_gossipsub_staggerDontWantSave3: Atomic[int]
+
+export libp2p_gossipsub_staggerDontWantSave2, libp2p_gossipsub_staggerDontWantSave3
+
 const
   DefaultMaxNumElementsInNonPriorityQueue* = 1024
 
@@ -59,11 +67,15 @@ type
   DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
   OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
 
+  MessageWithSaltedId = object
+    message: seq[byte]
+    sid: SaltedId
+
   RpcMessageQueue* = ref object
     # Tracks async tasks for sending high-priority peer-published messages.
     sendPriorityQueue: Deque[Future[void]]
     # Queue for lower-priority messages, like "IWANT" replies and relay messages.
-    nonPriorityQueue: AsyncQueue[seq[byte]]
+    nonPriorityQueue: AsyncQueue[MessageWithSaltedId]
     # Task for processing non-priority message queue.
     sendNonPriorityTask: Future[void]
 
@@ -91,6 +103,7 @@ type
     behaviourPenalty*: float64 # the eventual penalty score
 
    overheadRateLimitOpt*: Opt[TokenBucket]
+    semTxLimit: ptr AsyncSemaphore # controls max simultaneous transmissions, to speed up individual receptions
     rpcmessagequeue: RpcMessageQueue
     maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
     disconnected: bool
@@ -107,6 +120,10 @@ when defined(libp2p_agents_metrics):
       #so we have to read the parents short agent..
       p.sendConn.getWrapped().shortAgent
 
+proc newMessageWithSaltedId(msg: seq[byte], saltedId: SaltedId): MessageWithSaltedId =
+  result.message = msg
+  result.sid = saltedId
+
 proc getAgent*(peer: PubSubPeer): string =
   return
   when defined(libp2p_agents_metrics):
@@ -311,24 +328,49 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} =
     debug "No send connection", p, msg = shortLog(msg)
     return
 
-  trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
-  await sendMsgContinue(conn, conn.writeLp(msg))
-
-proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] =
+proc sendMsg(p: PubSubPeer, msg: seq[byte], saltedId: Option[SaltedId] = none(SaltedId)): Future[void] {.async.} =
+  if p.sendConn == nil or p.sendConn.closed():
+    await sendMsgSlow(p, msg)
+
   if p.sendConn != nil and not p.sendConn.closed():
-    # Fast path that avoids copying msg (which happens for {.async.})
     let conn = p.sendConn
-    trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
-    let f = conn.writeLp(msg)
-    if not f.completed():
-      sendMsgContinue(conn, f)
-    else:
-      f
-  else:
-    sendMsgSlow(p, msg)
 
-proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[void] =
+    if msg.len < 2000:
+      try:
+        let f = conn.writeLp(msg)
+        await f
+      except CatchableError:
+        await conn.close()
+    else:
+      await p.semTxLimit[].acquire()
+      # We may have received a DontWant for this message while waiting for a slot
+      if saltedId.isSome:
+        for heDontWant in p.heDontWants:
+          if saltedId.get in heDontWant:
+            p.semTxLimit[].release()
+            atomicInc(libp2p_gossipsub_staggerDontWantSave2)
+            return
+      try:
+        let f = conn.writeLp(msg)
+        let turns = (msg.len div 100_000) + 1
+        for i in 1..turns:
+          await f or sleepAsync(200.milliseconds) # sleep time should be adaptive to the peer bandwidth
+          if not f.completed and saltedId.isSome:
+            for heDontWant in p.heDontWants:
+              if saltedId.get in heDontWant:
+                atomicInc(libp2p_gossipsub_staggerDontWantSave3)
+                break
+        if not f.completed:
+          await f.cancelAndWait()
+          #asyncSpawn sendMsgContinue(conn, f)
+        p.semTxLimit[].release()
+
+      except LPStreamError:
+        p.semTxLimit[].release()
+        await conn.close()
+
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)): Future[void] =
   ## Asynchronously sends an encoded message to a specified `PubSubPeer`.
   ##
   ## Parameters:
@@ -354,7 +396,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
     info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
     Future[void].completed()
   elif isHighPriority or emptyQueues:
-    let f = p.sendMsg(msg)
+    let f = p.sendMsg(msg, saltedId)
     if not f.finished:
       p.rpcmessagequeue.sendPriorityQueue.addLast(f)
       when defined(pubsubpeer_queue_metrics):
@@ -369,10 +411,13 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
     else:
       Future[void].completed()
   else:
-    let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
-    when defined(pubsubpeer_queue_metrics):
-      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
-    f
+    if not saltedId.isSome:
+      Future[void].completed()
+    else:
+      let f = p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithSaltedId(msg, saltedId.get))
+      when defined(pubsubpeer_queue_metrics):
+        libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+      f
 
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@@ -409,7 +454,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
   else:
     trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, saltedId: Option[SaltedId] = none(SaltedId)) {.raises: [].} =
   ## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
   ##
   ## Parameters:
@@ -438,11 +483,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.
   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
+      asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority, saltedId)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded, isHighPriority)
+    asyncSpawn p.sendEncoded(encoded, isHighPriority, saltedId)
 
 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -467,7 +512,8 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
       discard await race(p.rpcmessagequeue.sendPriorityQueue[^1])
     when defined(pubsubpeer_queue_metrics):
       libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
-    await p.sendMsg(msg)
+    await p.sendMsg(msg.message, some(msg.sid))
+    #asyncSpawn p.sendMsg(msg.message, some(msg.sid))
 
 proc startSendNonPriorityTask(p: PubSubPeer) =
   debug "starting sendNonPriorityTask", p
@@ -489,7 +535,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
 proc new(T: typedesc[RpcMessageQueue]): T =
   return T(
     sendPriorityQueue: initDeque[Future[void]](),
-    nonPriorityQueue: newAsyncQueue[seq[byte]]()
+    nonPriorityQueue: newAsyncQueue[MessageWithSaltedId]()
   )
 
 proc new*(
@@ -499,6 +545,7 @@ proc new*(
     onEvent: OnEvent,
     codec: string,
     maxMessageSize: int,
+    sem: ptr AsyncSemaphore,
     maxNumElementsInNonPriorityQueue: int = DefaultMaxNumElementsInNonPriorityQueue,
     overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
 
@@ -516,3 +563,4 @@ proc new*(
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[SaltedId]))
   result.startSendNonPriorityTask()
+  result.semTxLimit = sem
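
The transmission pattern this patch introduces, shown in isolation below as a minimal sketch. It assumes chronos and libp2p's AsyncSemaphore (libp2p/utils/semaphore); the proc name `staggeredSend` and the `write`/`dontWant` callbacks are illustrative only, not the patched API, and the sketch simplifies the control flow by cancelling as soon as an IDONTWANT is observed rather than counting it in a metric first.

import chronos                     # Future, sleepAsync, cancelAndWait, `or`
import libp2p/utils/semaphore      # AsyncSemaphore (acquire/release)

proc staggeredSend*(sem: AsyncSemaphore,
                    write: proc(): Future[void] {.gcsafe.},
                    dontWant: proc(): bool {.gcsafe.},
                    msgLen: int) {.async.} =
  ## Sketch: at most DefaultMaxSimultaneousTx senders hold `sem` at once, and a
  ## transmission is abandoned if the peer announces IDONTWANT before or while
  ## the message is being written.
  await sem.acquire()                    # wait for a transmission slot
  try:
    if dontWant():                       # late elimination before starting
      return
    let fut = write()                    # start the actual write
    let turns = msgLen div 100_000 + 1   # roughly one 200 ms turn per 100 kB
    for _ in 1 .. turns:
      await fut or sleepAsync(200.milliseconds)
      if fut.completed():
        return                           # transfer finished normally
      if dontWant():                     # peer no longer needs the message
        break
    await fut.cancelAndWait()            # give up on slow or unwanted sends
  finally:
    sem.release()                        # free the slot for the next sender

With DefaultMaxSimultaneousTx = 2 a node keeps at most two large writes in flight, so each active transfer gets a larger share of the uplink and finishes sooner, while IDONTWANTs arriving from fast receivers can still stop transmissions that are queued or already under way.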