1. Installing Kafka with Docker
Open a Cmd command prompt:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
Create a docker-compose.yml file (note: the services key requires compose file format 2 or later, so version '1' would not work here):
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: <host-IP>   # the address clients will use to reach the broker
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - <local path (e.g. D:\)>docker.sock:/var/run/docker.sock
From the directory containing docker-compose.yml, build the services (optional here, since both images are pulled prebuilt):
docker-compose build
Start the services:
docker-compose up -d
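If everything started, both containers should show as Up; these are standard Docker Compose commands you can use to check:
docker-compose ps
docker-compose logs kafka   # inspect the broker's startup output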
Create two topics for the programs below (run these inside the Kafka container, e.g. via docker exec):
kafka-topics.sh --zookeeper <host-IP>:2181 --create --replication-factor 1 --partitions 3 --topic first   # used by the producer
kafka-topics.sh --zookeeper <host-IP>:2181 --create --replication-factor 1 --partitions 3 --topic second  # used by the consumer
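To confirm both topics were created, the same tool can list them; <kafka-container> is whatever name docker-compose ps reports for the Kafka service:
docker exec -it <kafka-container> kafka-topics.sh --zookeeper <host-IP>:2181 --list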
2. Using Java to fetch data from EMQ (see the earlier document) and publish it with the producer
pom.xml (same for the consumer)
<dependencies>
    <!-- MQTT -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
    <!-- Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.22</version>
    </dependency>
</dependencies>
MqttKafkaProducer.java
package cc.hiver.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
 * MQTT-to-Kafka producer
 */
public class MqttKafkaProducer {
    /**
     * Push a message into Kafka.
     *
     * @param msgData the payload to send
     */
    public static void pushData(String msgData) {
        Properties props = new Properties();
        // Cluster address; separate multiple servers with ","
        props.put("bootstrap.servers", "<host-IP>:9092");
        // Number of resend attempts; once exhausted, the send fails with an error
        props.put("retries", 0);
        // The producer merges requests headed for the same partition; batch.size caps the total
        // size of one merged batch. Set too small, it can prevent any batching at all.
        props.put("batch.size", 163840);
        // By default the producer aggregates everything collected between two sends; linger.ms
        // goes further by adding a small delay to each send so more messages can be batched.
        props.put("linger.ms", 1);
        // Size of the buffer that holds messages not yet sent to the broker
        props.put("buffer.memory", 33554432);
        // key/value serialization; plain strings here, using Kafka's built-in serializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("partitioner.class", "com.kafka.demo.Partitioner"); // custom partitioning, not used here
        props.put("acks", "1");
        props.put("request.timeout.ms", "60000");
        props.put("compression.type", "lz4");
        // Create the producer
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Round-robin over the 3 configured partitions based on the current time
        int partition = (int) (System.currentTimeMillis() % 3);
        // Write to the topic named "first"
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>("first", partition, UUID.randomUUID().toString(), msgData);
        try {
            producer.send(producerRecord).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Release the client's resources; without this, every call leaks a producer instance
            producer.close();
        }
        System.out.println("Wrote EMQ message to topic first: " + msgData);
    }
}
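Creating and closing a KafkaProducer per message is expensive; the client is designed to be long-lived and thread-safe. A minimal sketch of a shared, lazily created producer (SharedProducerHolder and its method are my own naming, not part of the original code):
package cc.hiver.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

/**
 * Sketch: hold one Producer for the life of the process instead of one per message.
 */
public class SharedProducerHolder {
    private static Producer<String, String> producer; // hypothetical shared instance

    public static synchronized Producer<String, String> get(Properties props) {
        if (producer == null) {
            producer = new KafkaProducer<>(props);
            // Close the shared producer once when the JVM exits
            Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.close()));
        }
        return producer;
    }
}
pushData could then fetch the producer from this holder and skip the per-call close.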
OnMessageCallback.java
package cc.hiver.producer;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Message callback
 */
public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // Called when the connection is lost; reconnect logic usually goes here
        System.out.println("Connection lost; a reconnect could be attempted here");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Messages for subscribed topics arrive here
        System.out.println("Received topic: " + topic);
        System.out.println("Received QoS: " + message.getQos());
        System.out.println("Received payload: " + new String(message.getPayload()));
        // Forward the received message to Kafka
        MqttKafkaProducer.pushData(new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}
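As an alternative to hand-rolled reconnect logic in connectionLost, Paho 1.2.x can reconnect on its own. A minimal sketch of the relevant connection options (values are illustrative, not from the original):
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setAutomaticReconnect(true);  // let Paho retry the connection itself
connOpts.setCleanSession(false);       // keep the subscription alive across reconnects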
ProducerApp.java
package cc.hiver.producer;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Producer app
 */
public class ProducerApp {
    /**
     * Entry point
     *
     * @param args
     */
    public static void main(String[] args) {
        // Topic to subscribe to
        String subTopic = "testtopic/#";
        // Broker address
        String broker = "tcp://<host-IP>:1883";
        // Client ID
        String clientId = "mqtt_java_hiver";
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            // MQTT connection options
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("admin");                 // username
            connOpts.setPassword("public".toCharArray()); // password
            // Start each connection with a clean session (no server-side state kept)
            connOpts.setCleanSession(true);
            // Set the callback
            client.setCallback(new OnMessageCallback());
            // Connect
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected");
            // Subscribe
            client.subscribe(subTopic);
        } catch (MqttException me) {
            // Exception handling
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
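With ProducerApp running, you can verify that messages are landing on the first topic using the console consumer that ships inside the Kafka container (<kafka-container> as reported by docker-compose ps):
docker exec -it <kafka-container> kafka-console-consumer.sh --bootstrap-server <host-IP>:9092 --topic first --from-beginning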
3. The consumer
LogProcessor.java
package cc.hiver.consumer;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Log cleanup processor
 */
public class LogProcessor implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    public void init(ProcessorContext context) {
        this.context = context;
    }

    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        if (input.contains("hello")) {
            System.out.println("logProcessor:" + input);
            context.forward("logProcessor".getBytes(), input.getBytes());
        } else {
            // Data cleanup could happen here; as written, both branches simply
            // forward the record unchanged to the downstream topic
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    public void close() {
    }
}
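For a simple filter-and-forward like this, the Streams DSL is a higher-level alternative to the Processor API. A minimal sketch over the same two topics (my own class name and wiring, not the original author's code):
package cc.hiver.consumer;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

/**
 * Sketch: the same pipe expressed with the Streams DSL.
 */
public class DslConsumerApp {
    public static void main(String[] args) {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilterDsl");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<host-IP>:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("first", Consumed.with(Serdes.String(), Serdes.String()))
               // log records containing "hello", then pass everything through
               .peek((k, v) -> { if (v.contains("hello")) System.out.println("logProcessor:" + v); })
               .to("second", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), settings);
        streams.start();
        // Shut the app down cleanly when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}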
ConsumerApp.java
package cc.hiver.consumer;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class ConsumerApp {
    public static void main(String[] args) {
        // Input topic
        String from = "first";
        // Output topic
        String to = "second";
        // Configuration
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<host-IP>:9092");
        // Build the topology
        Topology builder = new Topology();
        builder.addSource("SOURCE", from)
               .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
                   @Override
                   public Processor<byte[], byte[]> get() {
                       // The actual processing logic
                       return new LogProcessor();
                   }
               }, "SOURCE")
               .addSink("SINK", to, "PROCESS");
        // Create and start the Kafka Streams app; the Properties are passed directly,
        // since the KafkaStreams(Topology, StreamsConfig) constructor is deprecated
        KafkaStreams streams = new KafkaStreams(builder, settings);
        streams.start();
    }
}
4. Test data
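A minimal end-to-end check, assuming the mosquitto command-line clients are installed (topic name and payload are illustrative): publish one MQTT message, then watch it arrive on the second topic.
mosquitto_pub -h <host-IP> -p 1883 -u admin -P public -t testtopic/demo -m "hello kafka"
docker exec -it <kafka-container> kafka-console-consumer.sh --bootstrap-server <host-IP>:9092 --topic second --from-beginning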
Appendix: Kafka cluster management UI
docker run -itd --name=kafka-manager -p 9000:9000 -e ZK_HOSTS="<host-IP>:2181" sheepkiller/kafka-manager
Visit http://<host-IP>:9000 and register the cluster in the UI using <host-IP>:2181 as the Zookeeper hosts.