uot;foo"
"bar"
"hello world"
转换后:
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}
常用转换类型:
- InsertField - 使用静态数据或记录元数据添加字段
- ReplaceField - 过滤或重命名字段
- MaskField - 用类型的有效空值替换字段(0,空字符串等)
- ValueToKey Value转换为Key
- HoistField - 将整个事件作为单个字段包装在Struct或Map中
- ExtractField - 从Struct和Map中提取特定字段,并在结果中仅包含此字段
- SetSchemaMetadata - 修改架构名称或版本
- TimestampRouter - 根据原始主题和时间戳修改记录主题
- RegexRouter - 根据原始主题,替换字符串和正则表达式修改记录主题
集群模式
集群模式下,可以扩展,容错。
启动:
> bin/connect-distributed.sh config/connect-distributed.properties
在集群模式下,Kafka Connect在Kafka主题中存储偏移量,配置和任务状态。
集群模式配置
connect-distributed.properties
#也需要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#还有一些配置要注意
#group.id(默认connect-cluster) - Connect的组id 请注意,这不得与使用者的组id 冲突
group.id=connect-cluster
#用于存储偏移的主题; 此主题应具有许多分区
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#用于存储连接器和任务配置的主题 只能一个分区
config.storage.topic=connect-configs
config.storage.replication.factor=1
#用于存储状态的主题; 此主题可以有多个分区
status.storage.topic=connect-status
status.storage.replication.factor=1
在集群模式下,配置并不会在命令行传进去,而是需要REST API来创建,修改和销毁连接器。
集群模式连接器配置(REST API)
可以配置REST API服务器,支持http与https
listeners=http://localhost:8080,https://localhost:8443
默认情况下,如果未listeners
指定,则REST服务器使用HTTP协议在端口8083上运行。
以下是当前支持的REST API:
GET /connectors
- 返回活动连接器列表
POST /connectors
- 创建一个新的连接器; 请求主体应该是包含字符串name
字段的JSON对象和包含config
连接器配置参数的对象字段
GET /connectors/{name}
- 获取有关特定连接器的信息
GET /connectors/{name}/config
- 获取特定连接器的配置参数
PUT /connectors/{name}/config
- 更新特定连接器的配置参数
GET /connectors/{name}/status
- 获取连接器的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,错误信息(如果失败)以及所有任务的状态
GET /connectors/{name}/tasks
- 获取当前为连接器运行的任务列表
GET /connectors/{name}/tasks/{taskid}/status
- 获取任务的当前状态,包括它是否正在运行,失败,暂停等,分配给哪个工作人员,以及错误信息是否失败
PUT /connectors/{name}/pause
- 暂停连接器及其任务,这将停止消息处理,直到恢复连接器
PUT /connectors/{name}/resume
- 恢复暂停的连接器(如果连接器未暂停,则不执行任何操作)
POST /connectors/{name}/restart
- 重新启动连接器(通常是因为它已经失败)
POST /connectors/{name}/tasks/{taskId}/restart
- 重启个别任务(通常因为失败)
DELETE /connectors/{name}
- 删除连接器,暂停所有任务并删除其配置
连接器开发指南
kakfa允许开发人员自己去开发一个连接器。
核心概念
要在Kafka和其他系统之间复制数据,用户需要创建一个Connector
Connector有两种形式:
SourceConnectors
从另一个系统导入数据,例如,JDBCSourceConnector
将关系数据库导入Kafka
SinkConnectors
导出数据,例如,HDFSSinkConnector
将Kafka主题的内容导出到HDFS文件
和对应的Task:
SourceTask
和SinkTask
Task形成输入输出流,开发Task要注意偏移量的问题。
每个流应该是一系列键值记录。还需要定期提交已处理的数据的偏移量,以便在发生故障时,处理可以从上次提交的偏移量恢复。Connector还需要是动态的,实现还负责监视外部系统是否存在任何更改。
开发一个简单的连接器
开发连接器只需要实现两个接口,即Connector
和Task
。
这里我们简单开发一个FileStreamConnector。
此连接器是为在独立模式下使用,SourceConnector/
SourceTask读取文件的每一行,
SinkConnector/
SinkTask每个记录写入一个文件。
连接器示例:
继承SourceConnector,添加字段(要读取的文件名和要将数据发送到的主题)
public class FileStreamSourceConnector extends SourceConnector {
priva