Skip to content

Commit

Permalink
restore zero-copy recv on shell messages (#1280)
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk authored Oct 25, 2024
1 parent 41a965e commit 48e39cf
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,9 @@ async def process_control_message(self, msg=None):
assert self.control_thread is None or threading.current_thread() == self.control_thread

msg = msg or await self.control_socket.recv_multipart()
copy = not isinstance(msg[0], zmq.Message)
idents, msg = self.session.feed_identities(msg, copy=copy)
idents, msg = self.session.feed_identities(msg)
try:
msg = self.session.deserialize(msg, content=True, copy=copy)
msg = self.session.deserialize(msg, content=True)
except Exception:
self.log.error("Invalid Control Message", exc_info=True) # noqa: G201
return
Expand Down Expand Up @@ -375,15 +374,12 @@ async def shell_channel_thread_main(self):

try:
while True:
msg = await self.shell_socket.recv_multipart()

# Deserialize whole message just to get subshell_id.
msg = await self.shell_socket.recv_multipart(copy=False)
# deserialize only the header to get subshell_id
# Keep original message to send to subshell_id unmodified.
# Ideally only want to deserialize message once.
copy = not isinstance(msg[0], zmq.Message)
_, msg2 = self.session.feed_identities(msg, copy=copy)
_, msg2 = self.session.feed_identities(msg, copy=False)
try:
msg3 = self.session.deserialize(msg2, content=False, copy=copy)
msg3 = self.session.deserialize(msg2, content=False, copy=False)
subshell_id = msg3["header"].get("subshell_id")

# Find inproc pair socket to use to send message to correct subshell.
Expand Down Expand Up @@ -1210,9 +1206,7 @@ def do_clear(self):

def _topic(self, topic):
"""prefixed topic for IOPub messages"""
base = "kernel.%s" % self.ident

return (f"{base}.{topic}").encode()
return (f"kernel.{self.ident}.{topic}").encode()

_aborting = Bool(False)

Expand Down

0 comments on commit 48e39cf

Please sign in to comment.