Syncing MySQL Incremental Data to Kafka with Flume on 64-bit Windows
Published: 2019-01-09 02:13:47
Copyright notice: this is the author's original article and may not be reproduced without permission. https://blog.csdn.net/zfqiannian/article/details/79877317

I. Software Prerequisites

1. JDK 1.7

2. Flume 1.6, download: https://download.csdn.net/download/zfqiannian/10338190

3. Kafka 2.9, download: https://download.csdn.net/download/zfqiannian/10338220

4. Maven

5. sbt, download: https://download.csdn.net/download/zfqiannian/10338264

6. Git Bash, download: https://download.csdn.net/download/zfqiannian/10338346

II. Install and Start Kafka

1. Install Kafka

Refer to a separate installation guide for this step; it is well documented and should not cause any trouble.

2. Start the Kafka components in order (in a Windows cmd window)

2.1 Start the ZooKeeper daemon

zookeeper-server-start.bat ../../config/zookeeper.properties

2.2 Start the Kafka daemon

kafka-server-start.bat ../../config/server.properties

2.3 Create a Kafka topic (mysqltest is the topic name)

kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysqltest

2.4 Send messages

kafka-console-producer.bat --broker-list localhost:9092 --topic mysqltest

2.5 Receive messages

kafka-console-consumer.bat --zookeeper localhost:2181 --topic mysqltest --from-beginning

2.6 Verify that Kafka works

In the cmd window from step 2.4, type any string and check whether the cmd window from step 2.5 receives it. If the message comes through, the Kafka message queue is installed correctly.

3. Install and Start Flume

Flume does not currently ship with a source for syncing database data to Kafka, but a third-party plugin is available.

Plugin source code: https://github.com/keedio/flume-ng-sql-source

3.1 Compile the plugin

In cmd, run mvn compile and then mvn package to compile the plugin and package it into the target directory (add -Dmaven.test.skip=true to skip the tests).


3.2 Copy the jars

Create a new folder named libExt under the Flume root directory.

Copy the flume-ng-sql-source-1.5.1-SNAPSHOT.jar built in step 3.1 and the MySQL driver jar mysql-connector-java-5.0.5-bin.jar into libExt.

3.3 Write the .properties file

Create a file named config.properties under the conf directory of the Flume root.

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/test

# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = admin
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = D:/mylab/flume/apache-flume-1.6.0-bin
a1.sources.src-1.status.file.name = sqlSource.status


# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id,name from user_table where id > $@$ order by id asc

a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000


a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

##############################

a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mysqltest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = ch-1

a1.sources.src-1.channels=ch-1
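The key to the incremental sync is the $@$ placeholder in custom.query: on each poll, the plugin substitutes the last index persisted in the status file (starting from start.from), so only rows with a larger id are fetched, and the status file is then advanced. Below is a minimal Python sketch of that mechanism, simulated against sqlite3 instead of MySQL; the JSON status-file layout and the function names are illustrative assumptions, not the plugin's actual implementation.

```python
import json
import os
import sqlite3
import tempfile

# Sketch of flume-ng-sql-source's incremental polling: replace the $@$
# placeholder in custom.query with the last index saved in the status
# file, fetch the new rows, then persist the new highest index.
QUERY_TEMPLATE = "select id, name from user_table where id > $@$ order by id asc"

def load_last_index(status_path, start_from=0):
    # Corresponds to status.file.path/status.file.name and start.from.
    if os.path.exists(status_path):
        with open(status_path) as f:
            return json.load(f)["LastIndex"]
    return start_from

def save_last_index(status_path, last_index):
    with open(status_path, "w") as f:
        json.dump({"LastIndex": last_index}, f)

def poll(conn, status_path):
    last = load_last_index(status_path)
    rows = conn.execute(QUERY_TEMPLATE.replace("$@$", str(last))).fetchall()
    if rows:
        save_last_index(status_path, rows[-1][0])  # highest id fetched
    return rows

# Demo against an in-memory table standing in for the MySQL source.
conn = sqlite3.connect(":memory:")
conn.execute("create table user_table (id integer primary key, name text)")
conn.executemany("insert into user_table values (?, ?)",
                 [(1, "alice"), (2, "bob")])
status = os.path.join(tempfile.mkdtemp(), "sqlSource.status")

first = poll(conn, status)    # fetches ids 1 and 2
conn.execute("insert into user_table values (3, 'carol')")
second = poll(conn, status)   # fetches only id 3
```

This is also why deleting the status file makes the next run re-read the table from start.from: the saved index is the only record of progress.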

3.4 Start flume-ng

Launch command in a Windows cmd window:

java.exe -Xmx100m -Dlog4j.configuration=file:///D:\mylab\flume\apache-flume-1.6.0-bin\conf\log4j.properties -cp "D:\mylab\flume\apache-flume-1.6.0-bin\lib\*;D:\mylab\flume\apache-flume-1.6.0-bin\libExt\flume-ng-sql-source-1.5.1-SNAPSHOT.jar;D:\mylab\flume\apache-flume-1.6.0-bin\libExt\mysql-connector-java-5.0.5-bin.jar" org.apache.flume.node.Application -f D:\mylab\flume\apache-flume-1.6.0-bin\conf\config.properties -n a1
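The command above is long, so it helps to see its structure: a classpath made of Flume's lib\* plus the two jars in libExt (joined with ';', the Windows classpath separator), followed by the Flume Application main class, the config file, and the agent name. The following Python sketch only rebuilds that argument list using the paths from this article; actually launching it via subprocess would require Java and Flume to be installed, so it is left as a comment.

```python
import subprocess  # noqa: F401  (shown only for how one would launch it)

FLUME_HOME = r"D:\mylab\flume\apache-flume-1.6.0-bin"

# Classpath entries: Flume's lib\* plus the two jars copied into libExt.
classpath = ";".join([  # ';' is the Windows classpath separator
    FLUME_HOME + r"\lib\*",
    FLUME_HOME + r"\libExt\flume-ng-sql-source-1.5.1-SNAPSHOT.jar",
    FLUME_HOME + r"\libExt\mysql-connector-java-5.0.5-bin.jar",
])

cmd = [
    "java.exe",
    "-Xmx100m",
    "-Dlog4j.configuration=file:///" + FLUME_HOME + r"\conf\log4j.properties",
    "-cp", classpath,
    "org.apache.flume.node.Application",  # Flume's entry point
    "-f", FLUME_HOME + r"\conf\config.properties",  # agent config file
    "-n", "a1",                                     # agent name
]

# subprocess.Popen(cmd) would start the agent on a machine with Java
# and Flume installed; here we only assemble the argument list.
```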

4. Test the sync



Success!


PS: If Flume fails to start, check the log files in the directory configured in log4j.properties under the conf directory of the Flume root.
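A common cause of a failed start is a simple name mismatch in the hand-written config, for example a sink bound to a channel that was never declared. A small sanity check along these lines can catch that before launching; this is a hypothetical helper for a single agent named a1, not part of Flume itself, whose real configuration validation is far more thorough.

```python
def find_channel_mismatches(properties_text):
    """Return sink-to-channel bindings that reference undeclared channels."""
    declared = set()
    bindings = {}  # sink name -> channel it is bound to
    for line in properties_text.splitlines():
        line = line.strip()
        if line.startswith("#") or "=" not in line:
            continue  # skip comments and non-property lines
        key, _, value = (part.strip() for part in line.partition("="))
        if key == "a1.channels":
            declared.update(value.split())
        elif key.startswith("a1.sinks.") and key.endswith(".channel"):
            bindings[key.split(".")[2]] = value
    return {sink: ch for sink, ch in bindings.items() if ch not in declared}

good = "a1.channels = ch-1\na1.sinks.k1.channel = ch-1\n"
bad = "a1.channels = ch-1\na1.sinks.k1.channel = c1\n"
```

Running the check on the two fragments above flags the second one, where sink k1 points at the undeclared channel c1.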








