單元測(cè)試之embedded-kafka

在項(xiàng)目中显熏,團(tuán)隊(duì)也使用了Kafka作為消息中間件。 經(jīng)過(guò)了嵌入式redis選型的問(wèn)題之后晒屎,筆者在嵌入式kafka選型時(shí)就更傾向于還在持續(xù)更新喘蟆,并且維護(hù)人員是一個(gè)團(tuán)隊(duì)而不是個(gè)人或者松散的組織。 最終鼓鲁,筆者選擇了來(lái)自salesforce的開(kāi)源項(xiàng)目

<groupId>com.salesforce.kafka.test</groupId>
    <artifactId>kafka-junit</artifactId>
    <version>3.0.1</version>

以下是在項(xiàng)目自帶的測(cè)試用例代碼上稍加修改的案例蕴轨。

package com.salesforce.kafka.test.junit4;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.Node;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.salesforce.kafka.test.KafkaBroker;
import com.salesforce.kafka.test.KafkaTestServer;

public class KafkaBrokerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTest.class);

    /**
     * Validate that we started 2 brokers.
     * @throws Exception 
     */
    @Test
    public void testTwoBrokersStarted() throws Exception {
        Properties overrideProperties = new Properties();
        overrideProperties.put("broker.id", "1");
        overrideProperties.put("port", "6666");
        KafkaTestServer server1= new KafkaTestServer(overrideProperties);
        server1.start();
       String string= server1.getKafkaBrokers().getBrokerById(1).getConnectString();
        logger.info("\n"+string+"\n");
         
        overrideProperties.put("broker.id", "2");
        overrideProperties.put("port", "8888");
        KafkaTestServer server2= new KafkaTestServer(overrideProperties);
        server2.start();
       String string2= server2.getKafkaBrokers().getBrokerById(2).getConnectString();
        logger.info("\n"+string2+"\n");

    }

}

問(wèn)題

從上述案例中可以看出,在實(shí)際的kafka使用中坐桩,IP+端口號(hào)是每個(gè)kafka broker都不一樣的尺棋。但是在salesforce/kafka-core中提供的KafkaTestCluster類(lèi)中封锉,其并沒(méi)有給外部來(lái)指定某個(gè)broker port的機(jī)會(huì)绵跷。

 /**
     * Starts the cluster.
     * @throws Exception on startup errors.
     * @throws TimeoutException When the cluster fails to start up within a timely manner.
     */
    public void start() throws Exception, TimeoutException {
        // Ensure zookeeper instance has been started.
        zkTestServer.start();

        // If we have no brokers defined yet...
        if (brokers.isEmpty()) {
            // Loop over brokers, starting with brokerId 1.
            for (int brokerId = 1; brokerId <= numberOfBrokers; brokerId++) {
                // Create properties for brokers
                final Properties brokerProperties = new Properties();

                // Add user defined properties.
                brokerProperties.putAll(overrideBrokerProperties);

                // Set broker.id
                brokerProperties.put("broker.id", String.valueOf(brokerId));

                // Create new KafkaTestServer and add to our broker list
                brokers.add(
                    new KafkaTestServer(brokerProperties, zkTestServer)
                );
            }
        }

        // Loop over each broker and start it
        for (final KafkaTestServer broker : brokers) {
            broker.start();
        }

        // Block until the cluster is 'up' or the timeout is exceeded.
        waitUntilClusterReady(10_000L);
    }

這在某些需要預(yù)先指定IP+端口號(hào)的場(chǎng)景中還是有一些麻煩的。需要后續(xù)進(jìn)行處理成福。例如碾局,給這個(gè)類(lèi)新增一個(gè)構(gòu)造方法,利用以下的List ,把已經(jīng)完成初始化的List<KafkaTestServer> brokers 作為入?yún)鬟f進(jìn)去奴艾。

private final List<KafkaTestServer> brokers = new ArrayList<>();

新增構(gòu)造方法:

public KafkaTestCluster(final List<KafkaTestServer> brokers) 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末净当,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蕴潦,更是在濱河造成了極大的恐慌像啼,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,464評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件潭苞,死亡現(xiàn)場(chǎng)離奇詭異忽冻,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)此疹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)僧诚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人蝗碎,你說(shuō)我怎么就攤上這事湖笨。” “怎么了蹦骑?”我有些...
    開(kāi)封第一講書(shū)人閱讀 169,078評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵慈省,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我眠菇,道長(zhǎng)边败,這世上最難降的妖魔是什么清钥? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,979評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮放闺,結(jié)果婚禮上祟昭,老公的妹妹穿的比我還像新娘。我一直安慰自己怖侦,他們只是感情好篡悟,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,001評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著匾寝,像睡著了一般搬葬。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上艳悔,一...
    開(kāi)封第一講書(shū)人閱讀 52,584評(píng)論 1 312
  • 那天急凰,我揣著相機(jī)與錄音,去河邊找鬼猜年。 笑死抡锈,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的乔外。 我是一名探鬼主播床三,決...
    沈念sama閱讀 41,085評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼杨幼!你這毒婦竟也來(lái)了撇簿?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 40,023評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤差购,失蹤者是張志新(化名)和其女友劉穎四瘫,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體欲逃,經(jīng)...
    沈念sama閱讀 46,555評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡找蜜,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,626評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了暖夭。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锹杈。...
    茶點(diǎn)故事閱讀 40,769評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖迈着,靈堂內(nèi)的尸體忽然破棺而出竭望,到底是詐尸還是另有隱情,我是刑警寧澤裕菠,帶...
    沈念sama閱讀 36,439評(píng)論 5 351
  • 正文 年R本政府宣布咬清,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏旧烧。R本人自食惡果不足惜影钉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,115評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望掘剪。 院中可真熱鬧平委,春花似錦、人聲如沸夺谁。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,601評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)匾鸥。三九已至蜡塌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間勿负,已是汗流浹背馏艾。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,702評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留奴愉,地道東北人琅摩。 一個(gè)月前我還...
    沈念sama閱讀 49,191評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像躁劣,于是被迫代替她去往敵國(guó)和親迫吐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子库菲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,781評(píng)論 2 361