diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index f81c4afa5e..7261ff22a3 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -61,6 +61,7 @@ type closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code writes*: int # In-flight writes + closing: Future[void] func shortLog*(s: LPChannel): auto = try: @@ -99,7 +100,7 @@ proc reset*(s: LPChannel) {.async.} = if s.isClosed: trace "Already closed", s return - + s.closing = newFuture[void]() s.isClosed = true s.closedLocal = true s.localReset = not s.remoteReset @@ -120,7 +121,8 @@ proc reset*(s: LPChannel) {.async.} = asyncSpawn resetMessage() await s.closeImpl() # noraises, nocancels - + if not s.closing.finished: + s.closing.complete() trace "Channel reset", s method close*(s: LPChannel) {.async.} = @@ -130,6 +132,8 @@ method close*(s: LPChannel) {.async.} = if s.closedLocal: trace "Already closed", s return + + s.closing = newFuture[void]() s.closedLocal = true trace "Closing channel", s, conn = s.conn, len = s.len @@ -147,7 +151,8 @@ method close*(s: LPChannel) {.async.} = trace "Cannot send close message", s, id = s.id, msg = exc.msg await s.closeUnderlying() # maybe already eofed - + if not s.closing.finished: + s.closing.complete() trace "Closed channel", s, len = s.len method initStream*(s: LPChannel) = @@ -200,6 +205,7 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = if s.remoteReset: raise newLPStreamResetError() if s.closedLocal: + await s.closing raise newLPStreamClosedError() if s.conn.closed: raise newLPStreamConnDownError() @@ -293,8 +299,9 @@ proc init*( msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, - dir: if initiator: Direction.Out else: Direction.In) - + dir: if initiator: Direction.Out else: Direction.In, + closing: newFuture[void]()) + chann.closing.complete() chann.initStream() when chronicles.enabledLogLevel == LogLevel.TRACE: