java Kafka 簡(jiǎn)單應(yīng)用實(shí)例

1、安裝zookeeper

下載zookeeper-3.4.9.tar;

解壓tar -zxvf zookeeper-3.4.9.tar冬阳;

進(jìn)入zookeeper-3.4.9/conf目錄創(chuàng)建zoo.cfg文件倦沧,內(nèi)容如下:

tickTime=2000

dataDir=/usr/myenv/zookeeper-3.4.8/data(填寫(xiě)自己的data目錄)

dataLogDir=/usr/myenv/zookeeper-3.4.8/logs

clientPort=2181

啟動(dòng)zookeeper:

./yourZookeeperDir/bin/zkServer.sh start

2、安裝kafka

下載kafka:http://kafka.apache.org/downloads

解壓kafka:tar -zxvf kafka_2.10-0.8.2.1.tar

修改config/server.propertie配置文件中zookeeper的host配置启昧,由于zookeeper是在本地啟動(dòng)所以不需要修改:


server.propertie配置

啟動(dòng)kafka
./yourKafkaDir/bin/kafka-server-start.sh /yourKafkaDir/config/server.properties

3、kafka java 應(yīng)用demo

kafka Producer

package kafkaTest;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;


public class KafkaProducer {
    private final Producer<String,String>producer;
    public final static String TOPIC = "TEST-TOPIC";
    public KafkaProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list","192.168.1.103:9092");
        props.put("serializer.class","kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks","-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void produce(){
        int messageNo = 1000;
        final int COUNT = 10000;
        while (messageNo < COUNT){
            String key = String.valueOf(messageNo);
            String data = "@@@@@hello kafka message"+key;
            producer.send(new KeyedMessage<String, String>(TOPIC,key,data));
            System.out.println(data);
            messageNo++;
        }
    }

    public static void main(String[] args) {
        new KafkaProducer().produce();
    }

}

kafka consumer

package kafkaTest;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


public class KafkaConsumer {
    private final ConsumerConnector consumer;
    public KafkaConsumer() {
        Properties props = new Properties();
        props.put("zookeeper.connect","127.0.0.1:2181");
        props.put("group.id","test-group");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }
    public void consume(){
        Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
        topicCountMap.put(KafkaProducer.TOPIC,new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String,List<KafkaStream<String,String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String,String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
        ConsumerIterator<String,String> it = stream.iterator();
        while (it.hasNext()){
            System.out.println(it.next().message());
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末劈伴,一起剝皮案震驚了整個(gè)濱河市密末,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌跛璧,老刑警劉巖严里,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異追城,居然都是意外死亡刹碾,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)座柱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)迷帜,“玉大人,你說(shuō)我怎么就攤上這事色洞∠非拢” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵火诸,是天一觀的道長(zhǎng)锦针。 經(jīng)常有香客問(wèn)我,道長(zhǎng)置蜀,這世上最難降的妖魔是什么奈搜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮盯荤,結(jié)果婚禮上媚污,老公的妹妹穿的比我還像新娘。我一直安慰自己廷雅,他們只是感情好耗美,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著航缀,像睡著了一般商架。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上芥玉,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天蛇摸,我揣著相機(jī)與錄音,去河邊找鬼灿巧。 笑死赶袄,一個(gè)胖子當(dāng)著我的面吹牛揽涮,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播饿肺,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼蒋困,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了敬辣?” 一聲冷哼從身側(cè)響起雪标,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎溉跃,沒(méi)想到半個(gè)月后村刨,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡撰茎,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年嵌牺,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片龄糊。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡髓梅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出绎签,到底是詐尸還是另有隱情枯饿,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布诡必,位于F島的核電站奢方,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏爸舒。R本人自食惡果不足惜蟋字,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望扭勉。 院中可真熱鬧鹊奖,春花似錦、人聲如沸涂炎。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)唱捣。三九已至两蟀,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間震缭,已是汗流浹背赂毯。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人党涕。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓烦感,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親膛堤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子手趣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容