设为首页 加入收藏

TOP

替代Flume——Kafka Connect简介(二)
2019-09-17 15:23:58 】 浏览:44
Tags:替代 Flume Kafka Connect 简介
uot;foo" "bar" "hello world"

转换后:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

常用转换类型:

  • InsertField - 使用静态数据或记录元数据添加字段
  • ReplaceField - 过滤或重命名字段
  • MaskField - 用类型的有效空值替换字段(0,空字符串等)
  • ValueToKey Value转换为Key
  • HoistField - 将整个事件作为单个字段包装在Struct或Map中
  • ExtractField - 从Struct和Map中提取特定字段,并在结果中仅包含此字段
  • SetSchemaMetadata - 修改架构名称或版本
  • TimestampRouter - 根据原始主题和时间戳修改记录主题
  • RegexRouter - 根据原始主题,替换字符串和正则表达式修改记录主题

集群模式

集群模式下,可以扩展,容错。

启动:
> bin/connect-distributed.sh config/connect-distributed.properties

在集群模式下,Kafka Connect在Kafka主题中存储偏移量,配置和任务状态。

集群模式配置

connect-distributed.properties

#也需要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#还有一些配置要注意
#group.id(默认connect-cluster) - Connect的组id 请注意,这不得与使用者的组id 冲突
group.id=connect-cluster

#用于存储偏移的主题; 此主题应具有许多分区
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

#用于存储连接器和任务配置的主题  只能一个分区
config.storage.topic=connect-configs
config.storage.replication.factor=1

#用于存储状态的主题; 此主题可以有多个分区
status.storage.topic=connect-status
status.storage.replication.factor=1

在集群模式下,配置并不会在命令行传进去,而是需要REST API来创建,修改和销毁连接器。

集群模式连接器配置(REST API)

可以配置REST API服务器,支持http与https

listeners=http://localhost:8080,https://localhost:8443

默认情况下,如果未listeners指定,则REST服务器使用HTTP协议在端口8083上运行。

以下是当前支持的REST API:

  • GET /connectors - 返回活动连接器列表
  • POST /connectors - 创建一个新的连接器; 请求主体应该是包含字符串name字段的JSON对象和包含config连接器配置参数的对象字段
  • GET /connectors/{name} - 获取有关特定连接器的信息
  • GET /connectors/{name}/config - 获取特定连接器的配置参数
  • PUT /connectors/{name}/config - 更新特定连接器的配置参数
  • GET /connectors/{name}/status - 获取连接器的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,错误信息(如果失败)以及所有任务的状态
  • GET /connectors/{name}/tasks - 获取当前为连接器运行的任务列表
  • GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,以及错误信息是否失败
  • PUT /connectors/{name}/pause - 暂停连接器及其任务,这将停止消息处理,直到恢复连接器
  • PUT /connectors/{name}/resume - 恢复暂停的连接器(如果连接器未暂停,则不执行任何操作)
  • POST /connectors/{name}/restart - 重新启动连接器(通常是因为它已经失败)
  • POST /connectors/{name}/tasks/{taskId}/restart - 重启个别任务(通常因为失败)
  • DELETE /connectors/{name} - 删除连接器,暂停所有任务并删除其配置

连接器开发指南

kakfa允许开发人员自己去开发一个连接器。

核心概念

要在Kafka和其他系统之间复制数据,用户需要创建一个Connector

Connector有两种形式:

SourceConnectors从另一个系统导入数据,例如,JDBCSourceConnector将关系数据库导入Kafka

SinkConnectors导出数据,例如,HDFSSinkConnector将Kafka主题的内容导出到HDFS文件

和对应的Task:

SourceTaskSinkTask

Task形成输入输出流,开发Task要注意偏移量的问题。

每个流应该是一系列键值记录。还需要定期提交已处理的数据的偏移量,以便在发生故障时,处理可以从上次提交的偏移量恢复。Connector还需要是动态的,实现还负责监视外部系统是否存在任何更改。

开发一个简单的连接器

开发连接器只需要实现两个接口,即ConnectorTask

这里我们简单开发一个FileStreamConnector。

此连接器是为在独立模式下使用,SourceConnectorSourceTask读取文件的每一行,SinkConnectorSinkTask每个记录写入一个文件。

连接器示例:

继承SourceConnector,添加字段(要读取的文件名和要将数据发送到的主题)

public class FileStreamSourceConnector extends SourceConnector {
    priva
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇MySQL创建用户和加限权 下一篇Sqoop 的基本使用

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目