设为首页 加入收藏

TOP

14.5 storm从kafka接收数据然后写入kafka
2018-12-06 02:33:21 】 浏览:131
Tags:14.5 storm kafka 接收 数据 然后 写入
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011418530/article/details/82620436

package storm.starter.chenbo;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.task.ShellBolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import backtype.storm.spout.MultiScheme;

import backtype.storm.spout.SchemeAsMultiScheme;

import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.StringScheme;

import storm.kafka.SpoutConfig;

import storm.kafka.ZkHosts;

import storm.kafka.bolt.KafkaBolt;

import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;

import storm.kafka.bolt.selector.DefaultTopicSelector;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.UUID;

import java.util.Properties;

/**

* This topology reads log lines from the Kafka topic "mylog", keeps only

* the lines containing "ERROR", and writes them to the Kafka topic

* "mylog_ERROR".

*/

public class LogFilterTopology {

    /**
     * Bolt that forwards only the tuples whose first (String) field contains
     * the substring "ERROR"; all other tuples are dropped.
     */
    public static class FilterBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // The upstream KafkaSpout uses StringScheme, so field 0 is the raw message.
            String line = tuple.getString(0);
            if (line.contains("ERROR")) {
                System.err.println(line);
                collector.emit(new Values(line));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // "message" is the field name FieldNameBasedTupleToKafkaMapper reads
            // downstream when mapping tuples to Kafka messages.
            declarer.declare(new Fields("message"));
        }
    }

    /**
     * Wires up the topology (Kafka topic "mylog" -> FilterBolt -> Kafka topic
     * "mylog_ERROR") and submits it to an in-process LocalCluster.
     *
     * @param args unused
     * @throws Exception if topology submission fails
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Source topic the KafkaSpout consumes from.
        String topic = "mylog";
        ZkHosts zkHosts = new ZkHosts("192.168.188.4:4180,192.168.188.5:4180,192.168.188.6:4180");
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic,
                "/MyKafka",  // ZooKeeper root path under which consumer offsets are stored
                "MyTrack");  // consumer id; one per application

        // Extract the bare host names (ports stripped) for spoutConfig.zkServers.
        List<String> zkServers = new ArrayList<String>();
        System.out.println(zkHosts.brokerZkStr);
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 4180;
        // false = resume from the last committed offset; set to true to ignore
        // stored offsets and re-read the topic from the beginning.
        // (The original comment claimed the opposite.)
        spoutConfig.forceFromStart = false;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // emit messages as Strings

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // Spout: consume from Kafka with 3 executors.
        builder.setSpout("kafka_spout", kafkaSpout, 3);

        // Filter bolt: keep only lines containing "ERROR".
        builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");

        // Sink bolt: publish surviving tuples to the Kafka topic "mylog_ERROR".
        KafkaBolt kafkaBolt = new KafkaBolt()
                .withTopicSelector(new DefaultTopicSelector("mylog_ERROR"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("kafka_bolt", kafkaBolt, 2).shuffleGrouping("filter");

        Config conf = new Config();

        // Kafka producer properties used by the KafkaBolt.
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.188.4:9092,192.168.188.5:9092,192.168.188.6:9092");
        props.put("request.required.acks", "1"); // leader ack only (0 = none, -1 = all in-sync replicas)
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        conf.setNumWorkers(4);

        // For a real cluster, submit with StormSubmitter instead of LocalCluster:
        // StormSubmitter.submitTopologyWithProgressBar("logfilter", conf,
        // builder.createTopology());
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("logfilter", conf, builder.createTopology());
    }
}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka遇到的坑-- Error while fet.. 下一篇spark消费kafka的两种方式

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目