设为首页 加入收藏

TOP

spark读取kafka数据(两种方式比较及flume配置文件)
2019-02-12 02:29:51 】 浏览:117
Tags:spark 读取 kafka 数据 方式 比较 flume 配置 文件

Kafka topicpartition设计

1、对于银行应用日志,一个系统建一个topic,每台主机对应一个partition,规则为,flume采集时,同一个应用,数据送到同一个topic,一个主机,送一个partition,这样做是为了同一个日志的数据在一个partition中,顺序不会乱。另,flume配置文件可以配置sinktopicpartition idxxx.kafka.topic = xxx;dafaultPartitionId = x)。

2flume送数据到kafkaflume配置文件配置文件

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 150000

a1.channels.c1.transactionCapacity = 1500

##自定义source,见前一篇博客(flume 1.7 TailDir source重复获取数据集不释放资源解决办法

a1.sources.r1.type = TaildirSourceSelf

a1.sources.r1.batchSize = 500

a1.sources.r1.skipToEnd = true

a1.sources.r1.ignorePathChange = true

a1.sources.r1.positionFile = test/logConf/xxx_taildir_position.json

a1.sources.r1.writePosInterval = 5000

a1.sources.r1.filegroups = f0

a1.sources.r1.filegroups.f0 = xxx/xxx/*.*

a1.sources.r1.file.header.enabled = true

a1.sources.r1.file.name.key = tag.default.filename

a1.sources.r1.dir.header.enabled = true

a1.sources.r1.dir.name.key = tag.default.dirname

a1.sources.r1.inode.header.enabled = false

a1.sources.r1.inode.key = tag.default.inode

# For Kafka Sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = mn01:9092

a1.sinks.k1.kafka.topic = test-topic

a1.sinks.k1.defaultPartitionId = 0

a1.sinks.k1.flumeBatchSize = 500

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.max.request.size = 5120000

3SparkStreamingkafka获取数据

Spark提供两种方法从kafka拿数据:

1)Receiver方式:

a、其实是通过zookeeper连接kafka队列;

bkafkatopicpartition数目与sparkrddpartition数目没有关系,增加topicpartition数,不会增加spark的处理并行度,仅仅是增加获取数据的Receiver;

c、Receiver方式,据都存储在Spark Executor的内存中,一旦spark停止运行(如机器崩溃),数据将无法恢复,只有则不开启WALwrite ahead log)机制,及预写日志的方式同步将数据写到分布式系统中,虽然可以恢复,但是效率低,每份数据需要复制两份;

2Director方式:

a、这是spark1.3引进的新方法,直接从kafkabroker分区中读数据,跳过zookeeper,也没有receiver

bkafkatopicpartitionspark rddpartition一一对应,也就是说增加kafkatopicpartition数目,就增加了spark的并行处理度;

c、该方式不会复制两份数据,因为kafka本身就有高可用,kafka会做数据备份,宕机后,可以利用kafka副本恢复;

补充:这种方式数据的offset存在sparkcheckpoint中,不然每次重启spark,就会从kafka的最新offset位置读数据,会丢数据;所以,spark需要设置checkpoint,在创建JavaStreamingContext时,建议使用

JavaStreamingContext.getOrCreate(sparkChkDir,newStreamContextFunction());//第一个参数:checkpoint路径;第二个参数,返回新的JavaStreamingContext

及如果checkpoint存在就从checkpoint得到sparkStreamingContext,不存在就创建sparkStreamingContext;

下面是我从spark获取数据的代码(java版):

public static JavaDStream<String> getJavaDStreamFromKafka(String _Brokers, String _Topics, String _Metadata,

JavaStreamingContext _JavaStreamingContext) {

//返回该“,”分隔的所有topic的数据

HashSet<String> _TopicsSet = new HashSet<String>(Arrays.asList(_Topics.split(",")));

Map<String, String> _KafkaParams = new HashMap<String, String>();

_KafkaParams.put(_Metadata, _Brokers);

_KafkaParams.put("group.id", "xxxx");

_KafkaParams.put("fetch.message.max.bytes", "5120000");

// Create direct kafka stream with brokers and topics

JavaDStream<String> _MessagesLines = KafkaUtils

.createDirectStream(_JavaStreamingContext, String.class, String.class, StringDecoder.class,

StringDecoder.class, _KafkaParams, _TopicsSet)

.map(new Function<Tuple2<String, String>, String>() {

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

});

return _MessagesLines;

}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇KAFKA--几个基本概念 下一篇Kafka 编写自己的producer、parti..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目