Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class TaskManager:
    """Consume messages from a Kafka consumer and pass each one to ``EventSourcing``.

    Attributes:
        consumer: The consumer object supplied by the caller.
        message_vars: Placeholder dict for message variables (starts empty).
        message_templates: Placeholder dict for message templates (starts empty).
        message_types: Placeholder dict for message types (starts empty).
    """

    def __init__(self, consumer):
        """Store the consumer and create the empty placeholder registries.

        Args:
            consumer: Kafka consumer object. ``run`` awaits its ``start``,
                ``getone``, ``commit`` and ``stop`` methods.
        """
        self.consumer = consumer
        self.message_vars = {}  # Placeholder for message variables
        self.message_templates = {}  # Placeholder for message templates
        self.message_types = {}  # Placeholder for message types

    async def run(self, context):
        """Continuously process messages from the Kafka consumer.

        Each message with a non-empty value is decoded as UTF-8 JSON and
        handed to ``EventSourcing().start_event``; its offset is committed
        afterwards. Messages without a value are committed without being
        processed. Exceptions raised while handling a message are printed and
        the loop moves on to the next message.

        The loop never ends on its own. Whenever it is left (an error raised
        by ``getone`` or task cancellation), the consumer is stopped before
        the exception propagates.

        Args:
            context (dict): Context information for message processing.
                Currently not read by this method.
        """
        from aiokafka import TopicPartition

        await self.consumer.start()
        print(f"{' Start consuming ':=^50}")
        try:
            while True:
                message = await self.consumer.getone()
                tp = TopicPartition(message.topic, message.partition)
                # NOTE(review): this repeats the start-up banner for every
                # message; kept as-is so the emitted output does not change.
                print(f"{' Start consuming ':=^50}")
                try:
                    if message.value:
                        payload = json.loads(message.value.decode("utf-8"))
                        await EventSourcing().start_event(payload)
                    # Commit the position of the *next* message to read. This
                    # is reached both for empty messages and after successful
                    # processing.
                    await self.consumer.commit({tp: message.offset + 1})
                except Exception as ex:
                    # Best-effort: report the failure and keep consuming. The
                    # failed message's offset is not committed here.
                    print(ex)
        finally:
            # Always release the consumer, otherwise it is left running when
            # the loop is aborted.
            await self.consumer.stop()
Add Comment
Please, Sign In to add comment