TOP

EMQ集成Kafka插件编写过程 emq_plugin_kafka
2019-02-07 02:33:49 】 浏览:963
Tags:EMQ 集成 Kafka 插件 编写 过程 emq_plugin_kafka

前言

关于EMQ和erlang的资料感觉实在是太少了,为啥EMQ不搞个论坛啥的,这样就不用恶心的度娘了。。

然后,本文记录也只是说,怎么去改,怎么用,具体很多深层次的东西,我也暂时还没去深究,后续有时间,再一点点研究,其他有些可能与实际说得有出入,还请见谅,指出,让我好纠正。

(其实,到最后测试成功,我才发现,我这几天白搞了,完全可以用另外一种方式实现mqtt->kafka,不需要编写插件,这是题外话了,后面再说)

准备

在编写插件之前,首先得先保证自己有以下的知识点

  1. erlang语言的基本知识
  2. emq架构了解 -> 传送门
  3. emq源码编译 -> 传送门

插件结构

在emq源码编译后,在emq根目录(emq-relx),会出现一个deps文件夹,这个就是emq在编译过程中,通过git下载下来的,里面包含了各种插件的源码。

这里着重看下deps文件夹下的emq_plugin_template文件夹,这个就是官网提供的编写emq插件的模板,来看看里面的结构


其他那些什么中间文件,git附属文件,我就不说了,没用到。

主要在来看看src文件夹

因为我主要实现数据采集,所以这里就没多关注订阅发布权限验证和登录授权验证这一块,我觉得这一块,emq都提供了相应的插件去实现了,也就没有必要再重复造轮子了。所以我直接忽略了权限验证这块,冲着数据采集去。

插件创建

复制一份插件模板
cp -r emq_plugin_template emq_plugin_kafka

先修改src/emq_plugin_template_app.erl文件,把acl和auth的模块注册代码去掉,并加些打印语句。

-module(emq_plugin_template_app).

-behaviour(application).

%% Application callbacks
-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
    {ok, Sup} = emq_plugin_template_sup:start_link(),
    emq_plugin_template:load(application:get_all_env()),
    io:format("emq_plugin_kafka start.~n", []),
    {ok, Sup}.

stop(_State) ->
    emq_plugin_template:unload(),
    io:format("emq_plugin_kafka stop.~n", []).
(当然,居然acl和auth模块没用的,看着碍眼,就直接把文件删掉吧)

接下来干点体力活,把插件目录emq_plugin_kafka下的所有文件名和里面内容的“template”全部改成“kafka”,包括src目录下的所有文件,Makefile文件,etc下的配置文件等。

顺利的话,这个时候插件的雏形算是完成了,把插件挂载到emq上,测试启动下。

在emq的根目录的rebar.config文件里面{emq_plugin_template, load}下一行,加入该插件。

{emq_plugin_kafka, load}

还有再Makefile文件内的DEPS加上emq_plugin_kafka

DEPS += emqttd emq_modules emq_dashboard emq_retainer emq_recon emq_reloader \
        emq_auth_clientid emq_auth_username emq_auth_ldap emq_auth_http \
        emq_auth_mysql emq_auth_pgsql emq_auth_redis emq_auth_mongo \
        emq_sn emq_coap emq_stomp emq_plugin_template emq_web_hook \
        emq_lua_hook emq_auth_jwt emq_plugin_kafka

为了方便查看插件是否加载成功,我们让插件随emq启动就默认加载:

在data/loaded_plugins文件内加上插件

vi data/loaded_plugins
emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka.

最后,在emq根目录下,make一下。

如果编译不成功,就继续看看那些文件或内容有没有改对吧。

控制台模式启动emq:

./_rel/emqttd/bin/emqttd console

启动后,看到我们写的打印语句,就证明插件加载成功了。


你也可以用浏览器打开控制台,ip:18083,手动来启用和停止插件



编写代码连接Kafka

这里我就不介绍kafka+zookeeper的搭建了,网上各位大神的博客很多,可以对照着搭建

相关zookeeper的集群搭建,可以看我另外一篇博客(^^)。

OK,假设你已经搭建好了kafka环境了,接下来让emq连接到kafka

用到的一个工具是ekaf:https://github.com/helpshift/ekaf

这个工具是用erlang语言写的,用于连接kafka的,关于这个工具,后面我会再讲两句。

整改配置文件

关于配置文件,简单说两句:可以看看官网关于配置文件的变更历史,后面会创建一个schema文件。

进入插件目录emq_plugin_kafka的配置文件目录 emq_plugin_kafka/etc

先把文件名改成后缀为conf的格式(划重点!!!!!= = 我被这一点坑了很久,Makefile关于配置文件的转换,是读取.conf文件的,然后这里是config一直未发现。当然,你可以用erlang原生的配置文件方式,就不用大费周章,但是这里还是遵循emq的统一作风。)

mv emq_plugin_kafka.config emq_plugin_kafka.conf

然后编辑内容,把内容改成如下(server地址写成你自己的):

emq.plugin.kafka.server = 192.168.52.130:9092
emq.plugin.kafka.topic = test-topic2

这里先不要着急去思考关于Kafka集群的操作,后面再讲。

然后新建schema文件 emq_plugin_kafka/priv/emq_plugin_kafka.schema,添加以下内容:

{mapping, "emq.plugin.kafka.server", "emq_plugin_kafka.server", [
 {default, {"127.0.0.1", 6379}},
 {datatype, [integer, ip, string]}
]}.


%% emq.msg.kafka.topic
{mapping, "emq.plugin.kafka.topic", "emq_plugin_kafka.server", [
 {default, "test"},
 {datatype, string},
 hidden
]}.


{
  translation,
  "emq_plugin_kafka.server",
  fun(Conf) ->
      {RHost, RPort} = case cuttlefish:conf_get("emq.plugin.kafka.server", Conf) of
                {Ip, Port} -> {Ip, Port};
                S     -> case string:tokens(S, ":") of
                         [Domain]    -> {Domain, 9092};
                         [Domain, Port] -> {Domain, list_to_integer(Port)}
                       end
              end,
      Topic = cuttlefish:conf_get("emq.plugin.kafka.topic", Conf),
      [
      {host, RHost},
      {port, RPort},
      {topic, Topic}
      ]
  end
}.

编辑插件的Makefile文件,增加ekaf依赖

PROJECT = emq_plugin_kafka
PROJECT_DESCRIPTION = EMQ Plugin Kafka
PROJECT_VERSION = 2.3.10

BUILD_DEPS = emqttd cuttlefish ekaf
dep_emqttd = git https://github.com/emqtt/emqttd master
dep_cuttlefish = git https://github.com/emqtt/cuttlefish
dep_ekaf = git https://github.com/helpshift/ekaf master

ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'

NO_AUTOPATCH = cuttlefish

COVER = true

include erlang.mk

app:: rebar.config

app.config::
        ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data

编写逻辑代码

编辑src/emq_plugin_kafka.erl文件,改为如下:

-module(emq_plugin_kafka).

-include_lib("emqttd/include/emqttd.hrl").

-define(APP, emq_plugin_kafka).

-export([load/1, unload/0]).

%% Hooks functions

-export([on_client_connected/3, on_client_disconnected/3]).

-export([on_client_subscribe/4, on_client_unsubscribe/4]).

-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).

-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

%% Called when the plugin application start
load(Env) ->
    ekaf_init(Env),
    emqttd:hook('client.connected', fun MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun MODULE:on_message_acked/4, [Env]).

on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    {ok, Client}.

on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    ok.

on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.
    
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.

on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).

on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.

on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.

on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).

%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};

on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    ekaf_send(Message, _Env),
    {ok, Message}.

on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.

%% Called when the plugin application stop
unload() ->
    emqttd:unhook('client.connected', fun MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun MODULE:on_message_acked/4).

ekaf_init(_Env) ->
    {ok, Kafka_Env} = application:get_env(APP, server),
    Host = proplists:get_value(host, Kafka_Env),
    Port = proplists:get_value(port, Kafka_Env),
    Broker = {Host, Port},
    %Broker = {"192.168.52.130", 9092},
    Topic = proplists:get_value(topic, Kafka_Env),
    %Topic = "test-topic",
    
    application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
    application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
    application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
    %%设置数据上报间隔,ekaf默认是数据达到1000条或者5秒,触发上报
    application:set_env(ekaf, ekaf_buffer_ttl, 100),
   
    {ok, _} = application:ensure_all_started(ekaf).
    %io:format("Init ekaf with ~p~n", [Broker]),
    %Json = mochijson2:encode([
    %    {type, <<"connected">>},
    %    {client_id, <<"test-client_id">>},
    %    {cluster_node, <<"node">>}
    %]),
    %io:format("send : ~w.~n",[ekaf:produce_async_batched(list_to_binary(Topic), list_to_binary(Json))]).


ekaf_send(Message, _Env) ->
    From = Message#mqtt_message.from, 
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    ClientId = get_form_clientid(From),
    Username = get_form_username(From),
    io:format("message receive : ~n",[]),
    io:format("From : ~w~n",[From]),
    io:format("Topic : ~w~n",[Topic]),
    io:format("Payload : ~w~n",[Payload]),
    io:format("Qos : ~w~n",[Qos]),    
    io:format("Dup : ~w~n",[Dup]),
    io:format("Retain : ~w~n",[Retain]),
    io:format("ClientId : ~w~n",[ClientId]),
    io:format("Username : ~w~n",[Username]),
    Str = [
              {client_id, ClientId},
              {message, [
                            {username, Username},
                            {topic, Topic},
                            {payload, Payload},
                            {qos, Qos},
                            {dup, Dup},
                            {retain, Retain}
                        ]},
               {cluster_node, node()},
               {ts, emqttd_time:now_ms()}
           ],
    io:format("Str : ~w.~n", [Str]),
    Json = mochijson2:encode(Str),
    KafkaTopic = get_topic(),
    ekaf:produce_sync_batched(KafkaTopic, list_to_binary(Json)).

get_form_clientid({ClientId, Username}) -> ClientId;
get_form_clientid(From) -> From.
get_form_username({ClientId, Username}) -> Username;
get_form_username(From) -> From.

get_topic() -> 
    {ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
    Topic.

修改完成后,现在插件目录下,make一下,成功编译确保代码编写没问题。。

最后修改emq根目录下的配置文件,在relx.config文件下,添加ekaf依赖,就跟之前添加emq_plugin_kafka依赖一样。

OK,安全起见,把_rel文件夹删掉,最后make一下,理论上是会编译成功的。

编译成功之后运行_rel目录下的emqttd,一切顺利,就可以启动成功。

用emq控制台的websocket来连接emq和发送消息



我这里接收Kafka的消息,没有用到kafka的消费者客户端,是自己编写java代码,去接受数据的,如果你习惯用kafka的消费者客户端,订阅相应的topic,也是可以收到一样的的消息的。

OK,至此,demo版本完成,后续的就看自己项目需求发挥了。

最后说点

1、其实感觉没必要在emq下编写kafka插件,比如java,自己写一个程序去连接mqtt订阅相关消息,然后丢到kafka中,这样不是更加省时方便吗?

2、关于ekaf,我只是浅显调用了发送的代码,具体其他一些配置项和接口,可以看看ekaf的源代码ekaf.erl和ekaf_lib.erl,

关于ekaf怎么设置kafka的集群,作者给出的答案是,ekaf目前不支持ekaf设置集群地址,而是建立一个kafka集群的LB,ekaf连接LB的地址,LB自己去连接kafka集群。

插件代码下载地址:https://download.csdn.net/download/caijiapeng0102/10513808



EMQ集成Kafka插件编写过程 emq_plugin_kafka https://www.cppentry.com/bencandy.php?fid=120&id=207232

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇【机器学习】在生产环境使用Kafka.. 下一篇Kafka的Exactly Once和事务