Advertisement
pakuula

Пул недемонических процессов для pathos.multiprocessing

Feb 17th, 2025 (edited)
129
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.59 KB | Source Code | 0 0
  1. """Пример создания недемонических процессов в pathos.multiprocessing.ProcessPool"""
  2.  
  3. import logging
  4.  
  5. import multiprocess.context
  6.  
  7. import pathos.multiprocessing as mp
  8.  
  9. _clazz_cache = {}
  10.  
  11.  
  12. class NoDaemonMixin:
  13.     """Меняет свойство daemon на константу False"""
  14.  
  15.     def _get_daemon(self):
  16.         logging.getLogger("nodaemon.class").debug("NoDaemon._get_daemon()")
  17.         return False
  18.  
  19.     def _set_daemon(self, value):
  20.         logging.getLogger("nodaemon.class").debug(
  21.             "NoDaemon._set_daemon(%s) - ignored", value
  22.         )
  23.  
  24.     daemon = property(_get_daemon, _set_daemon)
  25.  
  26.  
  27. if hasattr(multiprocess.context, "SpawnProcess"):
  28.  
  29.     class NoDaemonSpawnProcess(NoDaemonMixin, multiprocess.context.SpawnProcess):
  30.         """Недемоническая обёртка вокруг `multiprocess.context.SpawnProcess`"""
  31.  
  32.         pass  # pylint: disable=unnecessary-pass
  33.  
  34.     _clazz_cache[multiprocess.context.SpawnProcess] = NoDaemonSpawnProcess
  35.  
  36. if hasattr(multiprocess.context, "ForkProcess"):
  37.  
  38.     class NoDaemonForkProcess(NoDaemonMixin, multiprocess.context.ForkProcess):
  39.         """Недемоническая обёртка вокруг `multiprocess.context.ForkProcess`"""
  40.  
  41.         pass  # pylint: disable=unnecessary-pass
  42.  
  43.     _clazz_cache[multiprocess.context.ForkProcess] = NoDaemonForkProcess
  44.  
  45. if hasattr(multiprocess.context, "ForkServerProcess"):
  46.  
  47.     class NoDaemonForkServerProcess(
  48.         NoDaemonMixin, multiprocess.context.ForkServerProcess
  49.     ):
  50.         """Недемоническая обёртка вокруг `multiprocess.context.ForkServerProcess`"""
  51.  
  52.         pass  # pylint: disable=unnecessary-pass
  53.  
  54.     _clazz_cache[multiprocess.context.ForkServerProcess] = NoDaemonForkServerProcess
  55.  
  56. from multiprocess.pool import (
  57.     Pool as PythonPool,
  58. )  # pylint: disable=wrong-import-position, wrong-import-order
  59.  
  60. DefaultProcessFactory = PythonPool.Process
  61.  
  62.  
  63. def _process(_, ctx, *args, **kwargs):
  64.     """Фабрика недемонических процессов."""
  65.     log = logging.getLogger("nodaemon.factory")
  66.     log.debug("MyPool.Process(): ctx=%s, args=%s, kwargs=%s", ctx, args, kwargs)
  67.     if ctx.Process in _clazz_cache:
  68.         wrap_clz = _clazz_cache[ctx.Process]
  69.     else:
  70.         raise TypeError(f"Unsupported process class: {ctx.Process}")
  71.  
  72.     try:
  73.         log.debug("Creating process instance: process class %s", wrap_clz)
  74.         return wrap_clz(*args, **kwargs)
  75.     finally:
  76.         log.debug("MyPool.Process() done")
  77.  
  78.  
  79. class MyProcessingPool(mp.ProcessingPool):
  80.     """Пул с недемоническими процессами.
  81.  
  82.    Перегружает метод `_serve` для установки фабрики недемонических процессов."""
  83.  
  84.     def _serve(self, nodes=None):
  85.         log = logging.getLogger("nodaemon.pool")
  86.  
  87.         log.debug("MyProcessingPool._serve(nodes=%s)", nodes)
  88.  
  89.         restore = PythonPool.Process is not _process
  90.         PythonPool.Process = _process
  91.         try:
  92.             return super(MyProcessingPool, self)._serve(nodes)
  93.         finally:
  94.             if restore:
  95.                 log.debug("Restoring PythonPool.Process factory")
  96.                 PythonPool.Process = DefaultProcessFactory
  97.  
  98.     def restart(self, force = False):
  99.         log = logging.getLogger("nodaemon.pool")
  100.  
  101.         log.debug("MyProcessingPool.restart(force=%s)", force)
  102.  
  103.         restore = PythonPool.Process is not _process
  104.         PythonPool.Process = _process
  105.         try:
  106.             return super().restart(force=force)
  107.         finally:
  108.             if restore:
  109.                 log.debug("Restoring PythonPool.Process factory")
  110.                 PythonPool.Process = DefaultProcessFactory
  111.  
  112. def sub_worker(x):
  113.     """Работник для вложенного пула."""
  114.     print(f"sub_worker({x=})")
  115.     return x * 2
  116.  
  117.  
  118. def worker(max_num):
  119.     """Работник для основного пула."""
  120.     print(f"worker({max_num=})")
  121.  
  122.     sub_pool = MyProcessingPool()
  123.     try:
  124.         return sub_pool.map(sub_worker, range(max_num))
  125.     finally:
  126.         sub_pool.close()
  127.         sub_pool.join()
  128.  
  129.  
  130. def main():
  131.     """Тестовый пример для пула с недемоническими процессами."""
  132.     # logging.basicConfig(level=logging.DEBUG)
  133.     # logging.getLogger("nodaemon").setLevel(logging.INFO)
  134.     pool = MyProcessingPool()
  135.     result = pool.map(worker, [1, 2, 3, 4])
  136.     print(result)
  137.     pool.close()
  138.     pool.join()
  139.  
  140.  
  141. if __name__ == "__main__":
  142.     main()
  143.  
Tags: python
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement