Streamer

Here we will use ProcessDevice to create a STREAMER device that pipelines messages from a server to a set of workers.

streamerdevice.py

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process

frontend_port = 5559
backend_port = 5560
number_of_workers = 2

The key difference here is that while zmq.device takes Socket objects as arguments, zmq.devices.basedevice.ProcessDevice takes socket types.

streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
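For comparison, the blocking zmq.device call expects ready-made Socket objects that you create, bind and configure yourself. A minimal sketch of the same STREAMER built that way (for illustration only; it is not part of streamerdevice.py and it blocks the calling thread):

import zmq

context = zmq.Context()
frontend = context.socket(zmq.PULL)
frontend.bind("tcp://127.0.0.1:5559")
backend = context.socket(zmq.PUSH)
backend.bind("tcp://127.0.0.1:5560")

# Runs the proxy in the current thread until the context is terminated
zmq.device(zmq.STREAMER, frontend, backend)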

Each configuration method (bind/connect/setsockopt) comes in a variant suffixed with "_in" or "_out", corresponding to the frontend and backend sockets respectively.

streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')

Finally, start the device; it runs in a background process.

streamerdevice.start()

The server and workers in the pipeline have been kept deliberately simple for illustration purposes.


def server():
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%d" % frontend_port)

    # Push ten messages into the frontend of the streamer
    for i in range(10):
        socket.send_string("#%s" % i)

def worker(work_num):
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%d" % backend_port)

    # Pull messages from the backend of the streamer
    while True:
        message = socket.recv_string()
        print("Worker #%s got message! %s" % (work_num, message))
        time.sleep(1)

# Start the workers, give them a moment to connect, then run the server
for work_num in range(number_of_workers):
    Process(target=worker, args=(work_num,)).start()
time.sleep(1)

server()

The messages are farmed out to the workers in a load-balanced manner:

Worker #1 got message! #0
Worker #0 got message! #1
Worker #1 got message! #2
Worker #0 got message! #3
Worker #1 got message! #4
Worker #0 got message! #5
Worker #1 got message! #6
Worker #0 got message! #7
Worker #1 got message! #8
Worker #0 got message! #9
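If a separate process is not needed, ThreadDevice from zmq.devices offers the same interface but runs the proxy in a background thread of the current process. A sketch, assuming the rest of streamerdevice.py stays unchanged:

from zmq.devices import ThreadDevice

# Same configuration calls as ProcessDevice, but the proxy runs in a background thread
streamerdevice = ThreadDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.start()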