chore(async): {.async: (raises).} for connmanager #1172

Open · wants to merge 1 commit into base: master
41 changes: 26 additions & 15 deletions libp2p/connmanager.nim
@@ -67,7 +67,8 @@ type
muxed: Table[PeerId, seq[Muxer]]
connEvents: array[ConnEventKind, OrderedSet[ConnEventHandler]]
peerEvents: array[PeerEventKind, OrderedSet[PeerEventHandler]]
expectedConnectionsOverLimit*: Table[(PeerId, Direction), Future[Muxer]]
expectedConnectionsOverLimit*:
Table[(PeerId, Direction), Future[Muxer].Raising([CancelledError])]
Collaborator:
What would be the difference between this and using a Future[Result[Muxer, ...]]?

Actually, I guess the broader question would be: Is there a reason we're going for exceptions over handling errors with Result?

Contributor:

I'm not an expert, but Nim natively supports exceptions, and the compiler can check raises annotations on proc signatures: https://nim-lang.org/docs/tut2.html#exceptions-annotating-procs-with-raised-exceptions. Result was created in a Status library. It might be painful to use Result where the Nim standard library wasn't designed for it. I can't say much more; in general, this project uses exceptions more than Result.
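
For illustration only (not part of the diff), a minimal sketch of the two styles under discussion, assuming chronos v4 and the nim-results package; Muxer here is a stand-in type, not libp2p's:

import chronos, results

type Muxer = ref object # stand-in type for this sketch

# Exception style (what this PR adopts): the raises list becomes part of
# the future's type, so the compiler tracks errors across await points.
proc expectMuxer(): Future[Muxer] {.async: (raises: [CancelledError]).} =
  return Muxer()

# Result style: failures travel as values inside the future; only
# cancellation remains an exception.
proc expectMuxerRes(): Future[Result[Muxer, string]] {.async: (raises: [CancelledError]).} =
  return Result[Muxer, string].ok(Muxer())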

peerStore*: PeerStore

ConnectionSlot* = object
@@ -123,7 +124,9 @@ proc removeConnEventHandler*(
) =
c.connEvents[kind].excl(handler)

proc triggerConnEvent*(c: ConnManager, peerId: PeerId, event: ConnEvent) {.async.} =
proc triggerConnEvent*(
c: ConnManager, peerId: PeerId, event: ConnEvent
) {.async: (raises: [CancelledError]).} =
try:
trace "About to trigger connection events", peer = peerId
if c.connEvents[event.kind].len() > 0:
@@ -153,7 +156,9 @@ proc removePeerEventHandler*(
) =
c.peerEvents[kind].excl(handler)

proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.async.} =
proc triggerPeerEvents*(
c: ConnManager, peerId: PeerId, event: PeerEvent
) {.async: (raises: [CancelledError]).} =
trace "About to trigger peer events", peer = peerId
if c.peerEvents[event.kind].len == 0:
return
@@ -173,7 +178,7 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.asyn

proc expectConnection*(
c: ConnManager, p: PeerId, dir: Direction
): Future[Muxer] {.async.} =
): Future[Muxer] {.async: (raises: [AlreadyExpectingConnectionError, CancelledError]).} =
## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer`
let key = (p, dir)
if key in c.expectedConnectionsOverLimit:
@@ -182,7 +187,7 @@
"Already expecting an incoming connection from that peer",
)

let future = newFuture[Muxer]()
let future = Future[Muxer].Raising([CancelledError]).init()
c.expectedConnectionsOverLimit[key] = future

try:
Expand All @@ -204,7 +209,7 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
let conn = muxer.connection
return muxer in c.muxed.getOrDefault(conn.peerId)

proc closeMuxer(muxer: Muxer) {.async.} =
proc closeMuxer(muxer: Muxer) {.async: (raises: [CancelledError]).} =
trace "Cleaning up muxer", m = muxer

await muxer.close()
@@ -215,7 +220,7 @@ proc closeMuxer(muxer: Muxer) {.async.} =
trace "Exception in close muxer handler", exc = exc.msg
trace "Cleaned up muxer", m = muxer

proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
proc muxCleanup(c: ConnManager, mux: Muxer) {.async: (raises: [CancelledError]).} =
try:
trace "Triggering disconnect events", mux
let peerId = mux.connection.peerId
@@ -237,7 +242,7 @@ proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
# do not need to propagate CancelledError and should handle other errors
warn "Unexpected exception peer cleanup handler", mux, msg = exc.msg

proc onClose(c: ConnManager, mux: Muxer) {.async.} =
proc onClose(c: ConnManager, mux: Muxer) {.async: (raises: [CancelledError]).} =
## connection close event handler
##
## triggers the connections resource cleanup
@@ -321,7 +326,9 @@ proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =

trace "Stored muxer", muxer, direction = $muxer.connection.dir, peers = c.muxed.len

proc getIncomingSlot*(c: ConnManager): Future[ConnectionSlot] {.async.} =
proc getIncomingSlot*(
c: ConnManager
): Future[ConnectionSlot] {.async: (raises: [CancelledError]).} =
await c.inSema.acquire()
return ConnectionSlot(connManager: c, direction: In)

@@ -354,7 +361,7 @@ proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
cs.release()
return

proc semaphoreMonitor() {.async.} =
proc semaphoreMonitor() {.async: (raises: [CancelledError]).} =
try:
await conn.join()
except CatchableError as exc:
@@ -370,28 +377,32 @@ proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) =
return
cs.trackConnection(mux.connection)

proc getStream*(c: ConnManager, muxer: Muxer): Future[Connection] {.async.} =
proc getStream*(
c: ConnManager, muxer: Muxer
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
## get a muxed stream for the passed muxer
##

if not (isNil(muxer)):
return await muxer.newStream()

proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} =
proc getStream*(
c: ConnManager, peerId: PeerId
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
## get a muxed stream for the passed peer from any connection
##

return await c.getStream(c.selectMuxer(peerId))

proc getStream*(
c: ConnManager, peerId: PeerId, dir: Direction
): Future[Connection] {.async.} =
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
## get a muxed stream for the passed peer from a connection with `dir`
##

return await c.getStream(c.selectMuxer(peerId, dir))

proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =
proc dropPeer*(c: ConnManager, peerId: PeerId) {.async: (raises: [CancelledError]).} =
## drop connections and cleanup resources for peer
##
trace "Dropping peer", peerId
@@ -402,7 +413,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =

trace "Peer dropped", peerId

proc close*(c: ConnManager) {.async.} =
proc close*(c: ConnManager) {.async: (raises: [CancelledError]).} =
## cleanup resources for the connection
## manager
##
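For context on what the annotations buy callers, a hypothetical sketch (openStream is a made-up name; the imports are assumed to expose ConnManager, PeerId, LPStreamError, and MuxerError from the modules touched above):

import chronos, libp2p

# The compiler now rejects any exception escaping openStream that is not
# in its raises list, so the except branches below must cover the rest.
proc openStream(c: ConnManager, peerId: PeerId) {.async: (raises: [CancelledError]).} =
  try:
    let conn = await c.getStream(peerId)
    # ... read from / write to conn here ...
    echo "opened ", $conn.dir, " stream to ", peerId
  except LPStreamError as exc:
    echo "stream error: ", exc.msg
  except MuxerError as exc:
    echo "muxer error: ", exc.msg
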
6 changes: 3 additions & 3 deletions libp2p/utils/semaphore.nim
@@ -20,7 +20,7 @@ logScope:
type AsyncSemaphore* = ref object of RootObj
size*: int
count: int
queue: seq[Future[void]]
queue: seq[Future[void].Raising([CancelledError])]

proc newAsyncSemaphore*(size: int): AsyncSemaphore =
AsyncSemaphore(size: size, count: size)
@@ -38,14 +38,14 @@ proc tryAcquire*(s: AsyncSemaphore): bool =
trace "Acquired slot", available = s.count, queue = s.queue.len
return true

proc acquire*(s: AsyncSemaphore): Future[void] =
proc acquire*(s: AsyncSemaphore): Future[void].Raising([CancelledError]) =
## Acquire a resource and decrement the resource
## counter. If no more resources are available,
## the returned future will not complete until
## the resource count goes above 0.
##

let fut = newFuture[void]("AsyncSemaphore.acquire")
let fut = Future[void].Raising([CancelledError]).init("AsyncSemaphore.acquire")
Contributor:

we can create a new template in utils/future.nim:

template newFuture*[T](fromProc: static[string] = ""): auto =
  Future[T].Raising([CancelledError]).init(fromProc)
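
If such a template were added, call sites could keep their pre-chronos-v4 shape while still getting the typed future (hypothetical call below; note that chronos exports its own newFuture template, so the two definitions would have to coexist):

# Equivalent to Future[void].Raising([CancelledError]).init("AsyncSemaphore.acquire")
let fut = newFuture[void]("AsyncSemaphore.acquire")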

if s.tryAcquire():
fut.complete()
return fut
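For context, a minimal usage sketch of the typed semaphore (assuming chronos and libp2p/utils/semaphore are importable; worker is an illustrative name). Since acquire can now only raise CancelledError, callers need no broad except CatchableError:

import chronos
import libp2p/utils/semaphore

proc worker(sema: AsyncSemaphore) {.async: (raises: [CancelledError]).} =
  await sema.acquire()                # waits for a free slot; only cancellable
  try:
    await sleepAsync(10.milliseconds) # stand-in for real work
  finally:
    sema.release()                    # always hand the slot back

when isMainModule:
  let sema = newAsyncSemaphore(2)     # at most two workers run concurrently
  waitFor allFutures(worker(sema), worker(sema), worker(sema))
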
2 changes: 1 addition & 1 deletion tests/testsemaphore.nim
@@ -14,7 +14,7 @@ import chronos

import ../libp2p/utils/semaphore

import ./helpers
import ./asyncunit

randomize()
