在搭建數(shù)據(jù)通道(data pipeline)時(shí)慎陵,由于涉及到:數(shù)據(jù)讀取眼虱,數(shù)據(jù)分析,數(shù)據(jù)存儲(chǔ)等等席纽,
如果將各個(gè)部分分別容器化捏悬,獨(dú)立設(shè)計(jì)各個(gè)模塊,將有助于縮短開發(fā)時(shí)間润梯。
這里以一個(gè)基于 Kafka 和 Spark Streaming 的實(shí)時(shí)流 data pipeline 為例过牙,介紹如何使用 docker compose 分別搭建各個(gè)服務(wù)并實(shí)現(xiàn)快速demo。
整體架構(gòu)
使用 Apache Kafka 作為數(shù)據(jù)總線纺铭, 進(jìn)行數(shù)據(jù)的收集與分發(fā)寇钉。通過(guò) Spark Streaming 實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)處理,并將結(jié)果數(shù)據(jù)存儲(chǔ)到 MySQL 中舶赔。
具體結(jié)構(gòu)如圖:
數(shù)據(jù)讀取
實(shí)例中需要分析來(lái)自 csv
文件的數(shù)據(jù)摧莽。為了將文件數(shù)據(jù)導(dǎo)入 Kafka,可以使用 Kafka Connect顿痪。
不同文件中的數(shù)據(jù)將被 Kafka Connect 導(dǎo)入 Kafka 中專門的 topic镊辕。
數(shù)據(jù)分析
Spark Streaming 可以方便地接收來(lái)自 Kafka 的實(shí)時(shí)數(shù)據(jù)。
這個(gè)Demo中使用的數(shù)據(jù)源為來(lái)自文件的批數(shù)據(jù)蚁袭,不過(guò)Demo中的架構(gòu)同樣可以處理流數(shù)據(jù)征懈。
數(shù)據(jù)存儲(chǔ)
經(jīng)過(guò) Spark Streaming 分析得到的結(jié)果,將被導(dǎo)入 MySQL揩悄,方便之后的查詢卖哎。同時(shí),也可以導(dǎo)入 Kafka 中對(duì)應(yīng)的 topic。例如異常分析的結(jié)果可以放入名為anomaly
的 topic亏娜。
docker compose 管理
使用 docker compose 來(lái)管理上述眾多服務(wù)焕窝。 由于主要的處理邏輯都放在了 Spark Streaming 中,需要自行編寫Dockerfile维贺,其它各個(gè)服務(wù)都可以直接使用來(lái)自 Docker Hub的鏡像它掂。
ZooKeeper服務(wù):
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "2181:2181"
container_name: pipeline-zookeeper
Kafka服務(wù):
使用來(lái)自 confluent 的 kafka 鏡像。
僅用作Demo測(cè)試架構(gòu)可行性溯泣,因此 kafka 僅含一個(gè) broker虐秋, topic 的 partition 數(shù)以及 replication factor 也因此為1.
kafka1:
image: confluentinc/cp-kafka:4.0.0
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zoo1
container_name: pipeline-kafka
Kafka Connect服務(wù):
Kafka Connect 需要知道 Kafka 集群的信息。在 docker compose 內(nèi)可以用服務(wù)名稱即 kafka1 直接指代我們之前配置的 kafka broker垃沦。
這里配置的僅僅是 Kafka Connect Worker客给,具體的connect任務(wù),即連接什么數(shù)據(jù)源肢簿,連接到kafka中的哪個(gè)topic去靶剑,需要在Kafka Connect服務(wù)啟動(dòng)后,通過(guò) REST API 提交池充。
kafka-connect:
image: confluentinc/cp-kafka-connect:4.0.0
hostname: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka1:9092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group1
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
volumes:
- ./data:/data
depends_on:
- zoo1
- kafka1
container_name: pipeline-kafka-connect
Spark Streaming服務(wù):
Spark Streaming 服務(wù)build自本地路徑./spark-streaming
桩引。
路徑下放有Dockerfile
,requirements.txt
纵菌,以及存放處理邏輯的main
文件夾阐污。
spark-streaming:
build: ./spark-streaming
ports:
- "8081:8081"
volumes:
- ./log:/spark-streaming/log
depends_on:
- kafka1
container_name: pipeline-spark-streaming
MySQL服務(wù):
在environment
下設(shè)置數(shù)據(jù)庫(kù)名稱與密碼休涤。
同時(shí)可以在command
下聲明初始化文件咱圆,創(chuàng)建項(xiàng)目中需要的表格。
mysql:
image: mysql:5.6.34
restart: always
environment:
MYSQL_DATABASE: "data_pipeline"
MYSQL_ROOT_PASSWORD: "233"
MYSQL_ALLOW_EMPTY_PASSWORD: "no"
command: --init-file /tmp/create_db.sql
volumes:
- ./mysql/create_db.sql:/tmp/create_db.sql
ports:
- "3306:3306"
container_name: pipeline-mysql