From f856298caa7940f0876e8e5a8d2e897176bf7288 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 29 Nov 2024 15:31:08 +0100 Subject: [PATCH] chore: Filter in libwaku (#3177) --- library/libwaku.h | 16 +++ library/libwaku.nim | 60 +++++++++- .../requests/protocols/filter_request.nim | 105 ++++++++++++++++++ .../waku_thread_request.nim | 4 + waku/factory/node_factory.nim | 4 +- waku/waku_lightpush/client.nim | 15 ++- 6 files changed, 197 insertions(+), 7 deletions(-) create mode 100644 library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim diff --git a/library/libwaku.h b/library/libwaku.h index 6a5800d80f..0255e6115a 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -92,6 +92,22 @@ int waku_relay_unsubscribe(void* ctx, WakuCallBack callback, void* userData); +int waku_filter_subscribe(void* ctx, + const char* pubSubTopic, + const char* contentTopics, + WakuCallBack callback, + void* userData); + +int waku_filter_unsubscribe(void* ctx, + const char* pubSubTopic, + const char* contentTopics, + WakuCallBack callback, + void* userData); + +int waku_filter_unsubscribe_all(void* ctx, + WakuCallBack callback, + void* userData); + int waku_relay_get_num_connected_peers(void* ctx, const char* pubSubTopic, WakuCallBack callback, diff --git a/library/libwaku.nim b/library/libwaku.nim index 36286a3869..0dac652d0c 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -12,6 +12,7 @@ import waku/waku_core/message/message, waku/node/waku_node, waku/waku_core/topics/pubsub_topic, + waku/waku_core/subscription/push_handler, waku/waku_relay/protocol, ./events/json_message_event, ./waku_thread/waku_thread, @@ -20,6 +21,7 @@ import ./waku_thread/inter_thread_communication/requests/protocols/relay_request, ./waku_thread/inter_thread_communication/requests/protocols/store_request, ./waku_thread/inter_thread_communication/requests/protocols/lightpush_request, + ./waku_thread/inter_thread_communication/requests/protocols/filter_request, ./waku_thread/inter_thread_communication/requests/debug_node_request, ./waku_thread/inter_thread_communication/requests/discovery_request, ./waku_thread/inter_thread_communication/requests/ping_request, @@ -72,7 +74,7 @@ proc handleRes[T: string | void]( callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_OK -proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler = +proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler = return proc( pubsubTopic: PubsubTopic, msg: WakuMessage ): Future[system.void] {.async.} = @@ -300,7 +302,7 @@ proc waku_relay_publish( RelayRequest.createShared( RelayMsgType.PUBLISH, PubsubTopic($pst), - WakuRelayHandler(relayEventCallback(ctx)), + WakuRelayHandler(onReceivedMessage(ctx)), wakuMessage, ), ) @@ -343,7 +345,7 @@ proc waku_relay_subscribe( let pst = pubSubTopic.alloc() defer: deallocShared(pst) - var cb = relayEventCallback(ctx) + var cb = onReceivedMessage(ctx) waku_thread .sendRequestToWakuThread( @@ -374,7 +376,7 @@ proc waku_relay_unsubscribe( RelayRequest.createShared( RelayMsgType.SUBSCRIBE, PubsubTopic($pst), - WakuRelayHandler(relayEventCallback(ctx)), + WakuRelayHandler(onReceivedMessage(ctx)), ), ) .handleRes(callback, userData) @@ -419,6 +421,56 @@ proc waku_relay_get_num_peers_in_mesh( ) .handleRes(callback, userData) +proc waku_filter_subscribe( + ctx: ptr WakuContext, + pubSubTopic: cstring, + contentTopics: cstring, + callback: WakuCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + checkLibwakuParams(ctx, callback, userData) + + waku_thread + .sendRequestToWakuThread( + ctx, + RequestType.FILTER, + FilterRequest.createShared( + FilterMsgType.SUBSCRIBE, + pubSubTopic, + contentTopics, + FilterPushHandler(onReceivedMessage(ctx)), + ), + ) + .handleRes(callback, userData) + +proc waku_filter_unsubscribe( + ctx: ptr WakuContext, + pubSubTopic: cstring, + contentTopics: cstring, + callback: WakuCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + checkLibwakuParams(ctx, callback, userData) + + waku_thread + .sendRequestToWakuThread( + ctx, + RequestType.FILTER, + FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE, pubSubTopic, contentTopics), + ) + .handleRes(callback, userData) + +proc waku_filter_unsubscribe_all( + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + checkLibwakuParams(ctx, callback, userData) + + waku_thread + .sendRequestToWakuThread( + ctx, RequestType.FILTER, FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL) + ) + .handleRes(callback, userData) + proc waku_lightpush_publish( ctx: ptr WakuContext, pubSubTopic: cstring, diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim new file mode 100644 index 0000000000..452a0c7c32 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim @@ -0,0 +1,105 @@ +import options, std/[strutils, sequtils] +import chronicles, chronos, results +import + ../../../../../waku/waku_filter_v2/client, + ../../../../../waku/waku_core/message/message, + ../../../../../waku/factory/waku, + ../../../../../waku/waku_filter_v2/common, + ../../../../../waku/waku_core/subscription/push_handler, + ../../../../../waku/node/peer_manager/peer_manager, + ../../../../../waku/node/waku_node, + ../../../../../waku/waku_core/topics/pubsub_topic, + ../../../../../waku/waku_core/topics/content_topic, + ../../../../alloc + +type FilterMsgType* = enum + SUBSCRIBE + UNSUBSCRIBE + UNSUBSCRIBE_ALL + +type FilterRequest* = object + operation: FilterMsgType + pubsubTopic: cstring + contentTopics: cstring ## comma-separated list of content-topics + filterPushEventCallback: FilterPushHandler ## handles incoming filter pushed msgs + +proc createShared*( + T: type FilterRequest, + op: FilterMsgType, + pubsubTopic: cstring = "", + contentTopics: cstring = "", + filterPushEventCallback: FilterPushHandler = nil, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].pubsubTopic = pubsubTopic.alloc() + ret[].contentTopics = contentTopics.alloc() + ret[].filterPushEventCallback = filterPushEventCallback + + return ret + +proc destroyShared(self: ptr FilterRequest) = + deallocShared(self[].pubsubTopic) + deallocShared(self[].contentTopics) + deallocShared(self) + +proc process*( + self: ptr FilterRequest, waku: ptr Waku +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + const FilterOpTimeout = 5.seconds + if waku.node.wakuFilterClient.isNil(): + let errorMsg = "FilterRequest waku.node.wakuFilterClient is nil" + error "fail filter process", error = errorMsg, op = $(self.operation) + return err(errorMsg) + + case self.operation + of SUBSCRIBE: + waku.node.wakuFilterClient.registerPushHandler(self.filterPushEventCallback) + + let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let errorMsg = + "could not find peer with WakuFilterSubscribeCodec when subscribing" + error "fail filter process", error = errorMsg, op = $(self.operation) + return err(errorMsg) + + let pubsubTopic = some(PubsubTopic($self[].pubsubTopic)) + let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it)) + + let subFut = waku.node.filterSubscribe(pubsubTopic, contentTopics, peer) + if not await subFut.withTimeout(FilterOpTimeout): + let errorMsg = "filter subscription timed out" + error "fail filter process", error = errorMsg, op = $(self.operation) + return err(errorMsg) + of UNSUBSCRIBE: + let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let errorMsg = + "could not find peer with WakuFilterSubscribeCodec when unsubscribing" + error "fail filter process", error = errorMsg, op = $(self.operation) + return err(errorMsg) + + let pubsubTopic = some(PubsubTopic($self[].pubsubTopic)) + let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it)) + + let subFut = waku.node.filterUnsubscribe(pubsubTopic, contentTopics, peer) + if not await subFut.withTimeout(FilterOpTimeout): + let errorMsg = "filter un-subscription timed out" + error "fail filter process", error = errorMsg, op = $(self.operation) + return err(errorMsg) + of UNSUBSCRIBE_ALL: + let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: + let errorMsg = + "could not find peer with WakuFilterSubscribeCodec when unsubscribing all" + error "fail filter process", error = errorMsg, op = $(self.operation) + return err(errorMsg) + + let unsubFut = waku.node.filterUnsubscribeAll(peer) + + if not await unsubFut.withTimeout(FilterOpTimeout): + let errorMsg = "filter un-subscription all timed out" + error "fail filter process", error = errorMsg, op = $(self.operation) + return err(errorMsg) + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 63dacc06d8..1036e2b2ad 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -11,6 +11,7 @@ import ./requests/protocols/relay_request, ./requests/protocols/store_request, ./requests/protocols/lightpush_request, + ./requests/protocols/filter_request, ./requests/debug_node_request, ./requests/discovery_request, ./requests/ping_request @@ -24,6 +25,7 @@ type RequestType* {.pure.} = enum DEBUG DISCOVERY LIGHTPUSH + FILTER type InterThreadRequest* = object reqType: RequestType @@ -64,6 +66,8 @@ proc process*( cast[ptr DiscoveryRequest](request[].reqContent).process(waku) of LIGHTPUSH: cast[ptr LightpushRequest](request[].reqContent).process(waku) + of FILTER: + cast[ptr FilterRequest](request[].reqContent).process(waku) return await retFut diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 2ed2ad2e7c..d98a99546d 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -320,10 +320,10 @@ proc setupProtocols( except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) + mountLightPushClient(node) if conf.lightpushnode != "": let lightPushNode = parsePeerInfo(conf.lightpushnode) if lightPushNode.isOk(): - mountLightPushClient(node) node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) else: return err("failed to set node waku lightpush peer: " & lightPushNode.error) @@ -341,11 +341,11 @@ proc setupProtocols( except CatchableError: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) + await node.mountFilterClient() if conf.filternode != "": let filterNode = parsePeerInfo(conf.filternode) if filterNode.isOk(): try: - await node.mountFilterClient() node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) except CatchableError: return err( diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 4f516dec5d..9f5819ecc7 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -1,7 +1,9 @@ {.push raises: [].} -import std/options, results, chronicles, chronos, metrics, bearssl/rand +import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils +import libp2p/peerid import + ../waku_core/peers, ../node/peer_manager, ../node/delivery_monitor/publish_observer, ../utils/requests, @@ -71,6 +73,15 @@ proc publish*( message: WakuMessage, peer: PeerId | RemotePeerInfo, ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + when peer is PeerId: + info "publish", + peerId = shortLog(peer), + msg_hash = computeMessageHash(pubsubTopic, message).to0xHex + else: + info "publish", + peerId = shortLog(peer.peerId), + msg_hash = computeMessageHash(pubsubTopic, message).to0xHex + let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) ?await wl.sendPushRequest(pushRequest, peer) @@ -85,6 +96,8 @@ proc publishToAny*( ## This proc is similar to the publish one but in this case ## we don't specify a particular peer and instead we get it from peer manager + info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex + let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: return err("could not retrieve a peer supporting WakuLightPushCodec")