设为首页 加入收藏

TOP

kafka-connect遇到的问题
2019-01-06 02:25:26 】 浏览:373
Tags:kafka-connect 遇到 问题
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Mark__cao/article/details/78014203

写在前面

写博客时使用的版本 Confluent Platform v3.2.1。使用的话,大家看官方文档kafka-connect,下面有几个使用过程中遇到的问题:

我的kafka里的数据是avro格式的,应需求要从kafka导入mysql数据库和从HDFS导入到kafka。

standalone模式启动命令:
//后面可以接多个配置文件,执行多个任务


./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties ../etc/kafka-connect-jdbc/sink-quickstart-sqlite1.properties ../etc/kafka-connect-jdbc/quickstart-sqlite2.properties

1. 配置kafka-connect时,你可能想知道它支持kafka什么之间的连接,下面就是文件中connector.class选项

配置connector.class

kafka.connect available connectors are: 

org.apache.kafka.connect.tools.MockSinkConnector, 
org.apache.kafka.connect.source.SourceConnector, 
io.confluent.connect.jdbc.JdbcSinkConnector,
io.confluent.connect.hdfs.HdfsSinkConnector, 
io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, 
io.confluent.connect.hdfs.tools.SchemaSourceConnector, 
io.confluent.connect.s3.S3SinkConnector, 
io.confluent.connect.jdbc.JdbcSourceConnector, 
io.confluent.connect.storage.tools.SchemaSourceConnector, 
org.apache.kafka.connect.tools.MockConnector, 
org.apache.kafka.connect.sink.SinkConnector, 
org.apache.kafka.connect.tools.VerifiableSourceConnector,
org.apache.kafka.connect.tools.SchemaSourceConnector, 
org.apache.kafka.connect.file.FileStreamSinkConnector,  
org.apache.kafka.connect.tools.MockSourceConnector, 
org.apache.kafka.connect.file.FileStreamSourceConnector, 
org.apache.kafka.connect.tools.VerifiableSinkConnector

2. 当mysql作为connector-sink的时候,通过设置table.name.format来实现自定义表名或者是加前缀后缀的功能,如果为空,则使用topic作为表名。

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

#kafka的topic名称
topics=kafka_test


# 配置JDBC链接
connection.url=jdbc:mysql://localhost:3306/testuser=root&password=root

# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false

# upsert model更新和插入
insert.mode=upsert

# 下面两个参数配置了以kafkacol为主键更新
pk.mode = record_value
pk.fields = kafkacol

#表名为kafkatable
table.name.format=kafkatable
  1. 它不支持复杂逻辑的插入表操作。比如需要对kafka的数据做些逻辑处理再插入表。
  2. kafka中数据的schema要和表的字段对应,不多不少。
  3. 比如需要根据表里不是主键的字段更新,目前还没找到方法。

3.这个版本还没有connector-source为hdfs的组件,在github中找到一个插件kafka-hdfs-source-connector,需要编译。

  1. 它支持监听文件夹/文件,但是一次性读取的数据量过大会内存溢出,可以看下代码,根据需求自己修改。
  2. topic名称由topic.prefix的值来决定。
  3. 任务中断再次重启会记录偏移量。

(未完待续……..)

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka java客户端调用问题 下一篇Linux下安装Kafka和PHP的相关扩展

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目