Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Flume agent "a1": Kafka source -> Kafka channel -> Kafka sink
# Agent component names
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# ======================= Built-in Kafka source (SASL/PLAIN consumer)
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = ip:9092
a1.sources.r1.kafka.topics = kafka0-10-0
a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.r1.kafka.consumer.sasl.mechanism = PLAIN
a1.sources.r1.kafka.consumer.group.id = groupid
# NOTE(review): credentials stored in plain text — prefer an external JAAS file
# or a secrets store; placeholders below must be replaced before deployment.
a1.sources.r1.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";
# Max number of messages read per batch from Kafka
a1.sources.r1.batchSize = 5000

# ======================= Source interceptor: strip Kafka metadata headers
# (timestamp/topic/offset/partition) from events before they enter the channel.
# NOTE(review): "remove_header" is not a stock Flume interceptor alias — confirm
# the custom interceptor class is on the Flume classpath.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = remove_header
a1.sources.r1.interceptors.i1.fromList = timestamp,topic,offset,partition

# Channel: events are buffered in a Kafka topic between source and sink.
# Other channel types exist (memory, file, jdbc, ...); Kafka is used here.
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# NOTE(review): port 9002 looks like a typo for the Kafka default 9092 — confirm
a1.channels.c1.kafka.bootstrap.servers = 127.0.0.1:9002
a1.channels.c1.kafka.topic = kafka-channel

# ======================= Sink: produce events to the target Kafka topic
# Uncomment the logger sink type to inspect events coming out of the channel:
# a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kafka2-1
# NOTE(review): same suspect port 9002 as the channel above — confirm
a1.sinks.k1.kafka.bootstrap.servers = 127.0.0.1:9002
a1.sinks.k1.kafka.flumeBatchSize = 1000
# acks=1: leader acknowledgement only (no full ISR durability guarantee)
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Bind the source and sink to the channel
# (a1.sources.r1.channels appeared twice in the original; deduplicated here)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement