背景:
- emq開源版集成了許多插件娜谊,但沒有提供kafka的怀浆。但官方提供了插件的模板治宣,我們只需要照葫蘆畫瓢即可;
- 由于emq是基于Erlang編程語言提澎,所以我們需要搭建Erlang環(huán)境(Erlang這語言姚垃,不到萬不得已,千萬別碰E渭伞;础);
- 選擇v2.3.11是因?yàn)榻刂箤戇@篇博客的時(shí)候谦纱,v2版本才是真正的穩(wěn)定版絮宁。
Ok,開始吧服协!
一绍昂、Erlang環(huán)境
- 安裝依賴
yum install fop
yum install gcc gcc-c++ glibc-devel kernel-devel make openssl-devel autoconf
yum -y install ncurses ncurses-devel
yum install m4 openssl-devel unixODBC unixODBC-devel
- 下載安裝包
wget http://erlang.org/download/otp_src_20.3.tar.gz
- 解壓、安裝
tar -zxvf otp_src_20.3.tar.gz
./configure --prefix=/usr/local/erlang
make
make install
- 添加環(huán)境變量
vi /etc/profile
#set erlang environment
export PATH=$PATH:/usr/local/erlang/bin
source /etc/profile
-
驗(yàn)證
二窘游、測(cè)試安裝Emq
因?yàn)槲覀兒罄m(xù)要加上自己的kafka插件代碼,所以這步只是測(cè)試是否能正常編譯
- 下載源碼
wget https://codeload.github.com/emqx/emqx-rel/tar.gz/v2.3.11
mv v2.3.11 emq-v2.3.11.tar.gz
- 解壓
tar -zxvf emq-v2.3.11.tar.gz
cd emqx-rel-2.3.11
- 編譯
make
編譯需要花點(diǎn)時(shí)間跳纳,如果編譯成功忍饰,就可以進(jìn)入下一步了。
如果有錯(cuò)誤寺庄,那么一定是linux環(huán)境的問題艾蓝,因?yàn)檫@個(gè)Erlang版本和Emq版本是我測(cè)試過的。我之前就因?yàn)橐恢睕]通過斗塘,所以重新在本地建了一個(gè)centos7虛擬機(jī)赢织,再按照步驟來,就成功了馍盟。
三于置、編寫插件
非常感謝寫這篇博客《EMQ集成Kafka插件編寫過程 emq_plugin_kafka》的作者啊,幫了大忙贞岭!
除了有一些小問題~
-
第一次編譯之后八毯,文件結(jié)構(gòu)如下:
- 復(fù)制一份插件模板,并更名:
cd deps
cp -r emq_plugin_template emq_plugin_kafka
cd emq_plugin_kafka
-
插件目錄結(jié)構(gòu)
- 修改關(guān)鍵字“template”為“kafka”
將插件目錄emq_plugin_kafka中所有文件名和里面的內(nèi)容中的“template”改為“kafka”
(你總不希望別人看你的代碼代碼時(shí)瞄桨,上面的“template”幾個(gè)大字赫然在目吧话速?),包括src目錄下芯侥、Makefile文件泊交、etc下等 - 初步處理src下的文件
- 因?yàn)闆]用到權(quán)限驗(yàn)證和登錄授權(quán)驗(yàn)證,所以刪除acl和auth代碼
rm -rf emq_acl_demo.erl
rm -rf emq_auth_demo.erl
- 修改emq_plugin_kafka_app.erl文件,把a(bǔ)cl和auth的模塊注冊(cè)代碼去掉活合,并加些打印語句
vim emq_plugin_kafka_app.erl
-module(emq_plugin_kafka_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emq_plugin_kafka_sup:start_link(),
%% ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []),
%% ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []),
emq_plugin_kafka:load(application:get_all_env()),
io:format("emq_plugin_kafka start.~n", []),
{ok, Sup}.
stop(_State) ->
%% ok = emqttd_access_control:unregister_mod(auth, emq_auth_demo),
%% ok = emqttd_access_control:unregister_mod(acl, emq_acl_demo),
emq_plugin_kafka:unload(),
io:format("emq_plugin_kafka stop.~n", []).
- 暫時(shí)測(cè)試一下
①修改配置文件relx.config:vim relx.config
添加一行:{emq_plugin_kafka, load},
②修改Makefile:vim Makefile
添加emq_plugin_kafka:
DEPS += emqttd emq_modules emq_dashboard emq_retainer emq_recon emq_reloader \
emq_auth_clientid emq_auth_username emq_auth_ldap emq_auth_http \
emq_auth_mysql emq_auth_pgsql emq_auth_redis emq_auth_mongo \
emq_sn emq_coap emq_stomp emq_plugin_template emq_web_hook \
emq_lua_hook emq_auth_jwt emq_plugin_kafka
③設(shè)置emq啟動(dòng)時(shí)加載插件:vim data/loaded_plugins
emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka.
④在emq根目錄make雏婶,如果不成功,可能是文件/內(nèi)容修改不完全
⑤控制臺(tái)啟動(dòng)emq
./_rel/emqttd/bin/emqttd console
啟動(dòng)日志:
6.1. 連接單節(jié)點(diǎn)的kafka的代碼——ekaf工具
如何搭建kafka留晚,可參考我的另一篇博客《三臺(tái)虛擬機(jī)搭建kafka集群(和zookeeper集群)》
假設(shè)你已經(jīng)搭建好了kafka。
為了連接上kafka告嘲,需要用到基于Erlang寫的工具ekaf错维。
- 修改配置文件
① 配置文件變更歷史,官方介紹
通過我的后期實(shí)踐的理解橄唬,conf后綴對(duì)應(yīng)key=value這種配置格式赋焕,config后綴對(duì)應(yīng)Erlang原始配置格式
Makefile里會(huì)有對(duì)應(yīng)配置文件路徑:
app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data
② 進(jìn)入emq_plugin_kafka/etc,修改文件名后綴為conf仰楚,因?yàn)镸akefile里寫的是conf
mv emq_plugin_kafka.config emq_plugin_kafka.conf
③ 編輯該文件:vim emq_plugin_kafka.conf
emq.plugin.kafka.server = 192.168.220.129:9092
emq.plugin.kafka.topic = kafka-topic
④ 在deps/emq_plugin_kafka目錄下隆判,新建priv文件夾,然后新建schema文件
mkdir priv
vim priv/emq_plugin_kafka.schema
我復(fù)制上面提到的博客《EMQ集成Kafka插件編寫過程 emq_plugin_kafka》里的代碼僧界,老是解析這個(gè)文件出錯(cuò)侨嘀,不得已,自己一點(diǎn)一點(diǎn)敲捂襟,最后變成了這個(gè)樣子:
{
mapping,
"emq.plugin.kafka.server",
"emq_plugin_kafka.server",
[
{
mapping,
"emq.plugin.kafka.server",
"emq_plugin_kafka.server",
[
{default, {"127.0.0.1", 9092}},
{datatype, [integer, ip, string]}
]
}.
{
mapping,
"emq.plugin.kafka.topic",
"emq_plugin_kafka.server",
[
{default, "test"},
{datatype, string},
hidden
]
}.
{
translation,
"emq_plugin_kafka.server",
fun(Conf) ->
{RHost, RPort} = case cuttlefish:conf_get("emq.plugin.kafka.server", Conf) of
{Ip, Port} -> {Ip, Port};
S -> case string:tokens(S, ":") of
[Domain] -> {Domain, 9092};
[Domain, Port] -> {Domain, list_to_integer(Port)}
end
end,
Topic = cuttlefish:conf_get("emq.plugin.kafka.topic", Conf),
[
{host, RHost},
{port, RPort},
{topic, Topic}
]
end
}.
⑤ 修改Makefile文件咬腕,增加ekaf依賴
PROJECT = emq_plugin_kafka
PROJECT_DESCRIPTION = EMQ Plugin Kafka
PROJECT_VERSION = 2.3.11
BUILD_DEPS = emqttd cuttlefish ekaf
dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
dep_ekaf = git https://github.com/helpshift/ekaf master
ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'
NO_AUTOPATCH = cuttlefish
COVER = true
include erlang.mk
app:: rebar.config
app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data
⑥ 編寫邏輯代碼:vim src/emq_plugin_kafka.erl
主要是寫了個(gè) ekaf_send(Message, _Env) 方法,然后再消息到來的時(shí)候調(diào)用
-module(emq_plugin_kafka).
-include_lib("emqttd/include/emqttd.hrl").
-define(APP, emq_plugin_kafka).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application start
load(Env) ->
ekaf_init(Env),
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
{ok, Client}.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
ok.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
{ok, TopicTable}.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
{ok, TopicTable}.
on_session_created(ClientId, Username, _Env) ->
io:format("session(~s/~s) created.", [ClientId, Username]).
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
{ok, {Topic, Opts}}.
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
ok.
on_session_terminated(ClientId, Username, Reason, _Env) ->
io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message, _Env) ->
io:format("publish ~s~n", [emqttd_message:format(Message)]),
ekaf_send(Message, _Env),
{ok, Message}.
on_message_delivered(ClientId, Username, Message, _Env) ->
io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
%% Called when the plugin application stop
unload() ->
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
ekaf_init(_Env) ->
{ok, Kafka_Env} = application:get_env(?APP, server),
Host = proplists:get_value(host, Kafka_Env),
Port = proplists:get_value(port, Kafka_Env),
Broker = {Host, Port},
%Broker = {"192.168.52.130", 9092},
Topic = proplists:get_value(topic, Kafka_Env),
%Topic = "test-topic",
application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
%%設(shè)置數(shù)據(jù)上報(bào)間隔葬荷,ekaf默認(rèn)是數(shù)據(jù)達(dá)到1000條或者5秒涨共,觸發(fā)上報(bào)
application:set_env(ekaf, ekaf_buffer_ttl, 100),
{ok, _} = application:ensure_all_started(ekaf).
%io:format("Init ekaf with ~p~n", [Broker]),
%Json = mochijson2:encode([
% {type, <<"connected">>},
% {client_id, <<"test-client_id">>},
% {cluster_node, <<"node">>}
%]),
%io:format("send : ~w.~n",[ekaf:produce_async_batched(list_to_binary(Topic), list_to_binary(Json))]).
ekaf_send(Message, _Env) ->
From = Message#mqtt_message.from,
Topic = Message#mqtt_message.topic,
Payload = Message#mqtt_message.payload,
Qos = Message#mqtt_message.qos,
Dup = Message#mqtt_message.dup,
Retain = Message#mqtt_message.retain,
ClientId = get_form_clientid(From),
Username = get_form_username(From),
io:format("publish ~s~n", [emqttd_message:format(Message)]),
Str = [
{client_id, ClientId},
{message, [
{username, Username},
{topic, Topic},
{payload, Payload},
{qos, Qos},
{dup, Dup},
{retain, Retain}
]},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
],
%io:format("Str : ~s~n", [Str]),
Json = mochijson2:encode(Str),
KafkaTopic = get_topic(),
ekaf:produce_sync_batched(KafkaTopic, list_to_binary(Json)).
get_form_clientid({ClientId, Username}) -> ClientId;
get_form_clientid(From) -> From.
get_form_username({ClientId, Username}) -> Username;
get_form_username(From) -> From.
get_topic() ->
{ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
Topic.
6.2 連接集群的kafka的代碼——brod工具
ekaf工具不支持集群,brod支持宠漩,當(dāng)然举反,它也支持單節(jié)點(diǎn)~
因?yàn)槲矣昧薵it工具,所以大家可以看看哄孤,在6.1的基礎(chǔ)上照筑,做了哪些改動(dòng)
[root@localhost emq_plugin_kafka]# git status
# 位于分支 based-on-brod
# 尚未暫存以備提交的變更:
# (使用 "git add/rm <file>..." 更新要提交的內(nèi)容)
# (使用 "git checkout -- <file>..." 丟棄工作區(qū)的改動(dòng))
#
# 修改: Makefile
# 刪除: etc/emq_plugin_kafka.conf
# 修改: src/emq_plugin_kafka.erl
#
# 未跟蹤的文件:
# (使用 "git add <file>..." 以包含要提交的內(nèi)容)
#
# etc/emq_plugin_kafka.config
修改尚未加入提交(使用 "git add" 和/或 "git commit -a")
所以你們只需要:
① 修改Makefile
PROJECT = emq_plugin_kafka
PROJECT_DESCRIPTION = EMQ Plugin Kafka
PROJECT_VERSION = 2.3.11
BUILD_DEPS = emqttd cuttlefish ekaf brod
dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
dep_ekaf = git https://github.com/helpshift/ekaf master
dep_brod = git https://github.com/klarna/brod.git 3.7.3
ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'
NO_AUTOPATCH = cuttlefish
COVER = true
include erlang.mk
app:: rebar.config
app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.config -i priv/emq_plugin_kafka.schema -d data
② 刪除etc/emq_plugin_kafka.conf
rm -f etc/emq_plugin_kafka.conf
③ 修改src/emq_plugin_kafka.erl
-module(emq_plugin_kafka).
-include_lib("emqttd/include/emqttd.hrl").
-include_lib("brod/include/brod_int.hrl").
-define(TEST_TOPIC, <<"test-topic">>).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application start
load(Env) ->
brod_init([Env]),
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
Json = mochijson2:encode([
{type, <<"connected">>},
{client_id, ClientId},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaTopic = proplists:get_value(kafka_topic, Kafka),
{ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_1">>, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
io:format("brod_produce_reply:ok ~n"),
ok
after 5000 ->
io:format("brod_produce_reply:exit ~n"),
erlang:exit(timeout)
%%ct:fail({?MODULE, ?LINE, timeout})
end,
{ok, Client}.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
Json = mochijson2:encode([
{type, <<"disconnected">>},
{client_id, ClientId},
{reason, Reason},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaTopic = proplists:get_value(kafka_topic, Kafka),
{ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_2">>, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
ok
after 5000 ->
ct:fail({?MODULE, ?LINE, timeout})
end,
ok.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
{ok, TopicTable}.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
{ok, TopicTable}.
on_session_created(ClientId, Username, _Env) ->
io:format("session(~s/~s) created.", [ClientId, Username]).
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
{ok, {Topic, Opts}}.
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
ok.
on_session_terminated(ClientId, Username, Reason, _Env) ->
io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
%% transform message and return
%%根據(jù)topic前綴來分發(fā)到對(duì)應(yīng)方法:以$SYS/開頭
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message, _Env) ->
io:format("publish ~s~n", [emqttd_message:format(Message)]),
Id = Message#mqtt_message.id,
From = Message#mqtt_message.from, %需要登錄和不需要登錄這里的返回值是不一樣的
Topic = Message#mqtt_message.topic,
Payload = Message#mqtt_message.payload,
Qos = Message#mqtt_message.qos,
Dup = Message#mqtt_message.dup,
Retain = Message#mqtt_message.retain,
Timestamp = Message#mqtt_message.timestamp,
ClientId = c(From),
Username = u(From),
%%ClientId作為Key
Key = iolist_to_binary(ClientId),
%%獲得分區(qū)數(shù)
Partition = getPartition(Key),
%%讀取配置文件
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaTopic = proplists:get_value(kafka_topic, Kafka),
Json = mochijson2:encode([
{type, <<"publish">>},
{partition_num, Partition},
{client_id, ClientId},
{message, [
{username, Username},
{topic, Topic},
{payload, Payload},
{qos, i(Qos)},
{dup, i(Dup)},
{retain, i(Retain)}
]},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
{ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, Partition, ClientId, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
ok
after 5000 ->
ct:fail({?MODULE, ?LINE, timeout})
end,
{ok, Message}.
%%key的md5的最后一位進(jìn)行取模吹截,獲取分區(qū)數(shù)
getPartition(Key) ->
{ok, Kafka} = application:get_env(?MODULE, kafka),
PartitionNum = proplists:get_value(kafka_producer_partition, Kafka),
<<Fix:120, Match:8>> = crypto:hash(md5, Key),
abs(Match) rem PartitionNum.
on_message_delivered(ClientId, Username, Message, _Env) ->
io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
%% Called when the plugin application stop
unload() ->
%%application:stop(brod),
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
%% ===================================================================
%% brod_init https://github.com/klarna/brod
%% ===================================================================
brod_init(_Env) ->
{ok, _} = application:ensure_all_started(brod),
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
%%KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %%localhost,172.16.6.161
%%KafkaBootstrapEndpoints = [{"localhost", 9092}], %%localhost,172.16.6.161
%%ClientConfig = [{reconnect_cool_down_seconds, 10}],%% socket error recovery
ClientConfig = [],%% socket error recovery
{ok, Kafka} = application:get_env(?MODULE, kafka),
Topic = proplists:get_value(kafka_topic, Kafka),
Partition = 0,
%%下面兩行是初始化client瘦陈,一個(gè)client只能發(fā)送到一個(gè)topic,如果要多個(gè)topic波俄,則創(chuàng)建多個(gè)client
ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
%%ok = brod:produce_sync(brod_client_1, Topic, Partition, <<"key1">>, <<"value1">>),
%%{ok, CallRef} = brod:produce(brod_client_1, Topic, Partition, <<"key1">>, <<"value2">>),
io:format("Init brod with ~p~n", [KafkaBootstrapEndpoints]).
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, Username}) -> ClientId;
c(From) -> From.
u({ClientId, Username}) -> Username;
u(From) -> From.
④ 新增etc/emq_plugin_kafka.config
[
{emq_plugin_kafka, [
{kafka, [
{ bootstrap_broker, [{"192.168.220.129", 9092},{"192.168.220.130", 9092},{"192.168.220.131", 9092}] },
{ query_api_versions, false },
{ reconnect_cool_down_seconds, 10},
{kafka_producer_partition, 3},
{kafka_topic, <<"kafka-new-topic">>}
]}
]}
].
- 再修改一下emq的配置文件:
vim relx.config
添加:{ekaf, load},
四晨逝、大功告成
- 先刪除_rel文件夾
make
- 控制臺(tái)啟動(dòng):
./_rel/emqttd/bin/emqttd console
- 打開kafka控制臺(tái)查看日志,這里的“kafka-topic”就是emq_plugin_kafka.conf文件里的配置項(xiàng):
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-topic
-
網(wǎng)頁測(cè)試: