Before you start, make sure the JDK, Zookeeper, and Kafka are all installed and configured.

Kafka needs a little configuration first: in Kafka's config directory, open the server.properties configuration file and uncomment the listeners and advertised.listeners settings. Set the server's public IP and port there as needed; this demo uses localhost and the default port 9092.
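With those choices, the two uncommented lines in server.properties would look like this (PLAINTEXT is Kafka's default unencrypted protocol):

listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092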
Integrating Kafka with Spring Boot

1. Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Write the producer and consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Producer: send 99 messages to the "test" topic
    @GetMapping("/kafka")
    public String testKafka() {
        int iMax = 100;
        for (int i = 1; i < iMax; i++) {
            kafkaTemplate.send("test", "key" + i, "data" + i);
        }
        return "success";
    }

    // Consumer: log every record received from the "test" topic
    @KafkaListener(topics = "test")
    public void receive(ConsumerRecord<?, ?> consumer) {
        logger.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
    }
}
Notes on the code
The KafkaTemplate class wraps a Producer and provides convenient methods for sending data to a Kafka topic.

Here is the source of the send() method used above; the KafkaTemplate class overloads many other send() methods as well, which are worth a look in the source code:
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
    return this.doSend(producerRecord);
}
Data is sent through the KafkaTemplate template class. In kafkaTemplate.send(String topic, K key, V data), the first argument is the topic, the second is the message key, and the third is the data payload. The @KafkaListener annotation configures a listener (consumer) for the given topics. Note that send() is asynchronous; a sketch of checking its result follows.
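A minimal sketch of handling the send result with a callback, assuming spring-kafka 2.x, where send() returns a ListenableFuture (the topic, key, and value are the demo values from above):

// Log the destination and offset on success, the exception on failure
kafkaTemplate.send("test", "key1", "data1").addCallback(
        result -> logger.info("sent to {} at offset {}",
                result.getRecordMetadata().topic(),
                result.getRecordMetadata().offset()),
        ex -> logger.error("send failed", ex));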
Configuration file: application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: kafka2
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-servers: the address of the Kafka server(s); multiple addresses may be listed, comma-separated, as sketched below
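For example, for a multi-broker cluster (the hostnames here are hypothetical):

spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092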
consumer.group-id: specifies a default consumer group name; if no group id is configured anywhere, startup fails with
java.lang.IllegalStateException: No group.id found in consumer config,
container properties, or @KafkaListener annotation;
a group.id is required when group management is used.
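As the exception message says, the group id may also be set per listener on the annotation itself, overriding the application.yml default. A minimal sketch, assuming spring-kafka 1.3+ where @KafkaListener has a groupId attribute:

@KafkaListener(topics = "test", groupId = "kafka2")
public void receive(ConsumerRecord<?, ?> consumer) {
    logger.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
}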
auto-offset-reset: controls where consumption starts when there is no usable committed offset

1. earliest: if a partition has a committed offset, consume from that offset; if there is no committed offset, consume from the beginning of the partition
2. latest: if a partition has a committed offset, consume from that offset; if there is no committed offset, consume only data newly produced to the partition
3. none: if every partition of the topic has a committed offset, consume from those offsets; if any partition lacks a committed offset, throw an exception

The value must be one of these three, otherwise startup fails with
org.apache.kafka.common.config.ConfigException:
Invalid value for configuration auto.offset.reset:
String must be one of: latest, earliest, none
Message serialization and deserialization

When sending and receiving messages with Kafka, the producer side must serialize and the consumer side must deserialize: what travels over the network is a byte[], so serialization is what makes network transport possible in the first place, and only after deserialization does the consumer recover the actual message content the producer sent.
consumer.key-deserializer and consumer.value-deserializer set the consumer's key/value deserializers; producer.key-serializer and producer.value-serializer set the producer's key/value serializers.
StringDeserializer is the built-in string deserializer:
public class StringDeserializer implements Deserializer<String> {
    public String deserialize(String topic, byte[] data) {
        try {
            // If the data is null, return null directly; otherwise deserialize
            // the byte[] into a String
            return data == null ? null : new String(data, this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);
        }
    }
    ......
}
StringSerializer is the built-in string serializer:
public class StringSerializer implements Serializer<String> {
    public byte[] serialize(String topic, String data) {
        try {
            // If the data is null, return null directly; otherwise serialize
            // the String into a byte[]
            return data == null ? null : data.getBytes(this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
        }
    }
    ......
}
The org.apache.kafka.common.serialization package also provides serializers and deserializers for several other types. To customize serialization, implement the Serializer interface; to customize deserialization, implement the Deserializer interface. A sketch of a custom serializer follows; for more detail see
https://blog.csdn.net/shirukai/article/details/82152172
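For example, here is a minimal sketch of a custom serializer that writes an object as JSON bytes. The User class is a hypothetical payload type, and Jackson (on the classpath of most Spring Boot apps) is an assumed dependency, neither is part of the demo project above:

import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical payload type for this sketch (Jackson serializes its public fields)
class User {
    public String name;
    public int age;
}

public class UserSerializer implements Serializer<User> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this sketch
    }

    @Override
    public byte[] serialize(String topic, User data) {
        try {
            // null stays null, mirroring the built-in StringSerializer
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing User to JSON", e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}

It would then be wired up in application.yml via producer.value-serializer: com.example.UserSerializer (the package name is hypothetical), with a matching Deserializer implemented the same way for the consumer side.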
Start the project and test it
Below is the Kafka consumer (Consumer) configuration, which every consumer prints at startup:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka2
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2018-11-22 14:16:53.465 INFO 11980 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.0
2018-11-22 14:16:53.465 INFO 11980 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 3402a8361b734732
2018-11-22 14:16:57.664 INFO 11980 --- [ main] org.apache.kafka.clients.Metadata : Cluster ID: d3n7Snc2TFmSFcNsHjqgVw
Visit http://localhost:8080/kafka and you will see the messages printed in the console.