Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from faststream import FastStream, Logger
- from fastapi.concurrency import run_in_threadpool
- from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue
- from ..common.app import App
- from ..common.settings import rabbit_settings
# Broker connection. The URL is read from the project settings object and
# converted to a plain string with ``unicode_string()``.
broker = RabbitBroker(url=rabbit_settings.RABBITMQ_DEFAULT_URL.unicode_string())

# FastStream application that owns the broker above.
consumer_app = FastStream(broker=broker)

# Exchange the subscriber below is bound to.
# NOTE(review): an empty exchange name conventionally refers to the AMQP
# default exchange -- confirm that ``auto_delete`` has any effect here.
tasks_exch = RabbitExchange(auto_delete=True, name="")

# Queue the subscriber below consumes from.
# NOTE(review): ``passive=True`` is combined with ``durable`` and
# ``auto_delete`` -- confirm this combination is intentional.
queue = RabbitQueue(name="tasks", auto_delete=True, passive=True, durable=True)
def executor(data: dict, logger: Logger):
    """Execute the async task referenced by ``data`` and log its outcome.

    This is a plain synchronous function; ``handle_message`` runs it through
    ``run_in_threadpool``.

    Args:
        data: Message payload. The task identifier is read from
            ``data["async_task"]["id"]``.
        logger: Logger used to report the id and status of the result.

    Raises:
        KeyError: If ``data`` is a plain dict lacking the ``"async_task"``
            key or its nested ``"id"`` key.
    """
    # Resolve the bound method first, then the id, then call -- the same
    # evaluation order as a single chained call expression.
    execute_task = App.get().async_task_manager.execute_task
    task_id = data["async_task"]["id"]
    result = execute_task(async_task_id=task_id)
    logger.info(f"Task {result.id} executed with status {result.status}")
@broker.subscriber(queue=queue, exchange=tasks_exch)
async def handle_message(data: dict, logger: Logger):
    """Handle one message from ``queue`` by running ``executor`` off-loop.

    The synchronous ``executor`` is awaited through ``run_in_threadpool``
    with the same ``data`` and ``logger`` this handler received. Its return
    value is discarded.

    Args:
        data: Message payload, forwarded unchanged to ``executor``.
        logger: Logger forwarded unchanged to ``executor``.
    """
    await run_in_threadpool(executor, data, logger)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement