设为首页 加入收藏

TOP

Kafka--04Kafka connect
2019-02-10 02:30:07 】 浏览:19
Tags:Kafka--04Kafka connect

Kafka Connect用于在Kafka与其他系统间数据传输的工具。Kafka connect可以获取整个数据库或从所有应用程序服务器收集指标到Kafka;也可以从Kafka中topic数据传输至其他存储或者查询系统或者批处理系统进行离线分析。

Kafka Connect功能
Kafka能用框架,提供统一的集成API
支持分布式模式(distributed)及单机模式(standalone)
REST接口,用于查看和管理Kafka connectors
自动化offset管理,开发人员不必担心错误处理的影响
分布式,可扩展
流/批处理集成

Kafka connect两个核心组成 Source和Sink。
Source:负责导入数据到Kafka;
Sink :负责从Kafka导出数据;
(如上二者都称为 connector)


11588306-7f9c59a27ca4e8c7.png

Kafka connect的几个重要的概念包括:connectors、tasks、workers和converters。
Connectors-通过管理任务来细条数据流的高级抽象;
Tasks- 数据写入kafka和数据从kafka读出的实现;
Workers-运行connectors和tasks的进程;
Converters- kafka connect和其他存储系统直接发送或者接受数据之间转换数据;

distribute模式启动:
需要先建三个broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic connect-configs
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 50 --topic connect-offsets
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic connect-status
启动
bin/connect-distributed.sh config/connect-distributed.properties

通过rest api管理connector

因为kafka connect的意图是以服务的方式去运行,所以它提供了REST API去管理connectors,默认的端口是8083,你也可以在启动kafka connect之前在配置文件中添加rest.port配置。

GET /connectors – 返回所有正在运行的connector名
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息
GET /connectors/{name}/config – 获取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

#standalone模式启动
 bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties

编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka顺序写实现原理   参.. 下一篇Error while fetching metadata w..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }