From 242f516b5bd616873d70221fddbbcdb29f569153 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 26 Feb 2024 19:56:00 +0100 Subject: [PATCH 1/2] does not finish write futures while the channel is being closed --- libp2p/muxers/mplex/lpchannel.nim | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index f81c4afa5e..35d1d63c12 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -61,6 +61,7 @@ type closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code writes*: int # In-flight writes + closing: Future[void] func shortLog*(s: LPChannel): auto = try: @@ -99,7 +100,7 @@ proc reset*(s: LPChannel) {.async.} = if s.isClosed: trace "Already closed", s return - + s.closing = newFuture[void]() s.isClosed = true s.closedLocal = true s.localReset = not s.remoteReset @@ -117,10 +118,11 @@ proc reset*(s: LPChannel) {.async.} = await s.conn.close() trace "Can't send reset message", s, conn = s.conn, msg = exc.msg - asyncSpawn resetMessage() + await resetMessage() await s.closeImpl() # noraises, nocancels - + if not s.closing.finished: + s.closing.complete() trace "Channel reset", s method close*(s: LPChannel) {.async.} = @@ -130,6 +132,8 @@ method close*(s: LPChannel) {.async.} = if s.closedLocal: trace "Already closed", s return + + s.closing = newFuture[void]() s.closedLocal = true trace "Closing channel", s, conn = s.conn, len = s.len @@ -147,7 +151,8 @@ method close*(s: LPChannel) {.async.} = trace "Cannot send close message", s, id = s.id, msg = exc.msg await s.closeUnderlying() # maybe already eofed - + if not s.closing.finished: + s.closing.complete() trace "Closed channel", s, len = s.len method initStream*(s: LPChannel) = @@ -200,6 +205,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = if s.remoteReset: raise newLPStreamResetError() if s.closedLocal: + await s.closing raise newLPStreamClosedError() if s.conn.closed: raise newLPStreamConnDownError() @@ -293,8 +299,9 @@ proc init*( msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, - dir: if initiator: Direction.Out else: Direction.In) - + dir: if initiator: Direction.Out else: Direction.In, + closing: newFuture[void]()) + chann.closing.complete() chann.initStream() when chronicles.enabledLogLevel == LogLevel.TRACE: From 9c1d879a52ed12c1f56f12b95bb74f94c9adf976 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 29 Feb 2024 17:10:21 +0100 Subject: [PATCH 2/2] revert asyncSpawn change --- libp2p/muxers/mplex/lpchannel.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 35d1d63c12..7261ff22a3 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -118,7 +118,7 @@ proc reset*(s: LPChannel) {.async.} = await s.conn.close() trace "Can't send reset message", s, conn = s.conn, msg = exc.msg - await resetMessage() + asyncSpawn resetMessage() await s.closeImpl() # noraises, nocancels if not s.closing.finished: