Skip to content

Commit

Permalink
chore: Filter in libwaku (#3177)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Nov 29, 2024
1 parent 2ab9c3d commit f856298
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 7 deletions.
16 changes: 16 additions & 0 deletions library/libwaku.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ int waku_relay_unsubscribe(void* ctx,
WakuCallBack callback,
void* userData);

int waku_filter_subscribe(void* ctx,
const char* pubSubTopic,
const char* contentTopics,
WakuCallBack callback,
void* userData);

int waku_filter_unsubscribe(void* ctx,
const char* pubSubTopic,
const char* contentTopics,
WakuCallBack callback,
void* userData);

int waku_filter_unsubscribe_all(void* ctx,
WakuCallBack callback,
void* userData);

int waku_relay_get_num_connected_peers(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
Expand Down
60 changes: 56 additions & 4 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
waku/waku_core/message/message,
waku/node/waku_node,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay/protocol,
./events/json_message_event,
./waku_thread/waku_thread,
Expand All @@ -20,6 +21,7 @@ import
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/protocols/store_request,
./waku_thread/inter_thread_communication/requests/protocols/lightpush_request,
./waku_thread/inter_thread_communication/requests/protocols/filter_request,
./waku_thread/inter_thread_communication/requests/debug_node_request,
./waku_thread/inter_thread_communication/requests/discovery_request,
./waku_thread/inter_thread_communication/requests/ping_request,
Expand Down Expand Up @@ -72,7 +74,7 @@ proc handleRes[T: string | void](
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler =
proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
return proc(
pubsubTopic: PubsubTopic, msg: WakuMessage
): Future[system.void] {.async.} =
Expand Down Expand Up @@ -300,7 +302,7 @@ proc waku_relay_publish(
RelayRequest.createShared(
RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback(ctx)),
WakuRelayHandler(onReceivedMessage(ctx)),
wakuMessage,
),
)
Expand Down Expand Up @@ -343,7 +345,7 @@ proc waku_relay_subscribe(
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
var cb = relayEventCallback(ctx)
var cb = onReceivedMessage(ctx)

waku_thread
.sendRequestToWakuThread(
Expand Down Expand Up @@ -374,7 +376,7 @@ proc waku_relay_unsubscribe(
RelayRequest.createShared(
RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback(ctx)),
WakuRelayHandler(onReceivedMessage(ctx)),
),
)
.handleRes(callback, userData)
Expand Down Expand Up @@ -419,6 +421,56 @@ proc waku_relay_get_num_peers_in_mesh(
)
.handleRes(callback, userData)

proc waku_filter_subscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
contentTopics: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)

waku_thread
.sendRequestToWakuThread(
ctx,
RequestType.FILTER,
FilterRequest.createShared(
FilterMsgType.SUBSCRIBE,
pubSubTopic,
contentTopics,
FilterPushHandler(onReceivedMessage(ctx)),
),
)
.handleRes(callback, userData)

proc waku_filter_unsubscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
contentTopics: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)

waku_thread
.sendRequestToWakuThread(
ctx,
RequestType.FILTER,
FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE, pubSubTopic, contentTopics),
)
.handleRes(callback, userData)

proc waku_filter_unsubscribe_all(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)

waku_thread
.sendRequestToWakuThread(
ctx, RequestType.FILTER, FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL)
)
.handleRes(callback, userData)

proc waku_lightpush_publish(
ctx: ptr WakuContext,
pubSubTopic: cstring,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import options, std/[strutils, sequtils]
import chronicles, chronos, results
import
../../../../../waku/waku_filter_v2/client,
../../../../../waku/waku_core/message/message,
../../../../../waku/factory/waku,
../../../../../waku/waku_filter_v2/common,
../../../../../waku/waku_core/subscription/push_handler,
../../../../../waku/node/peer_manager/peer_manager,
../../../../../waku/node/waku_node,
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_core/topics/content_topic,
../../../../alloc

type FilterMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
UNSUBSCRIBE_ALL

type FilterRequest* = object
operation: FilterMsgType
pubsubTopic: cstring
contentTopics: cstring ## comma-separated list of content-topics
filterPushEventCallback: FilterPushHandler ## handles incoming filter pushed msgs

proc createShared*(
T: type FilterRequest,
op: FilterMsgType,
pubsubTopic: cstring = "",
contentTopics: cstring = "",
filterPushEventCallback: FilterPushHandler = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].pubsubTopic = pubsubTopic.alloc()
ret[].contentTopics = contentTopics.alloc()
ret[].filterPushEventCallback = filterPushEventCallback

return ret

proc destroyShared(self: ptr FilterRequest) =
deallocShared(self[].pubsubTopic)
deallocShared(self[].contentTopics)
deallocShared(self)

proc process*(
self: ptr FilterRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

const FilterOpTimeout = 5.seconds
if waku.node.wakuFilterClient.isNil():
let errorMsg = "FilterRequest waku.node.wakuFilterClient is nil"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)

case self.operation
of SUBSCRIBE:
waku.node.wakuFilterClient.registerPushHandler(self.filterPushEventCallback)

let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when subscribing"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)

let pubsubTopic = some(PubsubTopic($self[].pubsubTopic))
let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it))

let subFut = waku.node.filterSubscribe(pubsubTopic, contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter subscription timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
of UNSUBSCRIBE:
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)

let pubsubTopic = some(PubsubTopic($self[].pubsubTopic))
let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it))

let subFut = waku.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
of UNSUBSCRIBE_ALL:
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing all"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)

let unsubFut = waku.node.filterUnsubscribeAll(peer)

if not await unsubFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription all timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)

return ok("")
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import
./requests/protocols/relay_request,
./requests/protocols/store_request,
./requests/protocols/lightpush_request,
./requests/protocols/filter_request,
./requests/debug_node_request,
./requests/discovery_request,
./requests/ping_request
Expand All @@ -24,6 +25,7 @@ type RequestType* {.pure.} = enum
DEBUG
DISCOVERY
LIGHTPUSH
FILTER

type InterThreadRequest* = object
reqType: RequestType
Expand Down Expand Up @@ -64,6 +66,8 @@ proc process*(
cast[ptr DiscoveryRequest](request[].reqContent).process(waku)
of LIGHTPUSH:
cast[ptr LightpushRequest](request[].reqContent).process(waku)
of FILTER:
cast[ptr FilterRequest](request[].reqContent).process(waku)

return await retFut

Expand Down
4 changes: 2 additions & 2 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())

mountLightPushClient(node)
if conf.lightpushnode != "":
let lightPushNode = parsePeerInfo(conf.lightpushnode)
if lightPushNode.isOk():
mountLightPushClient(node)
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
else:
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
Expand All @@ -341,11 +341,11 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())

await node.mountFilterClient()
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
try:
await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
except CatchableError:
return err(
Expand Down
15 changes: 14 additions & 1 deletion waku/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{.push raises: [].}

import std/options, results, chronicles, chronos, metrics, bearssl/rand
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import
../waku_core/peers,
../node/peer_manager,
../node/delivery_monitor/publish_observer,
../utils/requests,
Expand Down Expand Up @@ -71,6 +73,15 @@ proc publish*(
message: WakuMessage,
peer: PeerId | RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
when peer is PeerId:
info "publish",
peerId = shortLog(peer),
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
else:
info "publish",
peerId = shortLog(peer.peerId),
msg_hash = computeMessageHash(pubsubTopic, message).to0xHex

let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
?await wl.sendPushRequest(pushRequest, peer)

Expand All @@ -85,6 +96,8 @@ proc publishToAny*(
## This proc is similar to the publish one but in this case
## we don't specify a particular peer and instead we get it from peer manager

info "publishToAny", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex

let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
return err("could not retrieve a peer supporting WakuLightPushCodec")

Expand Down

0 comments on commit f856298

Please sign in to comment.