版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Mark__cao/article/details/78014203
写在前面
写博客时使用的版本 Confluent Platform v3.2.1。使用的话,大家看官方文档kafka-connect,下面有几个使用过程中遇到的问题:
我的kafka里的数据是avro格式的,应需求要从kafka导入mysql数据库和从HDFS导入到kafka。
standalone模式启动命令:
//后面可以接多个配置文件,执行多个任务
./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties ../etc/kafka-connect-jdbc/sink-quickstart-sqlite1.properties ../etc/kafka-connect-jdbc/quickstart-sqlite2.properties
1. 配置kafka-connect时,你可能想知道它支持kafka什么之间的连接,下面就是文件中connector.class
选项
配置connector.class
kafka.connect available connectors are:
org.apache.kafka.connect.tools.MockSinkConnector,
org.apache.kafka.connect.source.SourceConnector,
io.confluent.connect.jdbc.JdbcSinkConnector,
io.confluent.connect.hdfs.HdfsSinkConnector,
io.confluent.connect.elasticsearch.ElasticsearchSinkConnector,
io.confluent.connect.hdfs.tools.SchemaSourceConnector,
io.confluent.connect.s3.S3SinkConnector,
io.confluent.connect.jdbc.JdbcSourceConnector,
io.confluent.connect.storage.tools.SchemaSourceConnector,
org.apache.kafka.connect.tools.MockConnector,
org.apache.kafka.connect.sink.SinkConnector,
org.apache.kafka.connect.tools.VerifiableSourceConnector,
org.apache.kafka.connect.tools.SchemaSourceConnector,
org.apache.kafka.connect.file.FileStreamSinkConnector,
org.apache.kafka.connect.tools.MockSourceConnector,
org.apache.kafka.connect.file.FileStreamSourceConnector,
org.apache.kafka.connect.tools.VerifiableSinkConnector
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=kafka_test
# 配置JDBC链接
connection.url=jdbc:mysql://localhost:3306/testuser=root&password=root
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以kafkacol为主键更新
pk.mode = record_value
pk.fields = kafkacol
#表名为kafkatable
table.name.format=kafkatable
- 它不支持复杂逻辑的插入表操作。比如需要对kafka的数据做些逻辑处理再插入表。
- kafka中数据的schema要和表的字段对应,不多不少。
- 比如需要根据表里不是主键的字段更新,目前还没找到方法。
- 它支持监听文件夹/文件,但是一次性读取的数据量过大会内存溢出,可以看下代码,根据需求自己修改。
- topic名称由
topic.prefix
的值来决定。
- 任务中断再次重启会记录偏移量。
(未完待续……..)