RabbitMQ+Flume+Kafka的使用

背景

如果你的平臺(tái)使用RabbitMQ蒲凶,并且短時(shí)間不想換Kafka的話茄螃,可以考慮使用以下方式去把數(shù)據(jù)對(duì)接到大數(shù)據(jù)平臺(tái)缝驳,只要對(duì)接到kafka,后面用什么技術(shù)归苍,由你選擇用狱。

RabbitMQ

RabbitMQ的使用這邊不多介紹,只要RabbitMQ上有可用的Queue存在就行
或者寫一個(gè)java的生產(chǎn)者
maven項(xiàng)目配置

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.6</version>
        </dependency>

Java代碼

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQ_Producer {
    private final static String QUEUE_NAME  = "rk_queue_test";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 獲取到連接以及mq通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        factory.setVirtualHost("xxx");
        factory.setHost("192.168.70.xxx");
        factory.setPort(5672);
        Connection conn = factory.newConnection();
        // 創(chuàng)建一個(gè)頻道
        Channel channel = conn.createChannel();
        // 指定一個(gè)隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        int prefetchCount = 1;

        //每個(gè)消費(fèi)者發(fā)送確認(rèn)信號(hào)之前拼弃,消息隊(duì)列不發(fā)送下一個(gè)消息過(guò)來(lái)齿拂,一次只處理一個(gè)消息
        //限制發(fā)給同一個(gè)消費(fèi)者不得超過(guò)1條消息
        channel.basicQos(prefetchCount);

        // 發(fā)送的消息
        for (int i = 0; i < 50; i++) {
            String message = "." + i;
            // 往隊(duì)列中發(fā)出一條消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        // 關(guān)閉頻道和連接
        channel.close();
        conn.close();
    }
}

Flume

Flume自帶是沒有對(duì)接RabbitMQ的,你需要自行寫一個(gè)對(duì)接的代碼肴敛,當(dāng)然已經(jīng)有大神早就寫好了,我們拿來(lái)用就可以了吗购。rabbitmq-flume-plugin點(diǎn)擊下載代碼医男,打包成jar,復(fù)制到Flume的lib目錄下捻勉。然后編寫到conf 目錄下編寫 rabbit-flume-kafka.properties 配置文件镀梭。

vim conf/rabbit-flume-kafka.properties

rabbit-flume-kafka.properties的內(nèi)容,如下:

a1.channels = ch-1
a1.sources = src-1
a1.channels.ch-1.type=memory

a1.sources.src-1.channels = ch-1
a1.sources.src-1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.src-1.host = 192.168.70.xx #RabbitMQ的IP
a1.sources.src-1.port = 5672
a1.sources.src-1.virtual-host = vh
a1.sources.src-1.username = xxxx  #RabbitMQ的用戶
a1.sources.src-1.password = xxxxxx   #RabbitMQ的密碼
a1.sources.src-1.queue = rk_queue_test
a1.sources.src-1.prefetchCount = 10

a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = ch-1
a1.sinks.k1.topic=rfk_out
a1.sinks.k1.brokerList=192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20

flume運(yùn)行命令

/usr/hdp/2.6.3.0-235/flume/bin/flume-ng agent --conf conf --conf-file conf/rabbit-flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console

Kafka

這邊運(yùn)行一個(gè)Kafka的消費(fèi)者踱启,消費(fèi)一下數(shù)據(jù)就可以了

/usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667 --topic rfk_out --from-beginning

如果消費(fèi)到數(shù)據(jù)說(shuō)明搭建成功报账,這邊驗(yàn)證信息就不寫了研底,如果在使用的時(shí)候有什么問(wèn)題請(qǐng)?jiān)谠u(píng)論上提出。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末透罢,一起剝皮案震驚了整個(gè)濱河市榜晦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌羽圃,老刑警劉巖乾胶,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異朽寞,居然都是意外死亡识窿,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門脑融,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)喻频,“玉大人,你說(shuō)我怎么就攤上這事肘迎∩拢” “怎么了?”我有些...
    開封第一講書人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵膜宋,是天一觀的道長(zhǎng)窿侈。 經(jīng)常有香客問(wèn)我,道長(zhǎng)秋茫,這世上最難降的妖魔是什么史简? 我笑而不...
    開封第一講書人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮肛著,結(jié)果婚禮上圆兵,老公的妹妹穿的比我還像新娘。我一直安慰自己枢贿,他們只是感情好殉农,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著局荚,像睡著了一般超凳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上耀态,一...
    開封第一講書人閱讀 49,950評(píng)論 1 291
  • 那天轮傍,我揣著相機(jī)與錄音,去河邊找鬼首装。 笑死创夜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的仙逻。 我是一名探鬼主播驰吓,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼涧尿,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了檬贰?” 一聲冷哼從身側(cè)響起姑廉,我...
    開封第一講書人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎偎蘸,沒想到半個(gè)月后庄蹋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡迷雪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年限书,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片章咧。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡倦西,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出赁严,到底是詐尸還是另有隱情扰柠,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布疼约,位于F島的核電站卤档,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏程剥。R本人自食惡果不足惜劝枣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望织鲸。 院中可真熱鬧舔腾,春花似錦、人聲如沸搂擦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)瀑踢。三九已至扳还,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間橱夭,已是汗流浹背普办。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留徘钥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓肢娘,卻偏偏與公主長(zhǎng)得像呈础,于是被迫代替她去往敵國(guó)和親舆驶。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理而钞,服務(wù)發(fā)現(xiàn)沙廉,斷路器,智...
    卡卡羅2017閱讀 134,637評(píng)論 18 139
  • Kafka簡(jiǎn)介 Kafka是一種分布式的臼节,基于發(fā)布/訂閱的消息系統(tǒng)撬陵。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,074評(píng)論 0 43
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來(lái)...
    Chandler_玨瑜閱讀 6,572評(píng)論 2 39
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,465評(píng)論 0 34
  • Kafka官網(wǎng):http://kafka.apache.org/入門1.1 介紹Kafka? 是一個(gè)分布式流處理系...
    it_zzy閱讀 3,887評(píng)論 3 53