vertx連接kafka

vertx的官方自我介紹

Eclipse Vert.x is a tool-kit for building reactive applications on the JVM.

也就是說vertx是一個(gè)JVM上面的響應(yīng)式工具集肆饶。

之所以選擇vertx主要是考慮到springboot那一套過于笨重了纱意,另外在響應(yīng)式的支持上也不是很完善凯旭。

引入依賴

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-kafka-client</artifactId>
            <version>3.8.3</version>
            <exclusions><!--去掉log4j的依賴,改為logback -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

                <!--lombok 一個(gè)簡化開發(fā)的利器-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- 反射工具包 -->
        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.11</version>
        </dependency>


        <!-- 導(dǎo)入slf4j的接口包以及對應(yīng)日志框架的驅(qū)動(dòng)包 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </dependency>
    </dependencies>

消費(fèi)者

import java.util.HashMap;
import java.util.Map;

import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaListener {

    public static final KafkaListener instance = new KafkaListener();

    KafkaConsumer<String, String> consumer;

    private KafkaListener() {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("group.id", "my_group");
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", "false");

        Vertx vertx = Vertx.vertx();
        // 創(chuàng)建一個(gè)Kafka Consumer
        consumer = KafkaConsumer.create(vertx, config);
    }

    public void listen(String topic) {
        consumer.subscribe(topic).handler(r -> {
            log.info("收到消息");
        });
    }
}

生產(chǎn)者


import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.impl.KafkaProducerRecordImpl;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaSender {

    static KafkaProducer<String, String> producer;

    static {
        Vertx vertx = Vertx.vertx();
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // 注意這里的第三和第四個(gè)參數(shù)
        producer = KafkaProducer.create(vertx, config, String.class, String.class);
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void send(String topic, String message) {
        KafkaProducerRecord<String, String> record = new KafkaProducerRecordImpl(topic, message);
        producer.send(record, f -> {
            log.info("kafka主題{}消息{}發(fā)送{}", topic, message, f.succeeded() ? "成功" : "失敗");
        });
    }
}
···
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末酒唉,一起剝皮案震驚了整個(gè)濱河市宇整,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖搬泥,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異伏尼,居然都是意外死亡忿檩,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進(jìn)店門爆阶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來燥透,“玉大人沙咏,你說我怎么就攤上這事“嗵祝” “怎么了肢藐?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長吱韭。 經(jīng)常有香客問我吆豹,道長,這世上最難降的妖魔是什么理盆? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任痘煤,我火速辦了婚禮,結(jié)果婚禮上猿规,老公的妹妹穿的比我還像新娘衷快。我一直安慰自己,他們只是感情好姨俩,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布蘸拔。 她就那樣靜靜地躺著,像睡著了一般环葵。 火紅的嫁衣襯著肌膚如雪调窍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天积担,我揣著相機(jī)與錄音陨晶,去河邊找鬼。 笑死帝璧,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的湿刽。 我是一名探鬼主播的烁,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诈闺!你這毒婦竟也來了渴庆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤雅镊,失蹤者是張志新(化名)和其女友劉穎襟雷,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仁烹,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡耸弄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了卓缰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片计呈。...
    茶點(diǎn)故事閱讀 39,977評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡砰诵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出捌显,到底是詐尸還是另有隱情茁彭,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布扶歪,位于F島的核電站理肺,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏善镰。R本人自食惡果不足惜哲嘲,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望媳禁。 院中可真熱鬧眠副,春花似錦、人聲如沸竣稽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽毫别。三九已至娃弓,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間岛宦,已是汗流浹背台丛。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留砾肺,地道東北人挽霉。 一個(gè)月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓篷扩,卻偏偏與公主長得像巴柿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子扶踊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容