軟件準(zhǔn)備
- Confluent安裝包
下載地址:https://www.confluent.io/download/
Confluent有兩個類型可以下載,企業(yè)版(Enterprise)需要付費悦穿,可以免費使用30天告抄,我這里使用的是開源版(Open Source)版钝诚,版本號是4.1.1
1. Confluent 介紹
(1) Confluent 是什么隘截?
Confluent平臺是一個可靠的铲咨,高性能的流處理平臺固歪,你可以通過這個平臺組織和管理各式各樣的數(shù)據(jù)源中的數(shù)據(jù)。
image.png
(2) Confluent 中有什么胯努?
- Confluent開源版
- Confluent Kafka Connectors
- Kafka Connect JDBC Connector
- Kafka Connect HDFS Connector
- Kafka Connect Elasticsearch Connector
- Kafka Connect S3 Connector
- Confluent Kafka Clients
- C/C++ Client Library
- Python Client Library
- Go Client Library
- .Net Client Library
- Confluent Schema Registry
- Confluent Kafka REST Proxy
- Confluent Kafka Connectors
- Confluent 企業(yè)版中增加的功能
- Automatic Data Balancing
- Multi-Datacenter Replication
- Confluent Control Center
- JMS Client
2. Confluent 開源版安裝
(1) 解壓安裝包牢裳,可以看到以下目錄:
[root@confluent confluent-4.1.1]# ll
total 24
drwxr-xr-x 3 1000 1000 4096 May 12 08:01 bin
drwxr-xr-x 14 1000 1000 4096 May 12 07:05 etc
drwxr-xr-x 3 1000 1000 4096 May 12 06:47 lib
-rw-r--r-- 1 1000 1000 871 May 12 08:02 README
drwxr-xr-x 6 1000 1000 4096 May 12 07:05 share
drwxr-xr-x 2 1000 1000 4096 May 12 08:02 src
(2) 啟動confluent
[root@confluent confluent-4.1.1]# bin/confluent start
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
confluent start 會啟動 confluent 全部組件,如果想要單獨啟動叶沛,比如單獨啟動 schema-registry蒲讯,可以執(zhí)行以下命令:
schema-registry-start
具體的單獨啟動各組件的命令,進(jìn)入 bin 目錄下灰署,一看就能明白判帮,不再贅述。
3. 簡單使用
(1) 創(chuàng)建 topic
[root@confluent confluent-4.1.1]# bin/kafka-topics \
> --create \
> --zookeeper localhost:2181 \
> --replication-factor 1 \
> --partitions 1 \
> --topic confluent-test-001
Created topic "confluent-test-001".
說明:
confluent 中內(nèi)嵌了 Kafka 和 Zookeeper溉箕,你也可以通過指定不同的 zookeeper 在其他的 kafka 集群中創(chuàng)建 topic 或執(zhí)行其他操作晦墙。
(2) 生產(chǎn)數(shù)據(jù)
[root@confluent confluent-4.1.1]# bin/ksql-datagen \
> quickstart=users \
> format=json \
> topic=confluent-test-001 \
> maxInterval=1000
[2018-06-22 14:53:19,170] INFO AvroDataConfig values:
schemas.cache.config = 1
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:179)
User_5 --> ([ 1513083004885 | 'User_5' | 'Region_9' | 'OTHER' ])
User_8 --> ([ 1508770926089 | 'User_8' | 'Region_3' | 'OTHER' ])
User_9 --> ([ 1504006562725 | 'User_9' | 'Region_5' | 'FEMALE' ])
User_8 --> ([ 1490524175099 | 'User_8' | 'Region_2' | 'OTHER' ])
User_8 --> ([ 1489424770134 | 'User_8' | 'Region_8' | 'MALE' ])
User_1 --> ([ 1516449943408 | 'User_1' | 'Region_4' | 'OTHER' ])
......
以上命令是內(nèi)嵌的一個kafka-producer腳本,生成隨機(jī)的用戶信息肴茄,可以通過 quickstart=[CLICKSTREAM_CODES, CLICKSTREAM, CLICKSTREAM_USERS, ORDERS, RATINGS, USERS, USERS_, PAGEVIEWS] 來生成不同的數(shù)據(jù)晌畅,這個腳本會運行很長時間(官網(wǎng)只說了很長時間,到底多長寡痰,沒說)抗楔,除非你手動停止
(3) 使用 KSQL 查詢生產(chǎn)的數(shù)據(jù)
在另一個窗口中棋凳,進(jìn)入KSQL命令行(上一個窗口繼續(xù)發(fā)數(shù)據(jù)不要停)
[root@confluent confluent-4.1.1]# bin/ksql
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka? =
===========================================
Copyright 2017 Confluent Inc.
CLI v4.1.1, Server v4.1.1 located at http://localhost:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
把生產(chǎn)過來的數(shù)據(jù)創(chuàng)建為user表:
ksql> CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, \
> userid VARCHAR, \interests array<VARCHAR>, contact_info map<VARCHAR, VARCHAR>) \
> WITH (KAFKA_TOPIC='confluent-test-001', VALUE_FORMAT='JSON', KEY = 'userid');
Message
---------------
Table created
---------------
設(shè)置消費偏移量為 "earliest":
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
查詢:
ksql> select * from users;
1529651156298 | User_7 | 1497590434653 | OTHER | Region_7 | User_7 | null | null
1529651158082 | User_9 | 1508375625042 | OTHER | Region_1 | User_9 | null | null
1529651160496 | User_5 | 1501045879443 | MALE | Region_6 | User_5 | null | null
1529651161870 | User_6 | 1514541057484 | FEMALE | Region_5 | User_6 | null | null
1529651162248 | User_3 | 1498247501220 | MALE | Region_1 | User_3 | null | null
1529651162727 | User_1 | 1495368101769 | FEMALE | Region_3 | User_1 | null | null
1529651164048 | User_4 | 1508110530233 | MALE | Region_6 | User_4 | null | null
.....
# 只要生產(chǎn)數(shù)據(jù)的程序沒有停止,這里會一直打印查詢結(jié)果
4. 關(guān)閉服務(wù)
[root@confluent confluent-4.1.1]# bin/confluent stop
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]