PyZmq Tornado Event Loop¶
ØMQ Poller can be used to serve and communicate with multiple sockets. How ever, with ØMQ Poller, you end up with explicit blocks (under if loop) for handling the sockets. Each socket registered with ØMQ Poller has to have an explicit “if block” to handle it.
PyZmq includes the tornado ioloop and adapts its IOStream class into ZMQStream for handling poll events on ØMQ sockets. You can register callbacks to receive and send data.
Before you do this, you must have tornado module installed:
pip install tornado
We will be redoing the previous program to take advantage of the ZMQStream and Tornado ioloop.
pyzmq_stream_poller.py
You must first install PyZMQ’s IOLoop.
import zmq
import time
import sys
import random
from multiprocessing import Process
from zmq.eventloop import ioloop, zmqstream
ioloop.install()
We have left the command server and the topic publisher same as before.
def server_push(port="5556"):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:%s" % port)
print "Running server on port: ", port
# serves only 5 request and dies
for reqnum in range(10):
if reqnum < 6:
socket.send("Continue")
else:
socket.send("Exit")
break
time.sleep (1)
def server_pub(port="5558"):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
publisher_id = random.randrange(0,9999)
print "Running server on port: ", port
# serves only 5 request and dies
for reqnum in range(10):
# Wait for next request from client
topic = random.randrange(8,10)
messagedata = "server#%s" % publisher_id
print "%s %s" % (topic, messagedata)
socket.send("%d %s" % (topic, messagedata))
time.sleep(1)
Message handlers are separated from the worker logic. Also note, that we stop the event loop once the worker receives the “Exit” command.
def getcommand(msg):
print "Received control command: %s" % msg
if msg[0] == "Exit":
print "Received exit command, client will stop receiving messages"
should_continue = False
ioloop.IOLoop.instance().stop()
def process_message(msg):
print "Processing ... %s" % msg
Here, you can see that we use ZMQStream class to register callbacks. The callbacks are the handlers that we had written earlier. The “If blocks” in previous program has been converted to callbacks registered with tornado event loop. There are no explicit socket handling blocks here.
def client(port_push, port_sub):
context = zmq.Context()
socket_pull = context.socket(zmq.PULL)
socket_pull.connect ("tcp://localhost:%s" % port_push)
stream_pull = zmqstream.ZMQStream(socket_pull)
stream_pull.on_recv(getcommand)
print "Connected to server with port %s" % port_push
socket_sub = context.socket(zmq.SUB)
socket_sub.connect ("tcp://localhost:%s" % port_sub)
socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
stream_sub = zmqstream.ZMQStream(socket_sub)
stream_sub.on_recv(process_message)
print "Connected to publisher with port %s" % port_sub
ioloop.IOLoop.instance().start()
print "Worker has stopped processing messages."
if __name__ == "__main__":
# Now we can run a few servers
server_push_port = "5556"
server_pub_port = "5558"
Process(target=server_push, args=(server_push_port,)).start()
Process(target=server_pub, args=(server_pub_port,)).start()
Process(target=client, args=(server_push_port,server_pub_port,)).start()
In the output, you should notice that client has exited prior to the publishers which keeps publishing without any subscribers to process these messages:
Running server on port: 5556
Running server on port: 5558
8 server#2028
Connected to server with port 5556
Connected to publisher with port 5558
Received control command: ['Continue']
9 server#2028
Processing ... ['9 server#2028']
Received control command: ['Continue']
8 server#2028
Received control command: ['Continue']
8 server#2028
Received control command: ['Continue']
8 server#2028
Received control command: ['Continue']
9 server#2028
Processing ... ['9 server#2028']
Received control command: ['Continue']
9 server#2028
Processing ... ['9 server#2028']
Received control command: ['Exit']
Received exit command, client will stop receiving messages
Worker has stopped processing messages.
8 server#2028
8 server#2028
9 server#2028