kafka初涉(docker安裝以及Spring boot簡單應用)

docker安裝

這里推薦使用docker-compose安裝

以下是docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: 
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: 172.31.245.238
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./kafka-log:/kafka
  • ALLOW_PLAINTEXT_LISTENER 允許使用PLAINTEXT偵聽器。
  • depends_on: -zookeeper:kafka依賴于zookeeper
  • KAFKA_ADVERTISED_LISTENERS 是指向Kafka代理的可用地址列表躏结。 Kafka將在初次連接時將它們發(fā)送給客戶却盘。格式為 PLAINTEXT://host:port ,此處已將容器9092端口映射到宿主機9092端口媳拴,0.0.0.0為監(jiān)聽所有地址(未驗證)
  • KAFKA_ADVERTISED_HOST_NAME: 172.31.245.238 :映射宿主機地址
  • KAFKA_CREATE_TOPICS: "test:1:1" 預創(chuàng)建主題test, 分區(qū)數(shù)黄橘,分區(qū)副本數(shù)
  • KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 映射zookeeper

步驟

(1)創(chuàng)建以上yaml文件

(2)在該文件目錄下,執(zhí)行

docker-compose build
docker-compose up -d

SpringBoot簡單應用實例

依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=PLAINTEXT://172.31.245.238:9092
###########【初始化生產(chǎn)者配置】###########
# 重試次數(shù)
spring.kafka.producer.retries=0
# 應答級別:多少個分區(qū)副本備份完成時向生產(chǎn)者發(fā)送ack確認(可選0屈溉、1塞关、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延時
spring.kafka.producer.properties.linger.ms=0
# 當生產(chǎn)端積累的消息達到batch-size或接收到消息linger.ms后,生產(chǎn)者就會將消息提交給kafka
# linger.ms為0表示每接收到一條消息就提交給kafka,這時候batch-size其實就沒用了

# 生產(chǎn)端緩沖區(qū)大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化類
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定義分區(qū)器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
?
###########【初始化消費者配置】###########
# 默認的消費組ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自動提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延時(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 當kafka中沒有初始offset或offset超出范圍時將自動重置offset
# earliest:重置為分區(qū)中最小的offset;
# latest:重置為分區(qū)中最新的offset(消費分區(qū)中新產(chǎn)生的數(shù)據(jù));
# none:只要有一個分區(qū)不存在已提交的offset,就拋出異常;
spring.kafka.consumer.auto-offset-reset=latest
# 消費會話超時時間(超過這個時間consumer沒有發(fā)送心跳,就會觸發(fā)rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消費請求超時時間
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化類
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消費端監(jiān)聽的topic不存在時,項目啟動會報錯(關(guān)掉)
spring.kafka.listener.missing-topics-fatal=false
# 設(shè)置批量消費
# spring.kafka.listener.type=batch
# 批量消費每次最多消費多少條消息
# spring.kafka.consumer.max-poll-records=50

spring.kafka.bootstrap-servers=PLAINTEXT://172.31.245.238:9092子巾,其中172.31.245.238為docker-compose的KAFKA_ADVERTISED_HOST_NAME參數(shù)帆赢。

生產(chǎn)者:

@RestController
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 直接發(fā)送字符串
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("test", normalMessage);
    }

   // 將對象轉(zhuǎn)化為json字符串再發(fā)送
    @GetMapping("/kafka/sendTopic2")
    public void sendMessage2() {
        User user = User.builder()
                .name("yuanwei")
                .password("1234")
                .age(12)
                .build();
        try{
            String message= JacksonUtil.objToJson(user);
            kafkaTemplate.send("test", message);
        }catch (Exception e){
            log.error(e.toString());
        }
    }
}

消費者:

@Component
@Slf4j
public class KafkaConsumer {
    // 消費監(jiān)聽
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消費的哪個topic、partition的消息,打印出消息內(nèi)容
        System.out.println("簡單消費:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }

    // 將消費到的json字符串轉(zhuǎn)化為對象线梗。
    @KafkaListener(topics = {"test"})
    public void onMessage2(ConsumerRecord<?, ?> record){
        // 消費的哪個topic椰于、partition的消息,打印出消息內(nèi)容
        System.out.println("對象1消費:"+record.topic()+"-"+record.partition()+"-"+record.value());
        try{
            User user = (User)JacksonUtil.jsonToObj(new User(),(String)record.value());
            System.out.println("對象1消費:"+record.topic()+"-"+record.partition()+"-"+user);
        }catch (Exception e){
            log.error(e.toString());
        }
    }
}

json轉(zhuǎn)換工具類

public class JacksonUtil {
    /*
     * 001.json轉(zhuǎn)換成對象
     * @param:傳入對象,json字符串
     * @return:Object
     */
    public static Object jsonToObj(Object obj,String jsonStr) throws JsonParseException, JsonMappingException, IOException {
        ObjectMapper mapper = new ObjectMapper();
        return obj = mapper.readValue(jsonStr, obj.getClass());
    }
    /*
     * 002.對象轉(zhuǎn)換成json
     * @param:傳入對象
     * @return:json字符串
     */
    public static String objToJson(Object obj) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(obj);
    }
}

User對象

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {

    private String name;

    private String password;

    private Integer age;
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末仪搔,一起剝皮案震驚了整個濱河市瘾婿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌烤咧,老刑警劉巖偏陪,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異煮嫌,居然都是意外死亡笛谦,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門昌阿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來饥脑,“玉大人,你說我怎么就攤上這事宝泵『脝” “怎么了轩娶?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵儿奶,是天一觀的道長。 經(jīng)常有香客問我鳄抒,道長闯捎,這世上最難降的妖魔是什么椰弊? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮瓤鼻,結(jié)果婚禮上秉版,老公的妹妹穿的比我還像新娘。我一直安慰自己茬祷,他們只是感情好清焕,可當我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著祭犯,像睡著了一般秸妥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沃粗,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天粥惧,我揣著相機與錄音,去河邊找鬼最盅。 笑死突雪,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的涡贱。 我是一名探鬼主播咏删,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼盼产!你這毒婦竟也來了饵婆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤戏售,失蹤者是張志新(化名)和其女友劉穎侨核,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體灌灾,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡搓译,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了锋喜。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片些己。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖嘿般,靈堂內(nèi)的尸體忽然破棺而出段标,到底是詐尸還是另有隱情,我是刑警寧澤炉奴,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布冬念,位于F島的核電站霸妹,受9級特大地震影響菠劝,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜派任,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望璧南。 院中可真熱鬧掌逛,春花似錦、人聲如沸司倚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽动知。三九已至崖叫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拍柒,已是汗流浹背心傀。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留拆讯,地道東北人脂男。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像种呐,于是被迫代替她去往敵國和親宰翅。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容