Flume and Kafka Integration (1.6.0 + 0.8.2.1) 2015-07-03 17:00

Introduction

Starting with version 1.6.0, Flume officially ships with a Kafka sink, which makes it straightforward to send the data Flume collects on to Kafka.

As a message queue, Kafka can buffer the data collected by Flume. The reasons for placing Kafka between Flume and the downstream stream processing (such as Storm or Spark Streaming) are:

  1. It avoids data loss when Flume's collection rate does not match the backend's processing capacity.
  2. It ensures that data is not lost while the backend stream processing job restarts.

Flume Configuration

In the following example, Flume tails the nginx access log and sends it to a Kafka topic.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/app/blog/log/nginx_access.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mytopic1
a1.sinks.k1.brokerList = data01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • Run the Flume agent:
bin/flume-ng agent -c conf/ -f conf/tail_kafka.conf -n a1 -Dflume.root.logger=INFO,console
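
To confirm that events are actually reaching Kafka, the topic can be read with the console consumer that ships with Kafka 0.8.2.1. A minimal check, assuming ZooKeeper runs on data01:2181 (adjust the connect string to your environment):

# Print every event in mytopic1 from the beginning
bin/kafka-console-consumer.sh --zookeeper data01:2181 --topic mytopic1 --from-beginning

New lines appended to nginx_access.log should then appear in the consumer output.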

Kafka Side

For the Kafka side configuration, refer to the post "Kafka与Spark Streaming集成" (Kafka and Spark Streaming Integration).
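
If automatic topic creation is disabled on the broker, mytopic1 needs to exist before the Flume agent starts. A minimal sketch with the Kafka 0.8.2.1 command-line tools, again assuming ZooKeeper is at data01:2181:

# Create the topic that the Flume Kafka sink writes to
bin/kafka-topics.sh --create --zookeeper data01:2181 --replication-factor 1 --partitions 1 --topic mytopic1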

References

  1. Kafka Sink