Skip to content

Commit

Permalink
test & fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Dec 3, 2024
1 parent ad9dc41 commit b679a54
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 61 deletions.
88 changes: 44 additions & 44 deletions tests/test_waku_rendezvous.nim
Original file line number Diff line number Diff line change
@@ -1,51 +1,51 @@
{.used.}

import chronos, testutils/unittests, libp2p/builders, libp2p/protocols/rendezvous
import chronos, testutils/unittests, libp2p/builders

import waku/node/waku_switch, ./testlib/common, ./testlib/wakucore

proc newRendezvousClientSwitch(rdv: RendezVous): Switch =
SwitchBuilder
.new()
.withRng(rng())
.withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()])
.withTcpTransport()
.withMplex()
.withNoise()
.withRendezVous(rdv)
.build()
import
waku/waku_core/peers,
waku/node/waku_node,
waku/node/peer_manager/peer_manager,
waku/waku_rendezvous/protocol,
./testlib/[wakucore, wakunode]

procSuite "Waku Rendezvous":
asyncTest "Waku Switch uses Rendezvous":
## Setup

asyncTest "Simple remote test":
let
wakuClient = RendezVous.new()
sourceClient = RendezVous.new()
destClient = RendezVous.new()
wakuSwitch = newRendezvousClientSwitch(wakuClient) #rendezvous point
sourceSwitch = newRendezvousClientSwitch(sourceClient) #client
destSwitch = newRendezvousClientSwitch(destClient) #client

# Setup client rendezvous
wakuClient.setup(wakuSwitch)
sourceClient.setup(sourceSwitch)
destClient.setup(destSwitch)

await allFutures(wakuSwitch.start(), sourceSwitch.start(), destSwitch.start())

# Connect clients to the rendezvous point
await sourceSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs)
await destSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs)

let res0 = await sourceClient.request("empty")
check res0.len == 0

# Check that source client gets peer info of dest client from rendezvous point
await sourceClient.advertise("foo")
let res1 = await destClient.request("foo")
check:
res1.len == 1
res1[0] == sourceSwitch.peerInfo.signedPeerRecord.data
clusterId = 10.uint16
node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
)
node2 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
)

# Start nodes
await allFutures([node1.start(), node2.start()])

let remotePeerInfo1 = node1.switch.peerInfo.toRemotePeerInfo()
let remotePeerInfo2 = node2.switch.peerInfo.toRemotePeerInfo()

node2.peerManager.addPeer(remotePeerInfo1)

await allFutures(wakuSwitch.stop(), sourceSwitch.stop(), destSwitch.stop())
let namespace = "test/name/space"

let res = await node2.wakuRendezvous.batchAdvertise(
namespace, peers = @[remotePeerInfo1.peerId]
)
assert res.isOk(), $res.error

let response =
await node2.wakuRendezvous.batchRequest(namespace, 1, @[remotePeerInfo1.peerId])
assert response.isOk(), $response.error

let records = response.get()

check:
records[0].peerId == remotePeerInfo2.peerId
6 changes: 4 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,9 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"

node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr)
node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr:
error "rendezvous new failed", error = error
return

# Always start discovering peers at startup
(await node.wakuRendezvous.initialRequestAll()).isOkOr:
Expand Down Expand Up @@ -1347,7 +1349,7 @@ proc stop*(node: WakuNode) {.async.} =
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()

if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.stop()
await node.wakuRendezvous.stopWait()

node.started = false

Expand Down
1 change: 1 addition & 0 deletions waku/waku_rendezvous/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ../waku_enr/capabilities
const DiscoverLimit* = 1000
const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds
const PeersRequestedCount* = 12

proc computeNamespace*(clusterId: uint16, shard: uint16): string =
var namespace = "rs/"
Expand Down
43 changes: 28 additions & 15 deletions waku/waku_rendezvous/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import
./common

logScope:
topics = "waku rendez vous"
topics = "waku rendezvous"

declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous"

Expand All @@ -40,7 +40,8 @@ proc batchAdvertise*(
## Register with all rendezvous peers under a namespace

let catchable = catch:
await procCall RendezVous(self).advertise(namespace, ttl, peers)
#await procCall RendezVous(self).advertise(namespace, ttl, peers)
await self.advertise(namespace, ttl, peers)

if catchable.isErr():
return err(catchable.error.msg)
Expand All @@ -56,7 +57,8 @@ proc batchRequest*(
## Request all records from all rendezvous peers matching a namespace

let catchable = catch:
await RendezVous(self).request(namespace, count, peers)
#await procCall RendezVous(self).request(namespace, count, peers)
await self.request(namespace, count, peers)

if catchable.isErr():
return err(catchable.error.msg)
Expand All @@ -77,6 +79,9 @@ proc advertiseAll(
continue

# Advertise yourself on that peer
#[ procCall RendezVous(self).advertise(
namespace, DefaultRegistrationTTL, @[rpi.peerId]
) ]#
self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])

let catchable = catch:
Expand Down Expand Up @@ -106,8 +111,9 @@ proc initialRequestAll*(
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
continue

# Ask for 12 peer records for that shard
self.request(namespace, 12, @[rpi.peerId])
# Ask for peer records for that shard
#procCall RendezVous(self).request(namespace, PeersRequestedCount, @[rpi.peerId])
self.request(namespace, PeersRequestedCount, @[rpi.peerId])

let catchable = catch:
await allFinished(futs)
Expand Down Expand Up @@ -148,34 +154,41 @@ proc getRelayShards(enr: enr.Record): Option[RelayShards] =

proc new*(
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): T {.raises: [].} =
): Result[T, string] {.raises: [].} =
let relayshard = getRelayShards(enr).valueOr:
warn "Using default cluster id 0"
RelayShards(clusterID: 0, shardIds: @[])

let capabilities = enr.getCapabilities()

let wrv = WakuRendezVous(
peerManager: peerManager, relayshard: relayshard, capabilities: capabilities
)
let catchable = catch:
procCall RendezVous.new(switch)

if catchable.isErr():
return err(catchable.error.msg)

let rv = catchable.get()

RendezVous(wrv).setup(switch)
var wrv = WakuRendezVous(rv)
wrv.peerManager = peerManager
wrv.relayshard = relayshard
wrv.capabilities = capabilities

debug "waku rendezvous initialized",
cluster = relayshard.clusterId,
shards = relayshard.shardIds,
capabilities = capabilities

return wrv
return ok(wrv)

proc start*(self: WakuRendezVous) {.async: (raises: []).} =
debug "starting waku rendezvous discovery"

# start registering forever
self.periodicRegistrationFut = self.periodicRegistration()

proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
debug "stopping waku rendezvous discovery"
debug "waku rendezvous discovery started"

proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
if not self.periodicRegistrationFut.isNil():
await self.periodicRegistrationFut.cancelAndWait()

debug "waku rendezvous discovery stopped"

0 comments on commit b679a54

Please sign in to comment.