1、官方网站也有配置:
https://flume.apache.org/FlumeUserGuide.html#kafka-source
2、clodera 官方配置
https://www.cloudera.com/documentation/kafka/2-0-x/topics/kafka_flume.html
1\
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = zk01.example.com:
tier1.sources.source1.topic = weblogs
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity =
tier1.channels.channel1.transactionCapacity = tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval =
tier1.sinks.sink1.hdfs.rollSize =
tier1.sinks.sink1.hdfs.rollCount =
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
2\
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1 tier1.sources.source1.type = exec
tier1.sources.source1.command = /usr/bin/vmstat
tier1.sources.source1.channels = channel1 tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity =
tier1.channels.channel1.transactionCapacity = tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = sink1
tier1.sinks.sink1.brokerList = kafka01.example.com:,kafka02.example.com:
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize =
3\
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1tier1.sources.source1.type = exec
tier1.sources.source1.command = /usr/bin/vmstat
tier1.sources.source1.channels = channel1tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.capacity =
tier1.channels.channel1.transactionCapacity =
tier1.channels.channel1.brokerList = kafka02.example.com:,kafka03.example.com:
tier1.channels.channel1.topic = channel2
tier1.channels.channel1.zookeeperConnect = zk01.example.com:
tier1.channels.channel1.parseAsFlumeEvent = truetier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
tier1.sinks.sink1.hdfs.rollInterval =
tier1.sinks.sink1.hdfs.rollSize =
tier1.sinks.sink1.hdfs.rollCount =
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
总结:
这样配置以后我们可以通过监控,或者消费kafka console打印效果
/bin/kafka-console-consumer.sh –bootstrap-server master:9092 –topic topicTest2 –from-beginning