diff options
author | Ben Ford <ben@boothead.co.uk> | 2010-09-20 07:08:27 +0100 |
---|---|---|
committer | Ben Ford <ben@boothead.co.uk> | 2010-09-20 07:08:27 +0100 |
commit | b75e83a35c2bd3458f14917cb532348e958ca0b5 (patch) | |
tree | 7392bee8297d87e926cffc2033634fca97487ef0 /examples | |
parent | 8972567f0a4e81e96eff32a0bef9f4980cea25e1 (diff) | |
download | eventlet-b75e83a35c2bd3458f14917cb532348e958ca0b5.tar.gz |
First stab at zeromq support. This consists of:
A new hub: This closely mirrors the poll hub with some of the internal logic changed to reflect zmq's flags.
A green module for zmq: This subclasses Context and Socket to ensure calls are non blocking.
A (very sparse) beginings of a test module.
An example: A melding of the pyzmq chat example and the eventlet telnet chat example.
TODO
zmq_poll chokes if the sockets passed to it come from different contexts. As context is the entry point to everything else then it would make sense to include a check in here that each thread has only one context instance. By context being the entry point I mean:
ctx = zmq.Context()
socket = ctx.socket(zmq.<type-of-socket>)
This call to socket is repeated for each socket you want and ctx must be the same one for each thread.
Tests. I'd like to get to the point f having all zmq socket pairs tested - and perhaps a nice benchmark suite too.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/zmq_chat.py | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/examples/zmq_chat.py b/examples/zmq_chat.py new file mode 100644 index 0000000..cd07f80 --- /dev/null +++ b/examples/zmq_chat.py @@ -0,0 +1,64 @@ +import eventlet, sys +from eventlet.green import socket, zmq +from eventlet.hubs import use_hub +use_hub('zeromq') + +ADDR = 'ipc:///tmp/chat' + +ctx = zmq.Context() + +def publish(writer): + + print "connected" + socket = ctx.socket(zmq.SUB) + + socket.setsockopt(zmq.SUBSCRIBE, "") + socket.connect(ADDR) + eventlet.sleep(0.1) + + while True: + msg = socket.recv_pyobj() + str_msg = "%s: %s" % msg + writer.write(str_msg) + writer.flush() + + +PORT=3001 + +def read_chat_forever(reader, pub_socket): + + line = reader.readline() + who = 'someone' + while line: + print "Chat:", line.strip() + if line.startswith('name:'): + who = line.split(':')[-1].strip() + + try: + pub_socket.send_pyobj((who, line)) + except socket.error, e: + # ignore broken pipes, they just mean the participant + # closed its connection already + if e[0] != 32: + raise + line = reader.readline() + print "Participant left chat." + +try: + print "ChatServer starting up on port %s" % PORT + server = eventlet.listen(('0.0.0.0', PORT)) + pub_socket = ctx.socket(zmq.PUB) + pub_socket.bind(ADDR) + eventlet.spawn_n(publish, + sys.stdout) + while True: + new_connection, address = server.accept() + + print "Participant joined chat." + eventlet.spawn_n(publish, + new_connection.makefile('w')) + eventlet.spawn_n(read_chat_forever, + new_connection.makefile('r'), + pub_socket) +except (KeyboardInterrupt, SystemExit): + print "ChatServer exiting."
\ No newline at end of file |