Skip to content

Commit 275ce1d

Browse files
committed
Initial commit
0 parents  commit 275ce1d

File tree

5 files changed

+632
-0
lines changed

5 files changed

+632
-0
lines changed

README.md

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# This is not real yet
2+
3+
This is some unedited notes I made towards getting ZeroMQ working in
4+
Trio. The general idea is to expose a similar API to pyzmq, but with
5+
async in the appropriate places. I'm putting it up because a lot of
6+
people seem to be interested in getting this working so I'm hoping
7+
they take it and run with it :-).
8+
9+
The `_proxy.py` and `_trivial.py` files are fine I guess, though
10+
there's not much to them. (By "fine" I mean "looks fine at a glance,
11+
but I've never even tried importing any of this code".)
12+
13+
The interesting part is creating our `Socket` class. The `_socket.py`
14+
file has several half-written attempts jammed together, and lots of
15+
notes in comments. They get closer to workable as you go down in the
16+
file. (Don't trust everything you read there, it's a log of thoughts,
17+
not conclusions.)
18+
19+
Anyway, the major challenging thing about supporting zmq in trio is
20+
that zmq only grudgingly admits the idea that you might want to do
21+
anything in your process besides use zmq. But we have to convince it
22+
to play nicely with trio's event loop. (In fact this is so annoying
23+
that pyzmq's asyncio integration involves using a custom asyncio event
24+
loop based on zeromq. This is kind of terrible though and trio does
25+
not support it so we have to grit our teeth and use the annoying
26+
thing.)
27+
28+
So, here's how zmq's event loop integration works: You perform various
29+
operations on your zmq "socket" as normal, making sure to use the
30+
`NOBLOCK` mode. Sometimes, `send` or `recv` raises `zmq.Again`. Then
31+
you need to block waiting for the zmq "socket" to become
32+
readable/writable. At any given moment, you can check whether it's
33+
readable/writable by calling `getsockopt(zmq.EVENTS)`. So far so
34+
good. But how do we block waiting for readable/writable status to
35+
change?
36+
37+
Well, there is a file descriptor they expose, as the `.fd` value on
38+
the zmq socket. Maybe this is a shorthand for a call to `getsockopt`
39+
or something? It seems like everything in zmq is a `getsockopt`.
40+
Anyway, there's a file descriptor. The thing to realize about this
41+
file descriptor is that its semantics are extremely counterintuitive.
42+
It wasn't designed to be used this way; they just took an fd they use
43+
internally and exposed it like "good luck have fun".
44+
45+
As far as I can tell, the semantics are this: every time the value of
46+
`getsockopt(zmq.EVENTS)` becomes stale/out-of-date, then the fd
47+
becomes readable, as a signal that zmq needs to do something
48+
internally to update the `getsockopt(zmq.EVENTS)` value. So if you've
49+
just looked at `getsockopt(zmq.EVENTS)` and it wasn't what you wanted,
50+
you can block waiting for the fd to become readable. However, since
51+
zmq also uses this fd internally, *pretty much any zmq operation might
52+
update the internal cache of `getsockopt(zmq.EVENTS)` and mark the fd
53+
non-readable again*. In particular, `send` and `recv` do this, as does
54+
calling `getsockopt(zmq.EVENTS)`. But so can random other operations –
55+
one that's mentioned in `zmq/green/core.py` is that
56+
`setsockopt(SUBSCRIBE, ...)` and `setsockopt(UNSUBSCRIBE, ...)` can do
57+
it, and there are some notes at the bottom of `_socket.py` of my
58+
trying to trace through the zmq source code to figure out what other
59+
operations might do it, but basically this is a fool's errand and the
60+
only safe thing to do is assume that *any* call into zmq might trigger
61+
a reset of the fd's state.
62+
63+
So this means that if one task tries to call `send`, and gets
64+
`zmq.Again`, and then blocks waiting for the fd to become readable...
65+
and then another task just randomly like, checks some socket option or
66+
something, then this might cause zeromq to notice that actually the
67+
socket is writable now *but our blocked task won't wake up* because
68+
the notification got consumed by the other task.
69+
70+
Also, if you want to use a socket in duplex mode, with one task
71+
calling `send` while the other calls `recv`, then this is complicated
72+
because there's just one fd that's used for both, so you have to pick
73+
one of them to do the waiting, and then maybe wake up the other.
74+
75+
At the bottom of `_socket.py` is some very complicated code that tries
76+
to handle a bunch of these cases. It may be over-complicated. In
77+
particular it shouldn't support having multiple tasks blocked in
78+
`send` at the same time, or ditto for `recv`. That would be a small
79+
win. A much bigger win would be if we made a rule that only one task
80+
is allowed to use a given zmq socket at a time. (So in particular,
81+
disallow concurrent calls to `send` and `recv` on the same socket.)
82+
And this actually handles a lot of zmq's socket modes; a version of
83+
this library with this restriction would still be useful for a *lot*
84+
of zmq's use cases. But maybe not all, like I think there are some
85+
cases where you might want to go full-duplex on ROUTER or DEALER
86+
sockets? Anyway, we might want to start there.
87+
88+
As a general note, I started out thinking we'd want our `Socket` class
89+
to inherit from pyzmq's, but now I think that was a bad idea and we
90+
should just re-export the parts we want. (And possibly skip
91+
re-exporting some parts, like the many redundant ways to call
92+
`getsockopt()`.) You should expect to spend some time reading through
93+
the definition of `Socket` in the pyzmq source to see what all it
94+
exposes.
95+
96+
So yeah, that's what I know. This is not a working library. It needs a
97+
single implementation of `Socket`, with more of the API exposed, it
98+
needs tests, and docs, and all that good stuff. But it's a start!
99+
100+
101+
# License
102+
103+
MIT/Apache 2 dual, Same as Trio, we should add proper license files.
104+
And proper everything else files. By running cookiecutter-trio I
105+
guess.

trzmq/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
__all__ = []
2+
3+
from ._trivial import *
4+
__all__ += _trivial.__all__
5+
6+
from ._proxy import *
7+
__all__ += _proxy.__all__

trzmq/_proxy.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# The real zmq.proxy calls a C function zmq_proxy, which does an infinite
2+
# synchronous loop. Not very friendly -- even if we run it in a thread, we
3+
# have no way to stop it, except to close the whole context.
4+
#
5+
# https://github.com/zeromq/libzmq/blob/master/doc/zmq_proxy.txt
6+
# https://github.com/zeromq/libzmq/blob/master/src/proxy.cpp
7+
#
8+
# Differences:
9+
#
10+
# - I haven't bothered to implement the control socket functionality exposed
11+
# by zmq_proxy_steerable. It would be pretty easy to do, but you don't need
12+
# it to quit out of this. (I guess you might need it if you want to quit
13+
# without dropping any messages.)
14+
#
15+
# - This implementation reads a message from the source, then waits until the
16+
# capture and sink are writable before sending it to them. The C++
17+
# implementation waits until the sink (at least?) is writable before even
18+
# reading from the source. So we have *slightly* higher buffering, by like
19+
# 0.5 messages on average or something like that.
20+
21+
import trio
22+
23+
__all__ = ["proxy"]
24+
25+
async def _proxy_one_way(source, sink, capture):
26+
while True:
27+
message = await source.recv_multipart()
28+
if capture is not None:
29+
await capture.send_multipart(capture)
30+
await sink.send_multipart(message)
31+
32+
async def proxy(frontend, backend, capture=None):
33+
async with trio.open_nursery() as nursery:
34+
nursery.start_soon(_proxy_one_way, frontend, backend, capture)
35+
nursery.start_soon(_proxy_one_way, backend, frontend, capture)

0 commit comments

Comments
 (0)