Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- from datetime import datetime
- from confluent_kafka import Consumer
- from threadpool import ThreadPool, makeRequests
- class KafkaConsumerTool:
- def __init__(self, broker, topic):
- config = {
- 'bootstrap.servers': broker,
- 'session.timeout.ms': 30000,
- 'auto.offset.reset': 'earliest',
- 'api.version.request': False,
- 'broker.version.fallback': '2.6.0',
- 'group.id': 'mini-spider',
- 'security.protocol': 'SASL_PLAINTEXT',
- 'sasl.mechanisms': 'SCRAM-SHA-256',
- 'sasl.username': 'consumer',
- 'sasl.password': 'f29eded3'
- }
- self.consumer = Consumer(config)
- self.topic = topic
- def receive_msg(self, x):
- self.consumer.subscribe([self.topic])
- print(datetime.now())
- while True:
- msg = self.consumer.poll(timeout=30.0)
- print(msg)
- if __name__ == '__main__':
- thread_num = 10
- consumer = KafkaConsumerTool(broker, topic)
- pool = ThreadPool(thread_num)
- for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
- pool.putRequest(r)
- pool.wait()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement