Centos7 搭建v2.3.11 emq broker及編寫kafka插件

背景:

  • emq開源版集成了許多插件娜谊,但沒有提供kafka的怀浆。但官方提供了插件的模板治宣,我們只需要照葫蘆畫瓢即可;
  • 由于emq是基于Erlang編程語言提澎,所以我們需要搭建Erlang環(huán)境(Erlang這語言姚垃,不到萬不得已,千萬別碰E渭伞;础);
  • 選擇v2.3.11是因?yàn)榻刂箤戇@篇博客的時(shí)候谦纱,v2版本才是真正的穩(wěn)定版絮宁。

Ok,開始吧服协!


一绍昂、Erlang環(huán)境
  1. 安裝依賴
    yum install fop
    yum install gcc gcc-c++ glibc-devel kernel-devel make openssl-devel autoconf
    yum -y install ncurses ncurses-devel
    yum install m4 openssl-devel unixODBC unixODBC-devel
  2. 下載安裝包
    wget http://erlang.org/download/otp_src_20.3.tar.gz
  3. 解壓、安裝
    tar -zxvf otp_src_20.3.tar.gz
    ./configure --prefix=/usr/local/erlang
    make
    make install
  4. 添加環(huán)境變量
    vi /etc/profile
#set erlang environment
export PATH=$PATH:/usr/local/erlang/bin

source /etc/profile

  1. 驗(yàn)證


    Erlang成功偿荷!ctrl + c >press 'a'退出
二窘游、測(cè)試安裝Emq

因?yàn)槲覀兒罄m(xù)要加上自己的kafka插件代碼,所以這步只是測(cè)試是否能正常編譯

  1. 下載源碼
    wget https://codeload.github.com/emqx/emqx-rel/tar.gz/v2.3.11
    mv v2.3.11 emq-v2.3.11.tar.gz
  2. 解壓
    tar -zxvf emq-v2.3.11.tar.gz
    cd emqx-rel-2.3.11
  3. 編譯
    make
    編譯需要花點(diǎn)時(shí)間跳纳,如果編譯成功忍饰,就可以進(jìn)入下一步了。
    如果有錯(cuò)誤寺庄,那么一定是linux環(huán)境的問題艾蓝,因?yàn)檫@個(gè)Erlang版本和Emq版本是我測(cè)試過的。我之前就因?yàn)橐恢睕]通過斗塘,所以重新在本地建了一個(gè)centos7虛擬機(jī)赢织,再按照步驟來,就成功了馍盟。
三于置、編寫插件

非常感謝寫這篇博客《EMQ集成Kafka插件編寫過程 emq_plugin_kafka》的作者啊,幫了大忙贞岭!
除了有一些小問題~

  1. 第一次編譯之后八毯,文件結(jié)構(gòu)如下:


    emq文件結(jié)構(gòu)
  2. 復(fù)制一份插件模板,并更名:
    cd deps
    cp -r emq_plugin_template emq_plugin_kafka
    cd emq_plugin_kafka
  3. 插件目錄結(jié)構(gòu)


    插件目錄結(jié)構(gòu)
  4. 修改關(guān)鍵字“template”為“kafka”
    將插件目錄emq_plugin_kafka中所有文件名和里面的內(nèi)容中的“template”改為“kafka”
    (你總不希望別人看你的代碼代碼時(shí)瞄桨,上面的“template”幾個(gè)大字赫然在目吧话速?),包括src目錄下芯侥、Makefile文件泊交、etc下等
  5. 初步處理src下的文件
  • 因?yàn)闆]用到權(quán)限驗(yàn)證和登錄授權(quán)驗(yàn)證,所以刪除acl和auth代碼
    rm -rf emq_acl_demo.erl
    rm -rf emq_auth_demo.erl
  • 修改emq_plugin_kafka_app.erl文件,把a(bǔ)cl和auth的模塊注冊(cè)代碼去掉活合,并加些打印語句
    vim emq_plugin_kafka_app.erl
-module(emq_plugin_kafka_app).

-behaviour(application).

%% Application callbacks
-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
    {ok, Sup} = emq_plugin_kafka_sup:start_link(),
    %% ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []),
    %% ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []),
    emq_plugin_kafka:load(application:get_all_env()),
    io:format("emq_plugin_kafka start.~n", []),
    {ok, Sup}.

stop(_State) ->
    %% ok = emqttd_access_control:unregister_mod(auth, emq_auth_demo),
    %% ok = emqttd_access_control:unregister_mod(acl, emq_acl_demo),
    emq_plugin_kafka:unload(),
    io:format("emq_plugin_kafka stop.~n", []).
  • 暫時(shí)測(cè)試一下
    ①修改配置文件relx.config:vim relx.config
    添加一行:{emq_plugin_kafka, load},
    ②修改Makefile:vim Makefile
    添加emq_plugin_kafka:
DEPS += emqttd emq_modules emq_dashboard emq_retainer emq_recon emq_reloader \
        emq_auth_clientid emq_auth_username emq_auth_ldap emq_auth_http \
        emq_auth_mysql emq_auth_pgsql emq_auth_redis emq_auth_mongo \
        emq_sn emq_coap emq_stomp emq_plugin_template emq_web_hook \
        emq_lua_hook emq_auth_jwt emq_plugin_kafka

③設(shè)置emq啟動(dòng)時(shí)加載插件:vim data/loaded_plugins

emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka.

④在emq根目錄make雏婶,如果不成功,可能是文件/內(nèi)容修改不完全
控制臺(tái)啟動(dòng)emq
./_rel/emqttd/bin/emqttd console
啟動(dòng)日志:

啟動(dòng)日志白指,也可以通過ip:18083來查看插件啟動(dòng)狀態(tài)

6.1. 連接單節(jié)點(diǎn)的kafka的代碼——ekaf工具

如何搭建kafka留晚,可參考我的另一篇博客《三臺(tái)虛擬機(jī)搭建kafka集群(和zookeeper集群)
假設(shè)你已經(jīng)搭建好了kafka。
為了連接上kafka告嘲,需要用到基于Erlang寫的工具ekaf错维。

  • 修改配置文件
    ① 配置文件變更歷史,官方介紹
    通過我的后期實(shí)踐的理解橄唬,conf后綴對(duì)應(yīng)key=value這種配置格式赋焕,config后綴對(duì)應(yīng)Erlang原始配置格式
    Makefile里會(huì)有對(duì)應(yīng)配置文件路徑:
app.config::
        ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data

② 進(jìn)入emq_plugin_kafka/etc,修改文件名后綴為conf仰楚,因?yàn)镸akefile里寫的是conf
mv emq_plugin_kafka.config emq_plugin_kafka.conf
③ 編輯該文件:vim emq_plugin_kafka.conf

emq.plugin.kafka.server = 192.168.220.129:9092
emq.plugin.kafka.topic = kafka-topic

④ 在deps/emq_plugin_kafka目錄下隆判,新建priv文件夾,然后新建schema文件
mkdir priv
vim priv/emq_plugin_kafka.schema
我復(fù)制上面提到的博客《EMQ集成Kafka插件編寫過程 emq_plugin_kafka》里的代碼僧界,老是解析這個(gè)文件出錯(cuò)侨嘀,不得已,自己一點(diǎn)一點(diǎn)敲捂襟,最后變成了這個(gè)樣子:

{
    mapping,
    "emq.plugin.kafka.server",
    "emq_plugin_kafka.server",
    [
{
    mapping,
    "emq.plugin.kafka.server",
    "emq_plugin_kafka.server",
    [
        {default, {"127.0.0.1", 9092}},
        {datatype, [integer, ip, string]}
    ]
}.

{
    mapping,
    "emq.plugin.kafka.topic",
    "emq_plugin_kafka.server",
    [
        {default, "test"},
        {datatype, string},
        hidden
    ]
}.

{
    translation,
    "emq_plugin_kafka.server",
    fun(Conf) ->
            {RHost, RPort} = case cuttlefish:conf_get("emq.plugin.kafka.server", Conf) of
                                 {Ip, Port} -> {Ip, Port};
                                 S          -> case string:tokens(S, ":") of
                                                   [Domain]       -> {Domain, 9092};
                                                   [Domain, Port] -> {Domain, list_to_integer(Port)}
                                               end
                             end,
            Topic = cuttlefish:conf_get("emq.plugin.kafka.topic", Conf),
            [
             {host, RHost},
             {port, RPort},
             {topic, Topic}
            ]
    end
}.

⑤ 修改Makefile文件咬腕,增加ekaf依賴

PROJECT = emq_plugin_kafka
PROJECT_DESCRIPTION = EMQ Plugin Kafka
PROJECT_VERSION = 2.3.11

BUILD_DEPS = emqttd cuttlefish ekaf
dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
dep_ekaf = git https://github.com/helpshift/ekaf master

ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'

NO_AUTOPATCH = cuttlefish

COVER = true

include erlang.mk

app:: rebar.config

app.config::
        ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data

⑥ 編寫邏輯代碼vim src/emq_plugin_kafka.erl
主要是寫了個(gè) ekaf_send(Message, _Env) 方法,然后再消息到來的時(shí)候調(diào)用

-module(emq_plugin_kafka).

-include_lib("emqttd/include/emqttd.hrl").

-define(APP, emq_plugin_kafka).

-export([load/1, unload/0]).

%% Hooks functions

-export([on_client_connected/3, on_client_disconnected/3]).

-export([on_client_subscribe/4, on_client_unsubscribe/4]).

-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).

-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

%% Called when the plugin application start
load(Env) ->
    ekaf_init(Env),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    {ok, Client}.

on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    ok.

on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.

on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.

on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).

on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.

on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.

on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).

%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};

on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    ekaf_send(Message, _Env),
    {ok, Message}.

on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

%% Called when the plugin application stop
unload() ->
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).

ekaf_init(_Env) ->
    {ok, Kafka_Env} = application:get_env(?APP, server),
    Host = proplists:get_value(host, Kafka_Env),
    Port = proplists:get_value(port, Kafka_Env),
    Broker = {Host, Port},
    %Broker = {"192.168.52.130", 9092},
    Topic = proplists:get_value(topic, Kafka_Env),
    %Topic = "test-topic",

    application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
    application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
    application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
    %%設(shè)置數(shù)據(jù)上報(bào)間隔葬荷,ekaf默認(rèn)是數(shù)據(jù)達(dá)到1000條或者5秒涨共,觸發(fā)上報(bào)
    application:set_env(ekaf, ekaf_buffer_ttl, 100),

    {ok, _} = application:ensure_all_started(ekaf).
    %io:format("Init ekaf with ~p~n", [Broker]),
    %Json = mochijson2:encode([
    %    {type, <<"connected">>},
    %    {client_id, <<"test-client_id">>},
    %    {cluster_node, <<"node">>}
    %]),
    %io:format("send : ~w.~n",[ekaf:produce_async_batched(list_to_binary(Topic), list_to_binary(Json))]).


ekaf_send(Message, _Env) ->
    From = Message#mqtt_message.from,
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    ClientId = get_form_clientid(From),
    Username = get_form_username(From),
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    Str = [
              {client_id, ClientId},
              {message, [
                            {username, Username},
                            {topic, Topic},
                            {payload, Payload},
                            {qos, Qos},
                            {dup, Dup},
                            {retain, Retain}
                        ]},
               {cluster_node, node()},
               {ts, emqttd_time:now_ms()}
           ],
    %io:format("Str : ~s~n", [Str]),
    Json = mochijson2:encode(Str),
    KafkaTopic = get_topic(),
    ekaf:produce_sync_batched(KafkaTopic, list_to_binary(Json)).

get_form_clientid({ClientId, Username}) -> ClientId;
get_form_clientid(From) -> From.
get_form_username({ClientId, Username}) -> Username;
get_form_username(From) -> From.

get_topic() ->
    {ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
    Topic.
6.2 連接集群的kafka的代碼——brod工具

ekaf工具不支持集群,brod支持宠漩,當(dāng)然举反,它也支持單節(jié)點(diǎn)~
因?yàn)槲矣昧薵it工具,所以大家可以看看哄孤,在6.1的基礎(chǔ)上照筑,做了哪些改動(dòng)

[root@localhost emq_plugin_kafka]# git status
# 位于分支 based-on-brod
# 尚未暫存以備提交的變更:
#   (使用 "git add/rm <file>..." 更新要提交的內(nèi)容)
#   (使用 "git checkout -- <file>..." 丟棄工作區(qū)的改動(dòng))
#
#   修改:      Makefile
#   刪除:      etc/emq_plugin_kafka.conf
#   修改:      src/emq_plugin_kafka.erl
#
# 未跟蹤的文件:
#   (使用 "git add <file>..." 以包含要提交的內(nèi)容)
#
#   etc/emq_plugin_kafka.config
修改尚未加入提交(使用 "git add" 和/或 "git commit -a")

所以你們只需要:
① 修改Makefile

PROJECT = emq_plugin_kafka
PROJECT_DESCRIPTION = EMQ Plugin Kafka
PROJECT_VERSION = 2.3.11

BUILD_DEPS = emqttd cuttlefish ekaf brod
dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
dep_ekaf = git https://github.com/helpshift/ekaf master
dep_brod = git https://github.com/klarna/brod.git 3.7.3

ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'

NO_AUTOPATCH = cuttlefish

COVER = true

include erlang.mk

app:: rebar.config

app.config::
        ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.config -i priv/emq_plugin_kafka.schema -d data

② 刪除etc/emq_plugin_kafka.conf
rm -f etc/emq_plugin_kafka.conf
③ 修改src/emq_plugin_kafka.erl

-module(emq_plugin_kafka).
 
-include_lib("emqttd/include/emqttd.hrl").
 
-include_lib("brod/include/brod_int.hrl").
 
-define(TEST_TOPIC, <<"test-topic">>).
 
-export([load/1, unload/0]).
 
%% Hooks functions
 
-export([on_client_connected/3, on_client_disconnected/3]).
 
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
 
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
 
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
 
%% Called when the plugin application start
load(Env) ->
    brod_init([Env]),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
 
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    Json = mochijson2:encode([
        {type, <<"connected">>},
        {client_id, ClientId},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    
    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    KafkaTopic = proplists:get_value(kafka_topic, Kafka),
    {ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_1">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{ call_ref = CallRef
                           , result   = brod_produce_req_acked
                       } ->
        io:format("brod_produce_reply:ok ~n"),
        ok
    after 5000 ->
        io:format("brod_produce_reply:exit ~n"),
        erlang:exit(timeout)
        %%ct:fail({?MODULE, ?LINE, timeout})
    end,
 
    {ok, Client}.
 
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    Json = mochijson2:encode([
        {type, <<"disconnected">>},
        {client_id, ClientId},
        {reason, Reason},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
 
    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    KafkaTopic = proplists:get_value(kafka_topic, Kafka),
    {ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_2">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{ call_ref = CallRef
                           , result   = brod_produce_req_acked
                       } ->
        ok
    after 5000 ->
        ct:fail({?MODULE, ?LINE, timeout})
    end,
 
    ok.
 
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.
    
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.
 
on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).
 
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.
 
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.
 
on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
 
%% transform message and return
%%根據(jù)topic前綴來分發(fā)到對(duì)應(yīng)方法:以$SYS/開頭
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};
 
on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    
    Id = Message#mqtt_message.id,
    From = Message#mqtt_message.from, %需要登錄和不需要登錄這里的返回值是不一樣的
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    Timestamp = Message#mqtt_message.timestamp,
 
    ClientId = c(From),
    Username = u(From),

    %%ClientId作為Key
    Key = iolist_to_binary(ClientId),
    %%獲得分區(qū)數(shù)
    Partition = getPartition(Key),
    %%讀取配置文件
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    KafkaTopic = proplists:get_value(kafka_topic, Kafka),
 
    Json = mochijson2:encode([
                  {type, <<"publish">>},
                              {partition_num, Partition},
                  {client_id, ClientId},
                  {message, [
                     {username, Username},
                     {topic, Topic},
                     {payload, Payload},
                     {qos, i(Qos)},
                     {dup, i(Dup)},
                     {retain, i(Retain)}
                    ]},
                  {cluster_node, node()},
                  {ts, emqttd_time:now_ms()}
                 ]),
    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, Partition, ClientId, list_to_binary(Json)),
    receive
        #brod_produce_reply{ call_ref = CallRef
                           , result   = brod_produce_req_acked
                       } ->
        ok
    after 5000 ->
        ct:fail({?MODULE, ?LINE, timeout})
    end,
 
    {ok, Message}.

%%key的md5的最后一位進(jìn)行取模吹截,獲取分區(qū)數(shù)
getPartition(Key) ->
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    PartitionNum = proplists:get_value(kafka_producer_partition, Kafka),
    <<Fix:120, Match:8>> = crypto:hash(md5, Key),
    abs(Match) rem PartitionNum.
 
on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
 
on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
 
%% Called when the plugin application stop
unload() ->
    %%application:stop(brod),
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
 
%% ===================================================================
%% brod_init https://github.com/klarna/brod
%% ===================================================================
brod_init(_Env) ->
    {ok, _} = application:ensure_all_started(brod),
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
    %%KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %%localhost,172.16.6.161
    %%KafkaBootstrapEndpoints = [{"localhost", 9092}], %%localhost,172.16.6.161
    %%ClientConfig = [{reconnect_cool_down_seconds, 10}],%% socket error recovery
    ClientConfig = [],%% socket error recovery
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    Topic = proplists:get_value(kafka_topic, Kafka),
    Partition = 0,
%%下面兩行是初始化client瘦陈,一個(gè)client只能發(fā)送到一個(gè)topic,如果要多個(gè)topic波俄,則創(chuàng)建多個(gè)client
    ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
    ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
    %%ok = brod:produce_sync(brod_client_1, Topic, Partition, <<"key1">>, <<"value1">>),
    %%{ok, CallRef} = brod:produce(brod_client_1, Topic, Partition, <<"key1">>, <<"value2">>),
    io:format("Init brod with ~p~n", [KafkaBootstrapEndpoints]).
 
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, Username}) -> ClientId;
c(From) -> From.
u({ClientId, Username}) -> Username;
u(From) -> From.

④ 新增etc/emq_plugin_kafka.config

[
  {emq_plugin_kafka, [
        {kafka, [
      { bootstrap_broker, [{"192.168.220.129", 9092},{"192.168.220.130", 9092},{"192.168.220.131", 9092}] },
      { query_api_versions, false },
      { reconnect_cool_down_seconds, 10},
      {kafka_producer_partition, 3},
      {kafka_topic, <<"kafka-new-topic">>}
    ]}
  ]}
].
  1. 再修改一下emq的配置文件:vim relx.config
    添加:{ekaf, load},
四晨逝、大功告成
  1. 先刪除_rel文件夾
  2. make
  3. 控制臺(tái)啟動(dòng):./_rel/emqttd/bin/emqttd console
  4. 打開kafka控制臺(tái)查看日志,這里的“kafka-topic”就是emq_plugin_kafka.conf文件里的配置項(xiàng):
    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-topic
  5. 網(wǎng)頁測(cè)試:


    網(wǎng)頁測(cè)試
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末懦铺,一起剝皮案震驚了整個(gè)濱河市捉貌,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖趁窃,帶你破解...
    沈念sama閱讀 212,029評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件牧挣,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡醒陆,警方通過查閱死者的電腦和手機(jī)瀑构,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,395評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刨摩,“玉大人寺晌,你說我怎么就攤上這事≡枭玻” “怎么了呻征?”我有些...
    開封第一講書人閱讀 157,570評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)罢浇。 經(jīng)常有香客問我陆赋,道長(zhǎng),這世上最難降的妖魔是什么嚷闭? 我笑而不...
    開封第一講書人閱讀 56,535評(píng)論 1 284
  • 正文 為了忘掉前任奏甫,我火速辦了婚禮,結(jié)果婚禮上凌受,老公的妹妹穿的比我還像新娘阵子。我一直安慰自己,他們只是感情好胜蛉,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,650評(píng)論 6 386
  • 文/花漫 我一把揭開白布挠进。 她就那樣靜靜地躺著,像睡著了一般誊册。 火紅的嫁衣襯著肌膚如雪领突。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,850評(píng)論 1 290
  • 那天案怯,我揣著相機(jī)與錄音君旦,去河邊找鬼。 笑死嘲碱,一個(gè)胖子當(dāng)著我的面吹牛金砍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播麦锯,決...
    沈念sama閱讀 39,006評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼恕稠,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了扶欣?” 一聲冷哼從身側(cè)響起鹅巍,我...
    開封第一講書人閱讀 37,747評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤千扶,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后骆捧,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體澎羞,經(jīng)...
    沈念sama閱讀 44,207評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,536評(píng)論 2 327
  • 正文 我和宋清朗相戀三年敛苇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了煤痕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,683評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡接谨,死狀恐怖摆碉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情脓豪,我是刑警寧澤巷帝,帶...
    沈念sama閱讀 34,342評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站扫夜,受9級(jí)特大地震影響楞泼,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜笤闯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,964評(píng)論 3 315
  • 文/蒙蒙 一堕阔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧颗味,春花似錦超陆、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,772評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至晶默,卻和暖如春谨娜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背磺陡。 一陣腳步聲響...
    開封第一講書人閱讀 32,004評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工趴梢, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人币他。 一個(gè)月前我還...
    沈念sama閱讀 46,401評(píng)論 2 360
  • 正文 我出身青樓坞靶,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親圆丹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子滩愁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,566評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容