Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# pip install confluent-kafka
#
# Minimal Kafka consumer script: subscribes to a single topic over SASL_SSL
# and prints every message it receives, together with a running count,
# until interrupted with Ctrl-C.
from confluent_kafka import Consumer, KafkaError

topic = 'CLIENT_NAME-tests'

c = Consumer({
    'bootstrap.servers': 'server.name:9094',
    'group.id': 'mygroup',
    # set to False to make sure you do not update offsets
    'enable.auto.commit': False,
    #'enable.auto.commit': True,
    # set to earliest to start --from-beginning
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': 'ca-cert',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'CLIENT_USERNAME',
    'sasl.password': 'XXXX',
})
c.subscribe([topic])

# Number of messages successfully received so far.
n = 0

try:
    while True:
        # Wait up to one second for a message. A zero timeout returns
        # immediately, which turns this loop into a CPU-burning busy-wait
        # whenever the topic is idle.
        msg = c.poll(1.0)
        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        if msg.error():
            # error() must be *called*; formatting the bare attribute would
            # print the bound method object rather than the actual error.
            print("Consumer error: {}".format(msg.error()))
            continue
        n += 1
        print("message: {} | {}".format(n, msg.value()))
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this otherwise endless loop.
    pass
finally:
    # Always close the consumer so it leaves the group cleanly; without the
    # try/finally this call was unreachable after the infinite loop.
    c.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement