背景
如果你的平臺(tái)使用RabbitMQ蒲凶,并且短時(shí)間不想換Kafka的話茄螃,可以考慮使用以下方式去把數(shù)據(jù)對(duì)接到大數(shù)據(jù)平臺(tái)缝驳,只要對(duì)接到kafka,后面用什么技術(shù)归苍,由你選擇用狱。
RabbitMQ
RabbitMQ的使用這邊不多介紹,只要RabbitMQ上有可用的Queue存在就行
或者寫一個(gè)java的生產(chǎn)者
maven項(xiàng)目配置
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.6</version>
</dependency>
Java代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQ_Producer {
private final static String QUEUE_NAME = "rk_queue_test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 獲取到連接以及mq通道
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("xxx");
factory.setPassword("xxx");
factory.setVirtualHost("xxx");
factory.setHost("192.168.70.xxx");
factory.setPort(5672);
Connection conn = factory.newConnection();
// 創(chuàng)建一個(gè)頻道
Channel channel = conn.createChannel();
// 指定一個(gè)隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 1;
//每個(gè)消費(fèi)者發(fā)送確認(rèn)信號(hào)之前拼弃,消息隊(duì)列不發(fā)送下一個(gè)消息過(guò)來(lái)齿拂,一次只處理一個(gè)消息
//限制發(fā)給同一個(gè)消費(fèi)者不得超過(guò)1條消息
channel.basicQos(prefetchCount);
// 發(fā)送的消息
for (int i = 0; i < 50; i++) {
String message = "." + i;
// 往隊(duì)列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
// 關(guān)閉頻道和連接
channel.close();
conn.close();
}
}
Flume
Flume自帶是沒有對(duì)接RabbitMQ的,你需要自行寫一個(gè)對(duì)接的代碼肴敛,當(dāng)然已經(jīng)有大神早就寫好了,我們拿來(lái)用就可以了吗购。rabbitmq-flume-plugin點(diǎn)擊下載代碼医男,打包成jar,復(fù)制到Flume的lib目錄下捻勉。然后編寫到conf 目錄下編寫 rabbit-flume-kafka.properties 配置文件镀梭。
vim conf/rabbit-flume-kafka.properties
rabbit-flume-kafka.properties的內(nèi)容,如下:
a1.channels = ch-1
a1.sources = src-1
a1.channels.ch-1.type=memory
a1.sources.src-1.channels = ch-1
a1.sources.src-1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.src-1.host = 192.168.70.xx #RabbitMQ的IP
a1.sources.src-1.port = 5672
a1.sources.src-1.virtual-host = vh
a1.sources.src-1.username = xxxx #RabbitMQ的用戶
a1.sources.src-1.password = xxxxxx #RabbitMQ的密碼
a1.sources.src-1.queue = rk_queue_test
a1.sources.src-1.prefetchCount = 10
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = ch-1
a1.sinks.k1.topic=rfk_out
a1.sinks.k1.brokerList=192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20
flume運(yùn)行命令
/usr/hdp/2.6.3.0-235/flume/bin/flume-ng agent --conf conf --conf-file conf/rabbit-flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console
Kafka
這邊運(yùn)行一個(gè)Kafka的消費(fèi)者踱启,消費(fèi)一下數(shù)據(jù)就可以了
/usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667 --topic rfk_out --from-beginning
如果消費(fèi)到數(shù)據(jù)說(shuō)明搭建成功报账,這邊驗(yàn)證信息就不寫了研底,如果在使用的時(shí)候有什么問(wèn)題請(qǐng)?jiān)谠u(píng)論上提出。