Advertisement
DeaD_EyE

Working sentinel example with Thread and Process

Aug 4th, 2019
303
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.19 KB | None | 0 0
  1. from random import random
  2. import time
  3. import threading
  4. import multiprocessing
  5. import queue
  6.  
  7.  
  8. class Sentinel:
  9.     def __init__(self, name):
  10.         self._name = name
  11.     def __eq__(self, other):
  12.         if not hasattr(other, '_name'):
  13.             return False
  14.         return self._name == other._name
  15.     def __repr__(self):
  16.         return f'{self._name} sentinel'
  17.  
  18.  
  19. END_THREAD = Sentinel('END_THREAD')
  20. END_PROCESS = Sentinel('END_PROCESS')
  21. FINISHED = Sentinel('FINISHED')
  22.  
  23.  
  24. def random_sleep():
  25.     t = random()
  26.     time.sleep(t)
  27.  
  28.  
  29. def thread_worker(in_q, out_q):
  30.     for element in iter(in_q.get, END_THREAD):
  31.         random_sleep()
  32.         print(f'Thread Worker put: {element}')
  33.         out_q.put(element)
  34.     print('Thread worker finished')
  35.     out_q.put(FINISHED)
  36.  
  37.  
  38. def process_worker(in_q, out_q):
  39.     for element in iter(in_q.get, END_PROCESS):
  40.         random_sleep()
  41.         print(f'Process Worker put: {element}')
  42.         out_q.put(element)
  43.     print('Process worker finished')
  44.     out_q.put(FINISHED)
  45.  
  46.  
  47. def process_results(in_q):
  48.     for element in iter(in_q.get, FINISHED):
  49.         random_sleep()
  50.         print('Processing results', element)
  51.     print('Processing results finished')
  52.  
  53.  
  54. thread_q = queue.Queue()
  55. process_q = multiprocessing.Queue()
  56. thread_results_q = queue.Queue()
  57. process_results_q = multiprocessing.Queue()
  58.  
  59. # start all threads
  60. threading.Thread(target=thread_worker, args=[thread_q, thread_results_q]).start()
  61. multiprocessing.Process(target=process_worker, args=[process_q, process_results_q]).start()
  62.  
  63. # start two consumers, one as process one as thread
  64. multiprocessing.Process(target=process_results, args=[process_results_q]).start()
  65. threading.Thread(target=process_results, args=[thread_results_q]).start()
  66.  
  67.  
  68. # put tasks in the queue
  69. [thread_q.put(f'Thread Task {n}') for n in range(1,6)]
  70. [process_q.put(f'Process Task {n}') for n in range(10,16)]
  71.  
  72. # thread_q.put(END_PROCESS) # wrong sentinel
  73. # process_q.put(END_THREAD) # wrong sentinel
  74.  
  75. # put stop sentinels into the worker queues
  76. thread_q.put(END_THREAD) # right sentinel
  77. process_q.put(END_PROCESS) # right sentinel
  78.  
  79. # they emit a FINISHED sentinel to the consumer threads
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement