Advertisement
hivefans

kafka-sasl-thread-consumer.py

Jul 15th, 2021
1,122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.19 KB | None | 0 0
  1. import time
  2. from datetime import datetime
  3. from confluent_kafka import Consumer
  4. from threadpool import ThreadPool, makeRequests
  5.  
  6. class KafkaConsumerTool:
  7.     def __init__(self, broker, topic):
  8.         config = {
  9.             'bootstrap.servers': broker,
  10.             'session.timeout.ms': 30000,
  11.             'auto.offset.reset': 'earliest',
  12.             'api.version.request': False,
  13.             'broker.version.fallback': '2.6.0',
  14.             'group.id': 'mini-spider',
  15.             'security.protocol': 'SASL_PLAINTEXT',
  16.             'sasl.mechanisms': 'SCRAM-SHA-256',
  17.             'sasl.username': 'consumer',
  18.             'sasl.password': 'f29eded3'
  19.         }
  20.         self.consumer = Consumer(config)
  21.         self.topic = topic
  22.  
  23.     def receive_msg(self, x):
  24.         self.consumer.subscribe([self.topic])
  25.         print(datetime.now())
  26.         while True:
  27.             msg = self.consumer.poll(timeout=30.0)
  28.             print(msg)
  29.  
  30.  
  31. if __name__ == '__main__':
  32.     thread_num = 10
  33.     consumer = KafkaConsumerTool(broker, topic)
  34.     pool = ThreadPool(thread_num)
  35.     for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
  36.         pool.putRequest(r)
  37.     pool.wait()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement