Streamer

Streamer device

is a device for parallelized pipeline messaging. It acts as a broker that collects tasks from task feeders and supplies them to task workers.

../../_images/streamer.png

streamer_device.py

import zmq

def main():
    """Run a ZeroMQ streamer device between task feeders and task workers.

    Binds a PULL socket on tcp://*:5559 (feeders connect here) and a PUSH
    socket on tcp://*:5560 (workers connect here), then hands both sockets
    to ``zmq.device(zmq.STREAMER, ...)``.

    Any ``Exception`` raised while setting up or running the device is
    printed rather than propagated. Sockets and context are always released
    on the way out.
    """
    # Bind the names up front so the cleanup in ``finally`` never trips over
    # an unbound local when context/socket creation or bind() itself fails.
    context = None
    frontend = None
    backend = None
    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.PULL)
        frontend.bind("tcp://*:5559")

        # Socket facing services
        backend = context.socket(zmq.PUSH)
        backend.bind("tcp://*:5560")

        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(e)
        print("bringing down zmq device")
    finally:
        # Close only what was actually created; sockets go before the context.
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()

# Start the device only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

task_feeder.py

import time
import zmq

def producer():
    """Feed numbered tasks to the streamer device, one per second.

    Connects a PUSH socket to tcp://127.0.0.1:5559 and sends 20000 JSON
    messages of the form ``{'num': <int>}``, sleeping one second after
    each send. The socket and context are released when the loop finishes
    or is interrupted.
    """
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    try:
        zmq_socket.connect("tcp://127.0.0.1:5559")
        # Start the streamer device and the workers before you start your
        # producers.
        # range() instead of xrange() so the script runs on Python 2 and 3.
        for num in range(20000):
            work_message = {'num': num}
            zmq_socket.send_json(work_message)
            time.sleep(1)
    finally:
        zmq_socket.close()
        context.term()

# No __main__ guard here: feeding starts whenever this module is run or imported.
producer()


task_worker.py

import sys
import time
import zmq
import random

def consumer():
    """Pull tasks from the streamer device and print each one, forever.

    Picks a random consumer id from ``random.randrange(1, 10005)``,
    announces it, connects a PULL socket to tcp://127.0.0.1:5560, then
    loops: receive a JSON message, read its ``'num'`` key and print a dict
    holding the consumer id and that number. The loop has no exit
    condition; the socket and context are released when it is interrupted
    or an error escapes.
    """
    consumer_id = random.randrange(1, 10005)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # receive work
    consumer_receiver = context.socket(zmq.PULL)
    try:
        consumer_receiver.connect("tcp://127.0.0.1:5560")
        while True:
            work = consumer_receiver.recv_json()
            data = work['num']
            result = {'consumer': consumer_id, 'num': data}
            print(result)
    finally:
        consumer_receiver.close()
        context.term()
# No __main__ guard here: the worker loop starts whenever this module is run or imported.
consumer()

Execute these programs in separate shells:

python streamer_device.py
python task_feeder.py
python task_worker.py
python task_worker.py

Output on one of the workers:

I am consumer #8113
{'num': 1, 'consumer': 8113}
{'num': 3, 'consumer': 8113}
{'num': 5, 'consumer': 8113}
{'num': 7, 'consumer': 8113}
{'num': 9, 'consumer': 8113}
{'num': 11, 'consumer': 8113}