diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 51ae65ff9a..2460be70b4 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -9,7 +9,7 @@ {.used.} -import std/[options, deques, sequtils, enumerate, algorithm] +import std/[options, deques, sequtils, enumerate, algorithm, os] import stew/byteutils import ../../libp2p/builders import ../../libp2p/errors @@ -718,15 +718,19 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() - proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} = + proc setupTest(maxDurationInNonPriorityQueue1: Opt[Duration] = Opt.none(Duration)): + Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} = let nodes = generateNodes(2, gossip = true, verifySignature = false) discard await allFinished( nodes[0].switch.start(), nodes[1].switch.start() ) + var gossip0: GossipSub = GossipSub(nodes[0]) + var gossip1: GossipSub = GossipSub(nodes[1]) - await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs) + gossip1.parameters.maxDurationInNonPriorityQueue = maxDurationInNonPriorityQueue1 + await gossip1.switch.connect(gossip0.switch.peerInfo.peerId, gossip0.switch.peerInfo.addrs) var receivedMessages = new(HashSet[seq[byte]]) @@ -736,12 +740,10 @@ suite "GossipSub internal": proc handlerB(topic: string, data: seq[byte]) {.async.} = discard - nodes[0].subscribe("foobar", handlerA) - nodes[1].subscribe("foobar", handlerB) + gossip0.subscribe("foobar", handlerA) + gossip1.subscribe("foobar", handlerB) await waitSubGraph(nodes, "foobar") - var gossip0: GossipSub = GossipSub(nodes[0]) - var gossip1: GossipSub = GossipSub(nodes[1]) return (gossip0, gossip1, receivedMessages) @@ -844,3 +846,18 @@ suite "GossipSub internal": check receivedMessages[].len == 1 await teardownTest(gossip0, gossip1) + + asyncTest "e2e - drop msg if it is in the non-priority queue for too long": + # This test checks if two messages, both below the maxSize, are correctly processed and sent. + # Expected: Both messages should be received. + let maxDurationInNonPriorityQueueGossip1 = 1.millis + let (gossip0, gossip1, receivedMessages) = await setupTest(Opt.some(maxDurationInNonPriorityQueueGossip1)) + + let topic = "foobar" + gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]), false) + sleep(2) # pause all tasks to ensure that the message stay in the non-priority queue longer than maxDurationInNonPriorityQueueGossip1 + gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](36))]), false) + await sleepAsync(10.milliseconds) # wait for the messages to be processed + check: receivedMessages[].len == 1 # only the second message should be received + + await teardownTest(gossip0, gossip1) \ No newline at end of file