Integrating Kafka with Spark Streaming (0.8.2.1 + 1.3.1) 2015-07-02 21:00

Introduction

Create a Kafka topic, read the contents of a log file with the tail command, and pipe them into the topic from the Linux command line. Spark Streaming then acts as a consumer of the topic, receiving the messages and processing them.

Kafka Operations

Kafka is installed in cluster mode. The ZooKeeper ensemble used is data01:2181, data02:2181 and data03:2181.

  • Create a topic

Once created, a topic is permanent: it remains valid even after Kafka is restarted.

Run on any one of the nodes:

bin/kafka-topics.sh --create --zookeeper data01:2181 --replication-factor 2 --partitions 1 --topic mytopic1
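
As a quick check with Kafka's own tooling, the topic and its partition assignment can be described from any node:

bin/kafka-topics.sh --describe --zookeeper data01:2181 --topic mytopic1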
  • Test producing data

Read the nginx log and send it to Kafka:

tail -n 0 -f /opt/app/blog/log/nginx_access.log | bin/kafka-console-producer.sh --broker-list data01:9092 --topic mytopic1
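
If the nginx log happens to be idle, a line can also be pushed by hand as a simple sanity check of the producer path:

echo "test message" | bin/kafka-console-producer.sh --broker-list data01:9092 --topic mytopic1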
  • Test consuming data

First check whether the data can be consumed with Kafka itself. If that works, switch to Spark Streaming as the consumer:

bin/kafka-console-consumer.sh --zookeeper data01:2181 --topic mytopic1 --from-beginning

Spark Streaming

  • Code
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    # three arguments: ZooKeeper quorum, topic name, HDFS output path
    if len(sys.argv) != 4:
        print("Usage: kafka_wordcount.py <zk> <topic> <output>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    zkQuorum, topic, outputPath = sys.argv[1:]
    # receiver-based Kafka stream: one consumer thread for the topic, connected via ZooKeeper
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])     # keep only the message value, drop the key
    lines.saveAsTextFiles(outputPath)   # one output directory per batch: <outputPath>-<timestamp>
    lines.pprint()                      # print the first records of each batch to the driver log
    #counts = lines.flatMap(lambda line: line.split(" ")) \
    #    .map(lambda word: (word, 1)) \
    #    .reduceByKey(lambda a, b: a+b)
    #counts.pprint()

    ssc.start()
    ssc.awaitTermination()
  • Run Spark Streaming
spark-submit \
  --master yarn-cluster \
  ./kafka.py data01:2181 mytopic1 hdfs://myns1/spark/stream1
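
Two notes on submission, following the Spark 1.3 Kafka integration guide. For Python applications the spark-streaming-kafka classes have to be shipped with the job, typically by passing the Kafka assembly jar with --jars (the jar file name below is an assumption; use the assembly matching the installed Spark and Scala versions). Also, at this Spark version Python applications generally cannot run in yarn-cluster mode, so a yarn-client variant is sketched here:

# the assembly jar location is assumed; point --jars at your local copy
spark-submit \
  --master yarn-client \
  --jars spark-streaming-kafka-assembly_2.10-1.3.1.jar \
  ./kafka.py data01:2181 mytopic1 hdfs://myns1/spark/stream1

With saveAsTextFiles, each batch is written to HDFS as a separate directory named stream1-<timestamp> under hdfs://myns1/spark/, which can be checked with hdfs dfs -ls /spark/.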
