import multiprocessing
import os
import time
import random
import functools
# This program is a toy example of a fanout chain where the output of one
# function is the input of the next:
#
#   FuncA --> (multiple) FuncB's --> (multiple) FuncC's --> (multiple) FuncD's
#
# It's particularly interesting because the number of tasks is not known in
# advance: each function generates a random number of results. Functions B,
# C, and D are all parallelized and can run at the same time. A, B, and C
# create results, and D is a terminal function that prints the results.
#
# The program should run until all tasks are complete, but I'm not sure how
# to determine that efficiently.
# Global settings
WORKERS = 4
MIN_RESULTS_PER_FUNCTION = 2
MAX_RESULTS_PER_FUNCTION = 8
MINIMUM_WORK_TIME = 0.05
MAXIMUM_WORK_TIME = 1.00
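# Rough scale with these settings: each function emits (2 + 8) / 2 = 5
# results on average, so a single run fans out to roughly 5 funcB, 25 funcC,
# and 125 funcD calls (back-of-envelope expectations, not guarantees).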
class TaskCounter:
    ''' Helper class to track the tasks running in parallel '''
    # NOTE: creating the Manager at import time like this relies on the
    # 'fork' start method (the Linux default), where child processes inherit
    # these proxies. Under 'spawn', every process would build its own Manager.
    MANAGER = multiprocessing.Manager()
    TASKS = MANAGER.dict()
    LOCK = MANAGER.Lock()

    @staticmethod
    def track_task(func):
        ''' Decorator: count func as running while it executes '''
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            with TaskCounter.LOCK:
                TaskCounter.TASKS[func.__name__] = TaskCounter.TASKS.get(func.__name__, 0) + 1
            try:
                return func(*args, **kwargs)
            finally:
                # decrement even if func raises, otherwise the counter never
                # returns to zero and the end condition can never be met
                with TaskCounter.LOCK:
                    TaskCounter.TASKS[func.__name__] = TaskCounter.TASKS.get(func.__name__, 0) - 1
        return wrapped

    @staticmethod
    def tasks_running_string():
        with TaskCounter.LOCK:
            return [f"{key}: {value}" for key, value in TaskCounter.TASKS.items()]

    @staticmethod
    def all_tasks_complete() -> bool:
        with TaskCounter.LOCK:
            return all(v == 0 for v in TaskCounter.TASKS.values())
def timer(func):
    """Print the runtime of the decorated function"""
    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
        return value
    return wrapper_timer
def do_work():
    # simulate a long operation and return the last two digits of the process ID
    time.sleep(random.uniform(MINIMUM_WORK_TIME, MAXIMUM_WORK_TIME))
    return str(os.getpid())[-2:]
@TaskCounter.track_task
def funcA(queue):
    print(f"funcA: {TaskCounter.tasks_running_string()}")
    number_of_results_to_generate = random.randint(MIN_RESULTS_PER_FUNCTION, MAX_RESULTS_PER_FUNCTION)
    for _ in range(number_of_results_to_generate):
        result = do_work()
        queue.put(("B", result))

@TaskCounter.track_task
def funcB(history, queue):
    print(f"funcB: {TaskCounter.tasks_running_string()}")
    number_of_results_to_generate = random.randint(MIN_RESULTS_PER_FUNCTION, MAX_RESULTS_PER_FUNCTION)
    for _ in range(number_of_results_to_generate):
        result = do_work()
        queue.put(("C", f"{history} -> {result}"))

@TaskCounter.track_task
def funcC(history, queue):
    print(f"funcC: {TaskCounter.tasks_running_string()}")
    number_of_results_to_generate = random.randint(MIN_RESULTS_PER_FUNCTION, MAX_RESULTS_PER_FUNCTION)
    for _ in range(number_of_results_to_generate):
        result = do_work()
        queue.put(("D", f"{history} -> {result}"))

def funcD(history):
    # note: not tracked by TaskCounter; it enqueues nothing, so a running
    # funcD can never create new work after the end condition fires
    print(f"funcD: {TaskCounter.tasks_running_string()} -- Final path: {history}")
# ---- This is the part where I'm stuck. Is there a better way? ----
def end_condition(q):
    ''' Try to determine if all tasks are complete. '''
    if q.empty() and TaskCounter.all_tasks_complete():
        print(f"Consumer #{os.getpid()}: End condition met once...")
        # unnecessary sleep? give the processes enough time to finish. There
        # could be a race condition between the queue being populated and the
        # task counter being updated.
        time.sleep(MAXIMUM_WORK_TIME)
        return q.empty() and TaskCounter.all_tasks_complete()
    return False
# -----------------------------------------
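# A race-free alternative (hedged sketch; PENDING, put_task, and finish_task
# are illustrative names and are not called anywhere in this program): keep
# one shared count of tasks "in flight" (queued + running). Increment it
# BEFORE putting an item on the queue, and decrement it only AFTER a task has
# finished enqueuing all of its children. Children are counted before their
# parent is uncounted, so the counter can only reach zero when the whole
# chain is truly done -- no sleep or double-check needed. Like the Manager
# above, the shared Value assumes the 'fork' start method.
PENDING = multiprocessing.Value('i', 0)

def put_task(q, item):
    ''' Sketch: funcA/B/C would call this instead of queue.put() directly '''
    with PENDING.get_lock():
        PENDING.value += 1  # count the child before it becomes visible
    q.put(item)

def finish_task(q):
    ''' Sketch: worker() would call this in its finally block '''
    with PENDING.get_lock():
        PENDING.value -= 1
        if PENDING.value == 0:  # nothing queued and nothing running
            shutdown(q)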
def shutdown(q):
    ''' Send a poison pill to all workers '''
    for _ in range(WORKERS):
        q.put(None)
def worker(q):
    print(f"Consumer #{os.getpid()}: Alive")
    while True:
        item = q.get(block=True)
        if item is None:
            break
        try:
            if item[0] == "A":
                funcA(q)
            elif item[0] == "B":
                funcB(item[1], q)
            elif item[0] == "C":
                funcC(item[1], q)
            elif item[0] == "D":
                funcD(item[1])
            else:
                raise ValueError(f"Unknown item type: {item[0]}")
        except Exception as e:
            print(e)
        finally:
            # ---- This is the part where I'm stuck ----
            # Each worker needs to check if all tasks are complete,
            # but the end conditions are not clear to me.
            if end_condition(q):
                print(f"Consumer #{os.getpid()}: All tasks complete")
                shutdown(q)
    print(f"Consumer #{os.getpid()}: Exiting")
def dump_queue(q):
    final = []
    while not q.empty():
        final.append(q.get())
    print(f"Final queue: {final}")
@timer
def main():
    queue = multiprocessing.Queue()
    queue.put(("A", None))  # load the first call into the queue
    # The Pool never receives tasks; each process just runs worker() as its
    # initializer and exits when that returns.
    pool = multiprocessing.Pool(processes=WORKERS,
                                initializer=worker,
                                initargs=(queue,))
    # prevent adding anything more to the worker pool
    # and wait for all workers to finish
    pool.close()
    pool.join()
    # See if there's anything left in the queue
    dump_queue(queue)
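# ---- Standalone alternative: multiprocessing.JoinableQueue (hedged sketch) ----
# joinable_worker and joinable_main are hypothetical names, not part of the
# program above; call joinable_main() instead of main() to try this route.
# JoinableQueue counts unfinished items: put() increments, task_done()
# decrements, and join() blocks until the count reaches zero. Each worker
# enqueues a task's children BEFORE calling task_done() on the task itself,
# so the count can only hit zero once the whole fanout chain is complete --
# then the main process sends the poison pills.
def joinable_worker(q):
    while True:
        item = q.get()
        if item is None:
            break
        try:
            if item[0] == "A":
                funcA(q)
            elif item[0] == "B":
                funcB(item[1], q)
            elif item[0] == "C":
                funcC(item[1], q)
            elif item[0] == "D":
                funcD(item[1])
        finally:
            q.task_done()  # only after this task's children are queued

def joinable_main():
    q = multiprocessing.JoinableQueue()
    q.put(("A", None))
    procs = [multiprocessing.Process(target=joinable_worker, args=(q,))
             for _ in range(WORKERS)]
    for p in procs:
        p.start()
    q.join()  # returns once every put() has a matching task_done()
    for _ in range(WORKERS):
        q.put(None)  # now it's safe to stop the workers
    for p in procs:
        p.join()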
if __name__ == '__main__':
    main()