Mochinov

Untitled

Feb 2nd, 2024
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.87 KB | None | 0 0
  1.  
  2. class TaskManager:
  3. def __init__(self, consumer):
  4. # Initialize the TaskManager with a Kafka consumer
  5. self.consumer = consumer
  6. self.message_vars = {} # Placeholder for message variables
  7. self.message_templates = {} # Placeholder for message templates
  8. self.message_types = {} # Placeholder for message types
  9.  
  10. async def run(self, context):
  11. """Run method to continuously process messages from the Kafka consumer.
  12.  
  13. Args:
  14. context (dict): Context information for message processing.
  15. """
  16. from aiokafka import TopicPartition
  17.  
  18. # Start the Kafka consumer
  19. await self.consumer.start()
  20.  
  21. print(f"{' Start consuming ':=^50}")
  22. # Continuously process messages
  23. while True:
  24. # Get the next message from the consumer
  25. message = await self.consumer.getone()
  26.  
  27. # Create a TopicPartition object for the message
  28. tp = TopicPartition(message.topic, message.partition)
  29.  
  30. print(f"{' Start consuming ':=^50}")
  31. try:
  32. # Check if the message has no value
  33. if not message.value:
  34. # Commit the offset and continue to the next message
  35. await self.consumer.commit({tp: message.offset + 1})
  36. continue
  37.  
  38. # Create an EventSourcing instance and start processing the event
  39. event_sourcing = EventSourcing()
  40. await event_sourcing.start_event(json.loads(message.value.decode("utf-8")))
  41.  
  42. # Commit the offset after successfully processing the message
  43. await self.consumer.commit({tp: message.offset + 1})
  44.  
  45. except Exception as ex:
  46. # Print any exceptions that occur during message processing
  47. print(ex)
  48.  
Add Comment
Please, Sign In to add comment