Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- import time
- from multiprocessing import Manager, Process
def worker1(namespace, iterations):
    """Write the loop counter into the shared namespace on every pass.

    ``namespace`` is a Manager.Namespace proxy (or any object that accepts
    attribute assignment); the final value left behind is ``iterations - 1``.
    """
    step = 0
    while step < iterations:
        namespace.worker1_var = step
        step += 1
def worker2(namespace, iterations):
    """Print worker1's shared value multiplied by the loop index.

    Reads ``namespace.worker1_var`` on every iteration, so when run
    concurrently with ``worker1`` the printed values depend on timing.
    """
    for step in range(iterations):
        print('Worker2:', namespace.worker1_var * step)
print('Multiprocessing with namespace:')

# The Manager starts a helper Python process that owns shared objects
# and proxies access to them from the other processes.
manager = Manager()

# A Namespace proxy behaves like a plain object: attributes assigned on
# it become visible to every process that holds the proxy.
ns = manager.Namespace()
ns.worker1_var = 0  # seed the shared variable before the workers start

# Hand the namespace proxy to both worker functions.
proc1 = Process(target=worker1, args=(ns, 10))
proc2 = Process(target=worker2, args=(ns, 10))

proc1.start()
proc2.start()

# Block until both workers have finished.
proc1.join()
proc2.join()
print('\nMultiprocessing with queues:')
# Queues suit job distribution; for configuration or global-state style
# sharing the namespace (above) is the author's preference.
def generator(queue):
    """Push the numbers 0 through 9 onto the queue, in order."""
    for item in range(10):
        queue.put(item)
def worker(queue):
    """Consume items from the queue forever, printing each one.

    Never returns on its own; the caller is expected to terminate the
    process running it.
    """
    while True:
        print('Worker:', queue.get())
# The manager can also serve Queue proxies; it handles everything
# important between the processes, such as locking.
queue = manager.Queue()

proc_gen = Process(target=generator, args=(queue,))
proc_work = Process(target=worker, args=(queue,))

proc_gen.start()
proc_work.start()

proc_gen.join()
# worker() loops forever, so the endless process must be killed explicitly.
proc_work.terminate()
- # --------------------------------------
- import threading
- import zmq
# Everything below would normally live in two separate scripts
# (e.g. first.py publishes, second.py subscribes).
print('\nZMQ Example in one process,\ncheating with threading to do this in one process.')
def zmq_publish(pub):
    """Publish the numbers 0..9 on the b'velocity' topic.

    ``pub`` only needs a ``send_multipart`` method taking a list of
    byte frames: [topic, payload].
    """
    for number in range(10):
        payload = str(number).encode()
        pub.send_multipart([b'velocity', payload])
# This side would live in second.py in a two-script setup.
def printer(sub):
    """Print (topic, value) pairs received on ``sub`` until value 9 arrives.

    ``sub`` only needs a ``recv_multipart`` method returning
    [topic_bytes, payload_bytes].
    """
    while True:
        topic, payload = sub.recv_multipart()
        number = int(payload)
        print(topic.decode(), number)
        if number == 9:
            break
context = zmq.Context()
publisher = context.socket(zmq.PUB)
subscriber = context.socket(zmq.SUB)

publisher.bind('tcp://127.0.0.1:5090')
subscriber.connect('tcp://127.0.0.1:5090')
subscriber.subscribe(b'velocity')

# Give the subscriber a little time to connect; a PUB socket silently
# drops messages sent before any subscriber is attached (the classic
# "slow joiner" problem), and that data would be lost.
time.sleep(1)

# One big benefit of zmq: the order of connection doesn't matter.
# The publisher can already send while there is no subscriber yet, and
# vice versa — the connection is established in the background.
#
# Other zmq messaging patterns are available too:
#   Publish -> Subscribe
#   Request <-> Reply
#   Push -> Pull
thread = threading.Thread(target=zmq_publish, args=(publisher,))
thread.start()
# Obviously multiprocessing.Process has the same interface as
# threading.Thread; a thread is used here only to demonstrate everything
# in a single process. With multiprocessing this would not work, because
# the zmq context is not created inside the generator/worker process;
# with a thread it works thanks to shared memory.
printer(subscriber)

# BUGFIX: wait for the publisher thread to finish before tearing down
# the context — destroying it while the thread might still be sending
# on its socket is a race that can raise from inside zmq.
thread.join()

# Destroying the context closes every socket created from it.
context.destroy()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement