用Apache Kafka 和 Python 搭建分布式流处理系统

Author: Borg June 14, 2017

Title: 用Apache Kafka 和 Python 搭建分布式流处理系统 June 14, 2017

1323 Chinese characters, 320 English words.



注: 本篇翻译自 [scotch.io][1] 。 # Apache Kafka 是什么? Kafka 是一个开源的分布式流处理平台,其简化了不同数据系统的集成。流指的是一个数据管道,应用能够通过流不断地接收数据。Kafka 作为流处理系统主要有两个用处: 1. **数据集成**: Kafka 捕捉事件流或数据变化流,并将这些数据送给其它数据系统,如关系型数据库,键值对数据库或者数据仓库。 2. **流处理**:Kafka接收事件流并保存在一个只能追加的队列里,该队列称为日志(log)。日志里的信息是不可变的,因此支持连续实时的数据处理和流转换,并使结果在系统级别可访问。 相比于其它技术,Kafka 拥有更高的吞吐量,内置分区,副本和容错率。这些使得 Kafka 成为大规模消息处理应用的良好解决方案。 Kafka 系统有三个主要的部分: 1. 生产者(Producer): 产生原始数据的服务。 2. 中间人(Broker): Kafka 是生产者和消费者之间的中间人,它使用API来获取和发布数据。 3. 消费者(Consumer): 使用中间人发布的数据的服务。 访问 [Kafka 官网][2]获取更详细信息。 # 项目 我们将要搭建一个简单的流处理应用,其中生产者将把一个视频文件流传送给消费者,最后在浏览器页面上显示。该项目是为了展示Kafka的数据集成和流处理。 # 依赖 该项目介绍了 Kafka 和 消息处理。读者需要对 Python 有基本的了解。 - Python 基础知识 - Python 3 - Kafka - Virtualenv - pip # 安装 Kafka - Mac 安装: brew install kafka - 安装后启动: brew services start kafka - Linux [安装指南][3] Kafka 默认端口为 9092 # 配置环境 我们的项目将包括: 视频文件: 作为源数据,建议使用 5mb 以下的 mp4 文件。 简单的生产者,能将视频图像发送给 Kafka 消费者: 获取数据并展示在浏览器页面上 Kafka: 作为中间人 创建项目路径: $ mkdir kafka && cd kafka 创建虚拟环境并启用: $ virtualenv env && source env/bin/activate 安装需要的依赖包,我们将使用 Flask 和 opencv pip install kafka-python opencv-python Flask # 创建生产者 生产者是给 Kafka 中间人发送消息的服务。值得注意的是,生产者并不关注最终消费或加载数据的消费者。 创建生产者: 创建一个 producer.py 文件并添加如下代码: # producer.py import time import cv2 from kafka import SimpleProducer, KafkaClient # connect to Kafka kafka = KafkaClient('localhost:9092') producer = SimpleProducer(kafka) # Assign a topic topic = 'my-topic' 创建消息: 消息将由二进制的图片组成。 OpenCV 能够读取视频文件并转换成二进制。我们需要创建一个函数,用于在发送消息给 Kafka 前读取视频文件并转换成二进制。将视频文件放置在与生产者相同路径下。 发送消息: Kafka 消息是二进制字符串格式(byte),所以图像在发送前需要被编码。 以下是完整的生产者代码: # producer.py import time import cv2 from kafka import SimpleProducer, KafkaClient # connect to Kafka kafka = KafkaClient('localhost:9092') producer = SimpleProducer(kafka) # Assign a topic topic = 'my-topic' def video_emitter(video): # Open the video video = cv2.VideoCapture(video) print(' emitting.....') # read the file while (video.isOpened): # read the image in each frame success, image = video.read() # check if the file has read to the end if not success: break # convert the image png ret, jpeg = cv2.imencode('.png', image) # Convert the image to bytes and send to kafka producer.send_messages(topic, jpeg.tobytes()) # To reduce CPU usage create sleep time of 0.2sec time.sleep(0.2) # clear the capture video.release() print('done emitting') if __name__ == '__main__': video_emitter('video.mp4') 生产者就完成了。 # 创建消费者 消费者监听并消费来自 Kafka 中间人的消息。我们的消费者应该监听 my-topic 主题的消息并将消息展示。我们将使用 Flask 微框架来展示接收到的视频图像。 持续监听: 消费者将持续监听来自 Kafka 的消息更新和创建广播。我们将使用生成器来保持连接。生成器是用来生成结果序列而非单个结果的循环。由于图像是被序列地发送,我们的响应也将使用 `multipart/x-mixed-replace` mime type。 以下是消费者代码(consumer.py): from flask import Flask, Response from kafka import KafkaConsumer #connect to Kafka server and pass the topic we want to consume consumer = KafkaConsumer('my-topic', group_id='view' bootstrap_servers=['0.0.0.0:9092']) #Continuously listen to the connection and print messages as recieved app = Flask(__name__) @app.route('/') def index(): # return a multipart response return Response(kafkastream(), mimetype='multipart/x-mixed-replace; boundary=frame') def kafkastream(): for msg in consumer: yield (b'--frame\r\n' b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n') if __name__ == '__main__': app.run(host='0.0.0.0', debug=True) # 运行项目 确保 Kafka 在运行: `brew services start kafka` 打开两个终端,在第一个终端中运行生产者: (env)$ python producer.py ![enter image description here][4] 在第二个终端运行消费者: (env)$ python consumer.py ![enter image description here][5] 这将会运行我们的 Flask 服务器。接下来打开浏览器查看链 `http://0.0.0.0:5000` 。 ![enter image description here][6] # 观察结果 1. 刷新浏览器并不会重头开始播放视频。 Kafka 使用 offset 来记录消费者的日志读取位置。 2. 如果浏览器在播放视频的时候被关闭,下次再打开浏览器输入链接后,视频将会从断点继续播放。 3. 播放视频的时候不需要生产者在运行, Kafka 将保存消息,并当消费者需要的时候提供消息。 4. 当生产者和消费者同时运行的时候图像几乎是实时传送到消费者。 5. 视频处理是线性的。 6. 消息共享能够减少生产者需要发送图像的次数。 # 何处使用 Kafka? 1. 微框架: Kafka 是众多需要彼此持续异步通信的微服务间的最好管道。 2. 数据库: 为了防止将数据仓库中的数据整个导出,可以创建 Kafka 生产者和消费者用于保存和监测数据库发生的改变。 3. 数据收集处理:网站内置的生产者可以实时收集点击事件或访问量。 4. 传感器和其它硬件数据。 5. 证券报价机 # 总结 Kafka 是一个快速、可扩展、使用简单的分布式流处理系统。要使用 Kafka 需要知道: 生产者将发布消息给中间人的消息主题。 消费者需要监听中间人发布的消息主题。 我们创建了一个简单的流应用,展示了使用流数据的好处,如此使用如何快速以及 Kafka 如何被作为中间人。 希望你已经知道了如何使用 Kafka。 [1]: https://scotch.io/tutorials/build-a-distributed-streaming-system-with-apache-kafka-and-python [2]: https://kafka.apache.org/documentation.html#gettingStarted [3]: https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm [4]: https://cdn.scotch.io/15775/T1cgynmnTj2Vtz0Ih8Dv_producer.jpg [5]: https://cdn.scotch.io/15775/tZD4cMBqTamBmzoE3sXE_consumer.jpg [6]: https://cdn.scotch.io/15775/Bn9desdVTraaEn9azvDb_Screen%20Shot%202017-02-04%20at%207.38.50%20PM.png

Comments:

You must log in to comment.