yiorgos

confluent-kafka-producer

Mar 26th, 2021 (edited)
104
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.03 KB | None | 0 0
  1. # Simple Kafka Producer, using confluent-kafka-python (librdkafka)
  2.  
  3. # Sample dockerfile to run this producer
  4. #
  5. # FROM python:3
  6. # WORKDIR /usr/src/app
  7. # RUN pip install --no-cache-dir  confluent-kafka==1.0.0
  8. # COPY ca-cert .
  9. # COPY rd.py .
  10. # CMD ["python", "./rd.py"]
  11.  
  12. from confluent_kafka import Producer
  13.  
  14. def delivery_report(err, msg):
  15.   if err is not None:
  16.     print("Message delivery failed: {}".format(err))
  17.   else:
  18.     print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
  19.  
  20. topic = 'CLIENT_NAME-tests'
  21.  
  22. p = Producer({
  23.   'bootstrap.servers': 'server.name:9094',
  24.   'security.protocol': 'SASL_SSL',
  25.   'ssl.ca.location': 'ca-cert',
  26.   'sasl.mechanisms': 'PLAIN',
  27.   'sasl.username': 'USERNAME',
  28.   'sasl.password': 'XXX_YYY_ZZZ',
  29.   'acks': 1,
  30.   'compression.type': 'lz4',
  31.   })
  32.  
  33. for i in range(1000000):
  34.   message = "Event {}".format(i)
  35.   p.produce(topic, message, on_delivery=delivery_report)
  36.   print("{} Queue len: {}".format(message, len(p)))
  37.   if len(p) > 90000:
  38.     p.poll(0)
  39.  
  40. p.flush()
  41.  
Add Comment
Please, Sign In to add comment