Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from random import random
- import time
- import threading
- import multiprocessing
- import queue
- class Sentinel:
- def __init__(self, name):
- self._name = name
- def __eq__(self, other):
- if not hasattr(other, '_name'):
- return False
- return self._name == other._name
- def __repr__(self):
- return f'{self._name} sentinel'
- END_THREAD = Sentinel('END_THREAD')
- END_PROCESS = Sentinel('END_PROCESS')
- FINISHED = Sentinel('FINISHED')
- def random_sleep():
- t = random()
- time.sleep(t)
- def thread_worker(in_q, out_q):
- for element in iter(in_q.get, END_THREAD):
- random_sleep()
- print(f'Thread Worker put: {element}')
- out_q.put(element)
- print('Thread worker finished')
- out_q.put(FINISHED)
- def process_worker(in_q, out_q):
- for element in iter(in_q.get, END_PROCESS):
- random_sleep()
- print(f'Process Worker put: {element}')
- out_q.put(element)
- print('Process worker finished')
- out_q.put(FINISHED)
- def process_results(in_q):
- for element in iter(in_q.get, FINISHED):
- random_sleep()
- print('Processing results', element)
- print('Processing results finished')
- thread_q = queue.Queue()
- process_q = multiprocessing.Queue()
- thread_results_q = queue.Queue()
- process_results_q = multiprocessing.Queue()
- # start all threads
- threading.Thread(target=thread_worker, args=[thread_q, thread_results_q]).start()
- multiprocessing.Process(target=process_worker, args=[process_q, process_results_q]).start()
- # start two consumers, one as process one as thread
- multiprocessing.Process(target=process_results, args=[process_results_q]).start()
- threading.Thread(target=process_results, args=[thread_results_q]).start()
- # put tasks in the queue
- [thread_q.put(f'Thread Task {n}') for n in range(1,6)]
- [process_q.put(f'Process Task {n}') for n in range(10,16)]
- # thread_q.put(END_PROCESS) # wrong sentinel
- # process_q.put(END_THREAD) # wrong sentinel
- # put stop sentinels into the worker queues
- thread_q.put(END_THREAD) # right sentinel
- process_q.put(END_PROCESS) # right sentinel
- # they emit a FINISHED sentinel to the consumer threads
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement