Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use zmq-anyio #1291

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open

Use zmq-anyio #1291

wants to merge 9 commits into from

Conversation

davidbrochart
Copy link
Collaborator

@davidbrochart davidbrochart commented Nov 7, 2024

@minrk this is using an AnyIO-compatible pyzmq API (from https://github.com/davidbrochart/zmq-anyio), as discussed in zeromq/pyzmq#2045.

@davidbrochart davidbrochart marked this pull request as draft November 7, 2024 13:57
@davidbrochart
Copy link
Collaborator Author

I can see with this PR that ipykernel runs on trio when hard-coding the backend here, but how can we choose e.g. from JupyterLab to set trio_loop?

@minrk
Copy link
Member

minrk commented Nov 8, 2024

Nice! So it looks like for compatibility with Windows, you've gone with spawning a select thread for every waiting socket?

I can see with this PR that ipykernel runs on trio when hard-coding the backend here, but how can we choose e.g. from JupyterLab to set trio_loop?

This should presumably be the same as any other Kernel configuration option, so a config=True traitlet, set e.g. from a CLI arg or env var in the kernelspec (or parameterized kernel, or (most compatible) in the kernel config file (~/.ipython/profile_default/ipython_kernel_config.py).

@davidbrochart
Copy link
Collaborator Author

So it looks like for compatibility with Windows, you've gone with spawning a select thread for every waiting socket?

Yes, and I think Tornado does something similar here. This means a zmq_anyio.Socket has to be started asynchronously with an async context manager (or by calling its start() method), but that's consistent with an AnyIO socket.
AnyIO's wait_socket_readable "does NOT work on Windows when using the asyncio backend with a proactor event loop (default on py3.8+)".

Thanks for the kernel configuration, I'll try that 👍

@minrk
Copy link
Member

minrk commented Nov 8, 2024

I think Tornado does something similar

It does. The big difference is Tornado starts one selector thread per event loop which is far more scalable, whereas zmq-anyio starts one per socket. This makes sense from a library simplicity standpoint since there isn't a thread running when you are done with a socket, but definitely isn't scalable and probably isn't what we should do long term (and why I think anyio should have this built-in, just like tornado).

As I understand it, this means anyio will spawn up to 40 threads by default. That's might be okay for ipykernel, but I'd say it does mean we shouldn't use zmq-anyio in any client places like jupyter-client or jupyter-server. Because as soon as you've got 40 idle zmq sockets waiting for a message (what they spend most of their time doing), any subsequent calls to to_thread will block forever. I do think the first priority here should be universal wait_socket_readable support in anyio itself.

You might be able to provoke this in ipykernel by spawning 41 subshells and not using them, since I think each one adds a socket that will be idle.

You could limit the starvation by making the to_thread function one with a timeout that you re-enter all the time from the main async thread. Idle sockets will still starve the thread pool, just with timeout-sized bubbles instead of forever.

@davidbrochart
Copy link
Collaborator Author

Interesting, I hadn't thought about that.
Maybe before moving that part to AnyIO we could have async zmq_anyio.start() and zmq_anyio.stop() that would start/stop the thread?

@minrk
Copy link
Member

minrk commented Nov 8, 2024

Maybe before moving that part to AnyIO we could have async zmq_anyio.start() and zmq_anyio.stop() that would start/stop the thread?

Sure! I think that's sensible. I don't have enough experience with the task group hierarchy stuff to know what that should look like. I think it's probably appropriate to have some tests in zmq-anyio with a lot of idle sockets (at least more than the thread count, which I think can be set to 1 or 2) to probe this stuff.

If I were the one writing it, I'd implement a zmq_anyio.wait_readable that:

  1. invokes tornado's SelectorThread logic as needed on asyncio as done in pyzmq, and
  2. calls trio.lowlevel.wait_readable on trio
  3. (ideally) accepts integer FD so it doesn't need to call socket.fromfd (like trio.lowlevel.wait_readable and asyncio.add_reader, unlike anyio.wait_socket_readable) which I think doubles the number of open FDs

You should be able to base it on anyio.wait_socket_readable which assumes socket objects, despite there being no actual need for that restriction (I'm guessing it's inherited because trio used to have this same issue, but doesn't anymore).

A smaller, but maybe less clean and less efficient version with a one-time monkeypatch:

if windows and asyncio and proactor:
    # only needed once per asyncio event loop, this is the only situation where a patch is needed
    loop = asyncio.get_running_loop()
    loop.add_reader = selector_add_reader # from tornado's AddThreadSelector
    loop.remove_reader = selector_remove_reader # from tornado's AddThreadSelector

...
# assume wait_socket_readable works, which it should now
await anyio.wait_socket_readable(socket.fromfd(zmq_sock.FD))
# hopefully anyio will fix integer FD support to match underlying asyncio and trio

If you did any of those, there would be the advantage that no actual thread is spawned except in the Windows + Proactor + asyncio case, which would get exactly one thread.

fwiw, I started to extract the tornado feature into its own package, but haven't tested it enough to publish a release if there's some reason to not depend on tornado for this feature (I don't think there is): https://github.com/minrk/async-selector-thread. Requiring tornado for this doesn't mean the tornado IOLoop object ever needs to be created, as the SelectorThread logic is pure asyncio, so there's really no reason not to require tornado for this as long as it's the only package with the required feature.

@davidbrochart
Copy link
Collaborator Author

Thanks @minrk, that was very helpful.
I both opened agronholm/anyio#820 and used the approach you described in davidbrochart/zmq-anyio#5.
BTW it seems that directly passing a file descriptor to anyio.wait_socket_readable works fine, no need to call socket.fromfd (with a type mismatch though).

@minrk
Copy link
Member

minrk commented Nov 13, 2024

I had another thought where you could shutdown the thread if nothing is waiting (when remove_reader is called). This might play nicer with anyio's design of shutting things down when they aren't in use, and you don't need anything hooked up to close unless it's called while waiting on a socket. But it comes at a performance cost because you are probably going to recreate the thread a whole bunch of times (once per message if you only have one socket). I don't actually think we should do that, but it's an idea if there are objections to leaving an idle thread running.

But really by far the most efficient approach is ZMQStream's event-driven on_recv, which registers the FD exactly once and calls handle_events whenever there might be a message, rather than calling add_reader and remove_reader for every message.

@davidbrochart
Copy link
Collaborator Author

Still a few tests failing, and trio is not enabled in tests (more failures), but this is taking shape.

@davidbrochart
Copy link
Collaborator Author

I'm not sure about the tests that time out, it never happens locally on my machine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants