JAR Build:
Build standard jar by the following command
$ ./gradlew build
Build fat jar which contains elasticsearch dependencies
$ ./gradlew assembly
Jar will be generated in build/libs
把jar包放到flume的lib下,然后
remove guava-.jar and jackson-core-.jar in flume’s default libs dir. They are outdated and newer version are included in Elasticsearch.
把ES目录lib下的jar都放到flume的lib下
然后就可以开始写配置文件了
新建一个flume的.properties文件
vim flume-conf-kafka2es.properties
# ------------------- 定义数据流----------------------# source的名字
flume2es_agent.sources = source_from_kafka
# channels的名字,建议按照type来命名
flume2es_agent.channels = mem_channel
# sink的名字,建议按照目标来命名
flume2es_agent.sinks = es_sink
#auto.commit.enable = true #-------- kafkaSource相关配置-----------------# 定义消息源类型# For each one of the sources, the type is defined
flume2es_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flume2es_agent.sources.source_from_kafka.channels = mem_channel
flume2es_agent.sources.source_from_kafka.batchSize = 5000# 定义kafka所在的地址
flume2es_agent.sources.source_from_kafka.kafka.bootstrap.servers = 192.168.2.xx:9092,192.168.2.xx:9092# 配置消费的kafka topic#flume2es_agent.sources.source_from_kafka.topic = itil_topic_4097
flume2es_agent.sources.source_from_kafka.kafka.topics = mtopic
# 配置消费的kafka groupid#flume2es_agent.sources.source_from_kafka.groupId = flume4097
flume2es_agent.sources.source_from_kafka.kafka.consumer.group.id = flumetest
#---------ES Sink 相关配置------------------# The channel can be defined as follows. #flume2es_agent.sinks.es_sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
flume2es_agent.sinks.es_sink.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink# 指定sink需要使用的channel的名字,注意这里是channel#Specify the channel the sink should use
flume2es_agent.sinks.es_sink.channel = mem_channel
#flume2es_agent.sinks.es_sink.filePrefix = %{host}
flume2es_agent.sinks.es_sink.hostNames = cdh1,cdh2,cdh3,cdh4,cdh5,cdh6
flume2es_agent.sinks.es_sink.clusterName = elasticsearch
flume2es_agent.sinks.es_sink.indexName = items
flume2es_agent.sinks.es_sink.indexTypeitems = item
flume2es_agent.sinks.es_sink.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
flume2es_agent.sinks.es_sink.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder#File size to trigger roll, in bytes (0: never roll based on file size)
flume2es_agent.sinks.es_sink.batchSize = 500#------- memoryChannel相关配置-------------------------# channel类型# Each channel's type is defined.
flume2es_agent.channels.mem_channel.type = memory
# Other config values specific to each type of channel(sink or source) # can be defined as well # channel存储的事件容量# In this case, it specifies the capacity of the memory channel
flume2es_agent.channels.mem_channel.capacity = 100000# 事务容量
flume2es_agent.channels.mem_channel.transactionCapacity = 10000