Advertisement
daiman

Faststream

Apr 24th, 2025
228
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.03 KB | Software | 0 0
  1. from faststream import FastStream, Logger
  2. from fastapi.concurrency import run_in_threadpool
  3. from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue
  4.  
  5. from ..common.app import App
  6. from ..common.settings import rabbit_settings
  7.  
  8.  
  9. broker = RabbitBroker(
  10.     url=rabbit_settings.RABBITMQ_DEFAULT_URL.unicode_string(),
  11. )
  12. consumer_app = FastStream(broker=broker)
  13.  
  14. tasks_exch = RabbitExchange(
  15.     name="",
  16.     auto_delete=True,
  17. )
  18.  
  19. queue = RabbitQueue(
  20.     name="tasks",
  21.     durable=True,
  22.     passive=True,
  23.     auto_delete=True
  24. )
  25.  
  26. def executor(
  27.     data: dict,
  28.     logger: Logger
  29. ):
  30.     app = App.get()
  31.     result = app.async_task_manager.execute_task(
  32.         async_task_id=data["async_task"]["id"],
  33.     )
  34.     logger.info(f"Task {result.id} executed with status {result.status}")
  35.  
  36.  
  37. @broker.subscriber(
  38.     queue=queue,
  39.     exchange=tasks_exch,
  40. )
  41. async def handle_message(
  42.     data: dict,
  43.     logger: Logger,
  44. ):
  45.     await run_in_threadpool(
  46.         executor,
  47.         data,
  48.         logger,
  49.     )
  50.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement