Streamer¶
Streamer device
is a device for parallelized pipeline messaging. Acts as a broker that callects tasks from task feeders and supplies them to task workers.
streamer_device.py
import zmq
def main():
try:
context = zmq.Context(1)
# Socket facing clients
frontend = context.socket(zmq.PULL)
frontend.bind("tcp://*:5559")
# Socket facing services
backend = context.socket(zmq.PUSH)
backend.bind("tcp://*:5560")
zmq.device(zmq.STREAMER, frontend, backend)
except Exception, e:
print e
print "bringing down zmq device"
finally:
pass
frontend.close()
backend.close()
context.term()
if __name__ == "__main__":
main()
task_feeder.py
import time
import zmq
def producer():
context = zmq.Context()
zmq_socket = context.socket(zmq.PUSH)
zmq_socket.connect("tcp://127.0.0.1:5559")
# Start your result manager and workers before you start your producers
for num in xrange(20000):
work_message = { 'num' : num }
zmq_socket.send_json(work_message)
time.sleep(1)
producer()
task_worker.py
import sys
import time
import zmq
import random
def consumer():
consumer_id = random.randrange(1,10005)
print "I am consumer #%s" % (consumer_id)
context = zmq.Context()
# recieve work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5560")
while True:
work = consumer_receiver.recv_json()
data = work['num']
result = { 'consumer' : consumer_id, 'num' : data}
print result
consumer()
Execute these programs on separate shells:
python streamer_device.py
python task_feeder.py
python task_worker.py
python task_worker.py
Output on one of the workers:
I am consumer #8113
{'num': 1, 'consumer': 8113}
{'num': 3, 'consumer': 8113}
{'num': 5, 'consumer': 8113}
{'num': 7, 'consumer': 8113}
{'num': 9, 'consumer': 8113}
{'num': 11, 'consumer': 8113}