Streamer
Here we will use ProcessDevice to create a STREAMER device that pipelines messages from a server to a pool of workers.
streamerdevice.py
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
frontend_port = 5559
backend_port = 5560
number_of_workers = 2
The key difference here is that while zmq.device takes Socket objects as arguments, zmq.devices.basedevice.ProcessDevice takes socket types.
streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
For each configuration method (bind/connect/setsockopt), the proxy methods are suffixed with “_in” or “_out”, corresponding to the frontend and backend sockets respectively.
streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
Finally, you can start the device in the background.
streamerdevice.start()
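To see that start() returns immediately while the device keeps forwarding messages, here is a minimal sketch that uses ThreadDevice, the thread-based sibling of ProcessDevice in zmq.devices, so everything stays in one process. The helpers free_tcp_ports and stream_through_device are hypothetical names introduced only for this illustration, and the ports are picked by the operating system rather than fixed at 5559/5560.

```python
import socket as pysocket

import zmq
from zmq.devices import ThreadDevice


def free_tcp_ports(count):
    """Hypothetical helper: ask the OS for `count` distinct unused TCP ports."""
    socks = [pysocket.socket() for _ in range(count)]
    try:
        for s in socks:
            s.bind(("127.0.0.1", 0))  # port 0 lets the OS choose
        return [s.getsockname()[1] for s in socks]
    finally:
        for s in socks:
            s.close()


def stream_through_device(payloads, timeout_ms=5000):
    """Hypothetical helper: push byte payloads through a background STREAMER
    device and return them in the order they come out of the backend."""
    frontend_port, backend_port = free_tcp_ports(2)

    device = ThreadDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    device.bind_in("tcp://127.0.0.1:%d" % frontend_port)
    device.bind_out("tcp://127.0.0.1:%d" % backend_port)
    device.start()  # does not block; the device runs in a daemon thread

    context = zmq.Context()
    sender = context.socket(zmq.PUSH)
    receiver = context.socket(zmq.PULL)
    for sock in (sender, receiver):
        sock.setsockopt(zmq.LINGER, 0)  # do not hang on close
    # Raise zmq.Again instead of waiting forever if nothing arrives.
    receiver.setsockopt(zmq.RCVTIMEO, timeout_ms)
    receiver.connect("tcp://127.0.0.1:%d" % backend_port)
    sender.connect("tcp://127.0.0.1:%d" % frontend_port)
    try:
        for payload in payloads:
            sender.send(payload)
        return [receiver.recv() for _ in payloads]
    finally:
        sender.close()
        receiver.close()
        context.term()


if __name__ == "__main__":
    print(stream_through_device([b"#0", b"#1", b"#2"]))
```

With a single sender and a single receiver the payloads should arrive in the order they were sent. The daemon thread running the device is simply abandoned when the interpreter exits.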
Server and workers in the pipeline have been kept relatively simple for illustration purposes.
def server():
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%d" % frontend_port)
    # Push ten numbered messages into the device's frontend.
    for i in range(10):
        socket.send_string('#%s' % i)

def worker(work_num):
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%d" % backend_port)
    # Pull messages from the device's backend forever.
    while True:
        message = socket.recv_string()
        print("Worker #%s got message! %s" % (work_num, message))
        time.sleep(1)

# Start the workers first, give them a moment to connect, then run the server.
for work_num in range(number_of_workers):
    Process(target=worker, args=(work_num,)).start()
time.sleep(1)
server()
The requests are farmed out to the workers in a load-balanced manner:
Worker #1 got message! #0
Worker #0 got message! #1
Worker #1 got message! #2
Worker #0 got message! #3
Worker #1 got message! #4
Worker #0 got message! #5
Worker #1 got message! #6
Worker #0 got message! #7
Worker #1 got message! #8
Worker #0 got message! #9
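The alternating pattern in this listing is what round-robin distribution looks like: a PUSH socket hands each outgoing message to the next connected peer in turn. The following pure-Python sketch models only that distribution rule and does not use ZeroMQ. The function round_robin is a hypothetical name, and the worker order [1, 0] is an assumption chosen to match the listing, since in a real run the order depends on which worker connects first.

```python
from itertools import cycle


def round_robin(messages, worker_ids):
    """Pair each message with the next worker in a repeating cycle."""
    workers = cycle(worker_ids)
    return [(next(workers), message) for message in messages]


# Ten messages, two workers; worker 1 is assumed to be first in the rotation.
assignments = round_robin(["#%d" % i for i in range(10)], [1, 0])
for work_num, message in assignments:
    print("Worker #%s got message! %s" % (work_num, message))
```

In a real pipeline the split can deviate from this ideal, for example when a worker connects late or its queue fills up.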