Apache Kafka originated at LinkedIn, was open-sourced as an Apache project in 2011, and became a top-level Apache project in 2012. Kafka is written in Scala and Java. It is a fault-tolerant, publish-subscribe messaging system that is fast, scalable, and distributed by design.
Introduction to Kafka
Main features
According to the official site, Apache Kafka® is a distributed streaming platform with three main capabilities:
- Publish and subscribe to streams of messages. This is similar to a message queue, and is why Kafka is often classified as a messaging framework
- Store streams of messages in a fault-tolerant way; Kafka persists message streams to files
- Process streams of messages as they are published
Use cases
- Building reliable pipelines that carry real-time data between systems or applications (the message-queue role)
- Building real-time streaming applications that transform or react to streams of data (the data-processing role)
Basic concepts
Producer
: The producer of messages and data; a process, piece of code, or service that publishes messages to a Kafka topic
Consumer
: The consumer of messages and data; a process, piece of code, or service that subscribes to a topic and processes the messages published to it
Consumer Group
: A logical concept. A message on a topic is broadcast to every subscribed group, but within a group only one consumer consumes it
Broker
: A physical concept; each Kafka node in the cluster
Topic
: A logical concept; a category of Kafka messages, used to separate and isolate data
Partition
: A physical concept; the basic unit of data storage in Kafka. A topic's data is spread across multiple partitions, and each partition is ordered
Replication
: A partition may have multiple replicas, and all replicas hold the same data
Replication Leader
: Among a partition's replicas, one leader is responsible for that partition's interaction with producers and consumers
ReplicaManager
: Manages the state of all partitions and replicas on the current broker, and handles requests initiated by the KafkaController, such as replica state transitions and appending or reading messages
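To make these concepts concrete, here is a minimal sketch using the plain Java client rather than the Spring wrapper used later in this article. The broker address, the topic name demo-topic, and the group id demo-group are illustrative assumptions:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConceptsDemo {
    public static void main(String[] args) {
        // Producer: publishes a message to a Topic on a Broker
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "hello"));
        }

        // Consumer: subscribes to the Topic as a member of a Consumer Group;
        // within demo-group, each partition is consumed by exactly one member
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "demo-group");
        c.put("auto.offset.reset", "earliest");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            // a single poll; the first poll may return empty while the group rebalances
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("partition=%d offset=%d value=%s%n", r.partition(), r.offset(), r.value());
            }
        }
    }
}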
Basic concepts, extended
Partition
- Each topic is split into multiple partitions
- The number of consumers in a group should be less than or equal to the number of partitions
- Each broker in the cluster stores one or more partitions of a topic
- Within a consumer group, each partition of a topic is read by exactly one consumer (a single consumer may read one or more partitions)
Replication
- When a broker in the cluster goes down, the system can automatically promote replicas to keep serving
- The replication factor of each topic defaults to 1 and can be set per topic at creation time (see the AdminClient sketch below)
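As a hedged illustration of choosing the replication factor at topic-creation time, here is a minimal sketch using the Java AdminClient; the topic name, partition count, and broker address are assumptions (a replication factor of 3 requires at least 3 brokers):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3
            NewTopic topic = new NewTopic("replicated-topic", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get(); // block until created
        }
    }
}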
Replication characteristics
- The basic unit of replication is a topic partition
- All reads and writes go through the leader; followers act only as backups
- Followers must be able to replicate the leader's data promptly
- Replication increases fault tolerance and scalability
Basic structure of Kafka
Message delivery flow
- Producer API
- Consumer API
- Streams API (see the sketch below)
- Connector API
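The code example later in this article exercises the Producer and Consumer APIs through Spring. For the Streams API, here is a minimal, assumed sketch that upper-cases records from one topic into another; the application id and topic names are made up:
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class UppercaseStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo"); // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> in = builder.stream("input-topic"); // hypothetical topics
        in.mapValues(v -> v.toUpperCase()).to("output-topic");

        new KafkaStreams(builder.build(), props).start();
    }
}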
The diagram below illustrates how Kafka's message-queue model operates.
The structure of a Kafka message
kafka特點
分布式
- 多分區(qū)
- 多副本
- 多訂閱者
- 基于ZooKeeper調度
High performance
- High throughput
- Low latency
- High concurrency
- O(1) time complexity (constant-time persistence via sequential appends)
Durability and scalability
- Data can be persisted
- Fault tolerance
- Online horizontal scaling
- Automatic message balancing
Application scenarios
- Message queueing
- Activity tracking
- Metrics and metadata monitoring
- Log aggregation
- Stream processing
- Event sourcing
- Durable commit log
- and more
A simple Kafka walkthrough
Download and install
ZooKeeper download
http://zookeeper.apache.org/releases.html#download
Kafka download
http://kafka.apache.org/downloads
Install
Unzip the archives and configure the environment variables.
PS: on macOS there is a shortcut: brew install kafka
Startup
Run ZooKeeper
Open a cmd window, type zkServer, and press Enter. Output like the screenshot below means ZooKeeper started successfully, and that the installation works.
Start Kafka
In a new cmd window, cd to the Kafka root directory ..\kafka_2.11-2.2.0 and run:
.\bin\windows\kafka-server-start.bat .\config\server.properties
A line containing started (kafka.server.KafkaServer) indicates a successful start.
Create a topic
In a new cmd window, go to ..\kafka_2.11-2.2.0\bin\windows and create the topic test:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(From Kafka 2.2 onward, kafka-topics also accepts --bootstrap-server localhost:9092 in place of --zookeeper.)
Create a producer
In a new cmd window, go to ..\kafka_2.11-2.2.0\bin\windows and run:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Create a consumer
In a new cmd window, go to ..\kafka_2.11-2.2.0\bin\windows and run:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Test
Type a message in the producer window; it shows up immediately in the consumer window, confirming that Kafka is installed and working.
Kafka code example
Project structure
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.cxy</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka</name>
<description>kafka_learning</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
kafka.consumer.zookeeper.connect=127.0.0.1:2181
kafka.consumer.servers=127.0.0.1:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=kafkaTestTopic
kafka.consumer.group.id=kafkaTest
kafka.consumer.concurrency=10
kafka.producer.servers=127.0.0.1:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
kafka.topic.default=kafkaTestTopic
log4j.properties
log4j.rootLogger=DEBUG, stdout
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
ErrorCode.java
package com.cxy.kafka.common;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
public class ErrorCode {
public final static int SUCCESS = 200;
public final static int EXCEPTION = 500;
}
MessageEntity.java
package com.cxy.kafka.common;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Getter
@Setter
@EqualsAndHashCode
public class MessageEntity {
private String title;
private String body;
@Override
public String toString(){
return "MessageEntity{" +
"title='" + title + '\'' +
",body='" + body + '\'' +
'}';
}
}
Response.java
package com.cxy.kafka.common;
import lombok.Getter;
import lombok.Setter;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Getter
@Setter
public class Response {
private int code;
private String message;
public Response(int code, String message){
this.code = code;
this.message = message;
}
}
KafkaProducerConfig.java
package com.cxy.kafka.config;
import com.cxy.kafka.common.MessageEntity;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
public Map<String,Object> producerConfigs() {
Map<String,Object> props = new HashMap<String, Object>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
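// Note (assumption about precedence): the StringSerializer/JsonSerializer instances passed to the factory below take effect for keys/values, overriding the serializer classes set in producerConfigs()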
public ProducerFactory<String, MessageEntity> producerFactory(){
return new DefaultKafkaProducerFactory<String, MessageEntity>(
producerConfigs(),
new StringSerializer(),
new JsonSerializer<MessageEntity>());
}
@Bean
public KafkaTemplate<String, MessageEntity> kafkaTemplate(){
return new KafkaTemplate<String, MessageEntity>(producerFactory());
}
}
KafkaConsumerConfig.java
package com.cxy.kafka.config;
import com.cxy.kafka.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
// whether the consumer auto-commits offsets
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
// consumer session timeout
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
// auto-commit interval
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
// consumer group id
@Value("${kafka.consumer.group.id}")
private String groupId;
// where to reset the offset when no committed offset exists
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
// listener concurrency
@Value("${kafka.consumer.concurrency}")
private int concurrency;
private Map<String,Object> consumerConfigs() {
Map<String,Object> props = new HashMap<String, Object>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
private ConsumerFactory<String, MessageEntity> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, MessageEntity>(
consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<MessageEntity>(MessageEntity.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory = new ConcurrentKafkaListenerContainerFactory<String, MessageEntity>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
}
ProducerCallback.java
package com.cxy.kafka.producer;
import com.cxy.kafka.common.MessageEntity;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Slf4j
public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>> {
public static final Logger logger = Logger.getLogger(ProducerCallback.class);
private final long startTime;
private final String key;
private final MessageEntity messageEntity;
private final Gson gson = new Gson();
public ProducerCallback(long startTime, String key, MessageEntity messageEntity){
this.startTime = startTime;
this.key = key;
this.messageEntity = messageEntity;
}
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onSuccess(@Nullable SendResult<String, MessageEntity> stringMessageEntitySendResult) {
if(stringMessageEntitySendResult == null){
return;
}
long elapsedTime = System.currentTimeMillis() - startTime;
RecordMetadata metadata = stringMessageEntitySendResult.getRecordMetadata();
if(metadata!=null){
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("message(")
.append("key= ").append(key).append(",")
.append("message= ").append(gson.toJson(messageEntity)).append(")")
.append("sent to partition(").append(metadata.partition()).append(")")
.append("with offset(").append(metadata.offset()).append(")")
.append("in ").append(elapsedTime).append(" ms");
logger.info(stringBuilder.toString());
}
}
}
SimpleProducer.java
package com.cxy.kafka.producer;
import com.cxy.kafka.common.MessageEntity;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Component
public class SimpleProducer {
@Autowired
@Qualifier("kafkaTemplate")
private KafkaTemplate<String, MessageEntity> kafkaTemplate;
public void send(String topic, MessageEntity messageEntity){
kafkaTemplate.send(topic,messageEntity);
}
public void send(String topic, String key, MessageEntity messageEntity){
ProducerRecord<String, MessageEntity> record = new ProducerRecord<String, MessageEntity>(topic,key,messageEntity);
long startTime = System.currentTimeMillis();
ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
future.addCallback(new ProducerCallback(startTime,key,messageEntity));
}
}
SimpleConsumer.java
package com.cxy.kafka.consumer;
import com.cxy.kafka.common.MessageEntity;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Slf4j
@Component
public class SimpleConsumer {
public static final Logger logger = Logger.getLogger(SimpleConsumer.class);
private final Gson gson = new Gson();
@KafkaListener(topics = "${kafka.topic.default}",containerFactory = "kafkaListenerContainerFactory")
public void receive(MessageEntity messageEntity){
logger.info(gson.toJson(messageEntity));
}
}
ProducerController.java
package com.cxy.kafka.controller;
import com.cxy.kafka.common.ErrorCode;
import com.cxy.kafka.common.MessageEntity;
import com.cxy.kafka.common.Response;
import com.cxy.kafka.producer.SimpleProducer;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: cxy
* @Date: 2019/4/9
* @Description:
*/
@Slf4j
@RestController
@RequestMapping("/kafka")
public class ProducerController {
public static final Logger logger = Logger.getLogger(ProducerController.class);
@Autowired
private SimpleProducer simpleProducer;
@Value("${kafka.topic.default}")
private String topic;
private Gson gson = new Gson();
@RequestMapping(value="/hello",method = RequestMethod.GET, produces = {"application/json"})
public Response sendKafka(){
return new Response(ErrorCode.SUCCESS,"SUCCESS");
}
@RequestMapping(value="/send",method = RequestMethod.POST, produces = {"application/json"})
public Response sendKafka(@RequestBody MessageEntity messageEntity){
try{
logger.info("kafka消息:{}"+gson.toJson(messageEntity));
simpleProducer.send(topic,"key",messageEntity);
logger.info("發(fā)送kafka成功舌胶!");
return new Response(ErrorCode.SUCCESS,"發(fā)送kafka成功番电!");
}catch(Exception e){
logger.error("發(fā)送kafka失敗辆琅!",e);
return new Response(ErrorCode.EXCEPTION,"發(fā)送kafka失斒臁!");
}
}
}
Launch
Run KafkaApplication.java (ZooKeeper and Kafka must both be running).
Test
I used the Restlet Client browser plugin here; any HTTP client you prefer will do.
- First hit http://localhost:8080/kafka/hello
- Then POST to http://localhost:8080/kafka/send with a JSON body matching MessageEntity, for example {"title": "test", "body": "hello kafka"}
The message is sent to Kafka successfully, and the console shows the consumer received it.
Source code
Advanced Kafka features: message transactions
Why support transactions?
- To satisfy the read-process-write pattern
- Ever-growing stream-processing needs
- Low tolerance for inaccurate data processing
Delivery semantics for data transfer
- At most once: a message is never redelivered; it is delivered at most once, but possibly not at all
- At least once: a message is never lost; it is delivered at least once, but possibly more than once
- Exactly once: a message is neither lost nor redelivered; every message is delivered once and only once, which is what everyone wants
Transaction guarantees
- Internal retries: idempotent handling on the producer
- Atomic writes across multiple partitions
Transaction guarantees: fencing zombie instances
- Each transactional producer is assigned a transactional.id, which identifies the same producer instance across process restarts
- Kafka keeps an epoch associated with each transactional.id, stored as metadata for that transactional.id
- Once the epoch is bumped, any producer with the same transactional.id and an older epoch is treated as a zombie, and Kafka rejects its subsequent transactional writes (see the sketch below)
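A minimal sketch of the producer transactions API, following the pattern from the official producer Javadoc; the transactional.id and topic names are illustrative:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx-1"); // survives restarts; used for fencing
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the transactional.id and bumps the epoch, fencing zombies
        try {
            producer.beginTransaction();
            // an atomic write spanning two topics/partitions
            producer.send(new ProducerRecord<>("orders", "k1", "order-created"));
            producer.send(new ProducerRecord<>("audit", "k1", "order-created"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            producer.close(); // a newer instance with the same transactional.id exists
        } catch (KafkaException e) {
            producer.abortTransaction(); // roll back and retry from a clean state
        }
    }
}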
Advanced Kafka features: zero copy
Zero copy in a nutshell
- Sending persisted log chunks over the network
- Java NIO's FileChannel.transferTo() method
- The Linux sendfile system call
The conventional data path for sending a file over the network:
- The OS reads the data from disk into the page cache in kernel space
- The application reads the data from kernel space into a user-space buffer
- The application writes the data back into the socket buffer in kernel space
- The OS copies the data from the socket buffer to the NIC buffer, from which it goes out over the network
The zero-copy path:
- The OS reads the data from disk into the page cache in kernel space
- A descriptor holding the data's location and length is appended to the socket buffer in kernel space
- The OS copies the data directly from the page cache to the NIC buffer, from which it goes out over the network
"Zero copy" means that the number of copies between kernel space and user space is zero, as the sketch below illustrates.
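A minimal Java sketch of the transferTo() path; the file name segment.log and the destination address are made up. On Linux, transferTo() delegates to sendfile, so the bytes never surface in user space:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Paths.get("segment.log"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                // kernel moves bytes from the page cache to the socket; no user-space copy
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}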