Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Пример создания недемонических процессов в pathos.multiprocessing.ProcessPool"""
- import logging
- import multiprocess.context
- import pathos.multiprocessing as mp
- _clazz_cache = {}
- class NoDaemonMixin:
- """Меняет свойство daemon на константу False"""
- def _get_daemon(self):
- logging.getLogger("nodaemon.class").debug("NoDaemon._get_daemon()")
- return False
- def _set_daemon(self, value):
- logging.getLogger("nodaemon.class").debug(
- "NoDaemon._set_daemon(%s) - ignored", value
- )
- daemon = property(_get_daemon, _set_daemon)
- if hasattr(multiprocess.context, "SpawnProcess"):
- class NoDaemonSpawnProcess(NoDaemonMixin, multiprocess.context.SpawnProcess):
- """Недемоническая обёртка вокруг `multiprocess.context.SpawnProcess`"""
- pass # pylint: disable=unnecessary-pass
- _clazz_cache[multiprocess.context.SpawnProcess] = NoDaemonSpawnProcess
- if hasattr(multiprocess.context, "ForkProcess"):
- class NoDaemonForkProcess(NoDaemonMixin, multiprocess.context.ForkProcess):
- """Недемоническая обёртка вокруг `multiprocess.context.ForkProcess`"""
- pass # pylint: disable=unnecessary-pass
- _clazz_cache[multiprocess.context.ForkProcess] = NoDaemonForkProcess
- if hasattr(multiprocess.context, "ForkServerProcess"):
- class NoDaemonForkServerProcess(
- NoDaemonMixin, multiprocess.context.ForkServerProcess
- ):
- """Недемоническая обёртка вокруг `multiprocess.context.ForkServerProcess`"""
- pass # pylint: disable=unnecessary-pass
- _clazz_cache[multiprocess.context.ForkServerProcess] = NoDaemonForkServerProcess
- from multiprocess.pool import (
- Pool as PythonPool,
- ) # pylint: disable=wrong-import-position, wrong-import-order
- DefaultProcessFactory = PythonPool.Process
- def _process(_, ctx, *args, **kwargs):
- """Фабрика недемонических процессов."""
- log = logging.getLogger("nodaemon.factory")
- log.debug("MyPool.Process(): ctx=%s, args=%s, kwargs=%s", ctx, args, kwargs)
- if ctx.Process in _clazz_cache:
- wrap_clz = _clazz_cache[ctx.Process]
- else:
- raise TypeError(f"Unsupported process class: {ctx.Process}")
- try:
- log.debug("Creating process instance: process class %s", wrap_clz)
- return wrap_clz(*args, **kwargs)
- finally:
- log.debug("MyPool.Process() done")
- class MyProcessingPool(mp.ProcessingPool):
- """Пул с недемоническими процессами.
- Перегружает метод `_serve` для установки фабрики недемонических процессов."""
- def _serve(self, nodes=None):
- log = logging.getLogger("nodaemon.pool")
- log.debug("MyProcessingPool._serve(nodes=%s)", nodes)
- restore = PythonPool.Process is not _process
- PythonPool.Process = _process
- try:
- return super(MyProcessingPool, self)._serve(nodes)
- finally:
- if restore:
- log.debug("Restoring PythonPool.Process factory")
- PythonPool.Process = DefaultProcessFactory
- def restart(self, force = False):
- log = logging.getLogger("nodaemon.pool")
- log.debug("MyProcessingPool.restart(force=%s)", force)
- restore = PythonPool.Process is not _process
- PythonPool.Process = _process
- try:
- return super().restart(force=force)
- finally:
- if restore:
- log.debug("Restoring PythonPool.Process factory")
- PythonPool.Process = DefaultProcessFactory
- def sub_worker(x):
- """Работник для вложенного пула."""
- print(f"sub_worker({x=})")
- return x * 2
- def worker(max_num):
- """Работник для основного пула."""
- print(f"worker({max_num=})")
- sub_pool = MyProcessingPool()
- try:
- return sub_pool.map(sub_worker, range(max_num))
- finally:
- sub_pool.close()
- sub_pool.join()
- def main():
- """Тестовый пример для пула с недемоническими процессами."""
- # logging.basicConfig(level=logging.DEBUG)
- # logging.getLogger("nodaemon").setLevel(logging.INFO)
- pool = MyProcessingPool()
- result = pool.map(worker, [1, 2, 3, 4])
- print(result)
- pool.close()
- pool.join()
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement