我们知道过去对于Kafka的定义是分布式,分区化的,带备份机制的日志提交服务。也就是一个分布式的消息队列,这也是他最常见的用法。但是Kafka不止于此,打开最新的官网。
我们看到Kafka最新的定义是:Apache Kafka® is a distributed streaming platform
分布式流处理平台。
这里也清晰的描述了Kafka的特点:Kafka用于构建实时数据管道和流式应用程序。它具有水平可扩展性、容错性、速度极快,并在数千家公司投入生产。
所以现在的Kafka已经不仅是一个分布式的消息队列,更是一个流处理平台。这源于它于0.9.0.0和0.10.0.0引入的两个全新的组件Kafka Connect与Kafka Streaming。
Kafka Connect简介
我们知道消息队列必须存在上下游的系统,对消息进行搬入搬出。比如经典的日志分析系统,通过flume读取日志写入kafka,下游由storm进行实时的数据处理。
Kafka Connect的作用就是替代Flume,让数据传输这部分工作可以由Kafka Connect来完成。Kafka Connect是一个用于在Apache Kafka和其他系统之间可靠且可靠地传输数据的工具。它可以快速地将大量数据集合移入和移出Kafka。
Kafka Connect的导入作业可以将数据库或从应用程序服务器收集的数据传入到Kafka,导出作业可以将Kafka中的数据传递到查询系统,也可以传输到批处理系统以进行离线分析。
Kafka Connect功能包括:
- 一个通用的Kafka连接的框架 - Kafka Connect规范化了其他数据系统与Kafka的集成,简化了连接器开发,部署和管理
- 分布式和独立模式 - 支持大型分布式的管理服务,也支持小型生产环境的部署
- REST界面 - 通过易用的REST API提交和管理Kafka Connect
- 自动偏移管理 - 只需从连接器获取一些信息,Kafka Connect就可以自动管理偏移量提交过程,因此连接器开发人员无需担心连接器开发中偏移量提交这部分的开发
- 默认情况下是分布式和可扩展的 - Kafka Connect构建在现有的组管理协议之上。可以添加扩展集群
- 流媒体/批处理集成 - 利用Kafka现有的功能,Kafka Connect是桥接流媒体和批处理数据系统的理想解决方案
运行Kafka Connect
Kafka Connect目前支持两种运行模式:独立和集群。
独立模式
在独立模式下,只有一个进程,这种更容易设置和使用。但是没有容错功能。
启动:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
独立模式配置
第一个参数config/connect-standalone.properties是一些基本的配置:
这几个在独立和集群模式下都需要设置:
#bootstrap.servers kafka集群列表
bootstrap.servers=localhost:9092
#key.converter key的序列化转换器 比如json的 key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter value的序列化转换器
value.converter=org.apache.kafka.connect.json.JsonConverter
#独立模式特有的配置:
#offset.storage.file.filename 用于存储偏移量的文件
offset.storage.file.filename =/home/kafka/connect.offsets
独立模式连接器配置(配置文件)
后面的参数connector1.properties [connector2.properties ...] 可以多个,是连接器配置内容
这里我们配置一个从文件读取数据并存入kafka的配置:
connect-file-sink.properties
name
- 连接器的唯一名称。尝试再次使用相同名称注册将失败。connector.class
- 连接器的Java类 此连接器的类的全名或别名。这里我们选择FileStreamSinktasks.max
- 应为此连接器创建的最大任务数。如果连接器无法达到此级别的并行性,则可能会创建更少的任务。key.converter
- (可选)覆盖worker设置的默认密钥转换器。value.converter
- (可选)覆盖worker设置的默认值转换器。下面两个必须设置一个:
topics
- 以逗号分隔的主题列表,用作此连接器的输入topics.regex
- 用作此连接器输入的主题的Java正则表达式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
可以在连接器中配置转换器
需要指定参数:
transforms
- 转换的别名列表,指定将应用转换的顺序。transforms.$alias.type
- 转换的完全限定类名。transforms.$alias.$transformationSpecificConfig
转换的配置属性
例如,我们把刚才的文件转换器的内容添加字段
首先设置connect-standalone.properties
key.converter.schemas.enable = false
value.converter.schemas.enable = false
设置connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source
没有转换前的结果:
&q