Copyright notice: this is an original article by the author and may not be reproduced without permission. https://blog.csdn.net/chenguangchun1993/article/details/79474350
I spent the last few days on message-collection work and ran data through a flume + kafka + storm pipeline. I hit a few problems along the way, but got it working in the end. There are plenty of articles on this topic online; I'm writing this up mainly as a note for my own future reference, and if it helps anyone who has stumbled into the same pits, even better. Enough talk, let's get started.
I won't cover the concepts behind flume, kafka, and storm here; if you need background, the official sites are the place to go (failing that, read the source code). The official links:
flume: http://flume.apache.org/FlumeUserGuide.html
kafka: http://kafka.apache.org/
storm:http://storm.apache.org/releases/2.0.0-SNAPSHOT/index.html
Step 1: configure flume
Here is the configuration file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.bind = 192.168.1.22
a1.sources.r1.port = 50000
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = base64
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.22:6667,192.168.1.23:6667
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sinks.k1.useFlumeEventFormat = false
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
I use an HTTP source to push data into flume; the source settings follow the official guide. The part that needs attention is the Kafka sink: my cluster is managed by Ambari, so the port in kafka.bootstrap.servers is 6667; with a vanilla Kafka install, use 9092 instead. useFlumeEventFormat defaults to false, which means only the Event body is sent to Kafka; if set to true, the whole Event (headers plus body) is sent. I'll test the true case in the next article.
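For reference, the JSONHandler configured on the HTTP source expects the request body to be a JSON array of events, each an object with "headers" and "body" fields; with useFlumeEventFormat = false, only the body string ends up in Kafka. The payload looks like this:

```json
[
  {
    "headers": {},
    "body": "this is a message"
  }
]
```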
Start flume:
bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console
2018-03-07 16:53:27,201 (lifecycleSupervisor-1-4) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26.hwx
2018-03-07 16:53:27,263 (lifecycleSupervisor-1-0) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)] Kafka version : 0.10.1.2.6.4.0-91
2018-03-07 16:53:27,264 (lifecycleSupervisor-1-0) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)] Kafka commitId : ba0af6800a08d2f8
2018-03-07 16:53:27,266 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2018-03-07 16:53:27,267 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2018-03-07 16:53:27,380 (lifecycleSupervisor-1-4) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@10.0.13.73:50000
2018-03-07 16:53:27,382 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2018-03-07 16:53:27,382 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
Once flume is up, we can leave it alone for the moment and move on to Kafka.
Step 2: create the topic
According to the configuration, flume delivers data to a specific Kafka topic, so we need to create that topic in advance.
Command (adjust partitions and replication factor to your cluster; the ZooKeeper addresses are the same three nodes used later in the storm code):
bin/kafka-topics.sh --create --zookeeper 192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181 --replication-factor 1 --partitions 1 --topic base64
bin/kafka-topics.sh --list --zookeeper 192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181
The topic is created. OK, we can leave Kafka alone for now.
Step 3: consume the Kafka data in storm
Not much to say here; storm's framework is not the focus of this article, so straight to the code.
KafkaTopology.java
package com.cgc.kafka;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by chenguangchun on 2018/2/28
 * Kafka 0.10.1
 */
public class KafkaTopology {

    private static final String KAFKA_SPOUT_ID = "kafka-stream";
    private static final String PARSE_BOLT_ID = "parse-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        String zkHosts = "192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181";
        String topicName = "base64";
        String zkRoot = "";              // ZK root under which the spout stores its offsets
        String zkSpoutId = "mytopic";    // unique id for this spout's offset path

        BrokerHosts hosts = new ZkHosts(zkHosts);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, zkSpoutId);
        // Deserialize each Kafka message as a plain string tuple
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        List<String> servers = new ArrayList<String>();
        servers.add("192.168.1.22");
        servers.add("192.168.1.23");
        servers.add("192.168.1.24");
        spoutConfig.zkServers = servers;
        spoutConfig.zkPort = 2181;

        KafkaSpout spout = new KafkaSpout(spoutConfig);
        ParseBolt parseBolt = new ParseBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, spout);
        builder.setBolt(PARSE_BOLT_ID, parseBolt).shuffleGrouping(KAFKA_SPOUT_ID);

        Config config = new Config();
        config.setNumWorkers(1);

        if (args.length == 0) {
            // No arguments: run in an in-process local cluster for testing
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        } else {
            // With an argument: submit to the real cluster under that name
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}
ParseBolt.java

package com.cgc.kafka;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Created by chenguangchun on 2018/2/28
 */
public class ParseBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // StringScheme emits each Kafka message as a single string field
        String word = input.getString(0);
        System.out.println(word);
        this.collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}
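The two classes above need storm-core and the (old, ZooKeeper-based) storm-kafka client on the classpath. A hedged Maven sketch; the version numbers are assumptions, so match them to whatever your cluster actually runs:

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.0</version>
</dependency>
```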
Start the program.
Step 4: send data to flume
Since we are using an HTTP source, one option is to send data to the port with curl (it must be a POST, otherwise the request will fail); here we do it in code instead.
ImageToJson.java

import com.google.gson.Gson;
import org.apache.flume.event.JSONEvent;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by chenguangchun on 2018/3/6
 */
public class ImageToJson {

    public static void main(String[] args) {
        // Wrap the payload in a Flume JSONEvent: the HTTP source's
        // JSONHandler expects a JSON array of such events
        JSONEvent jse = new JSONEvent();
        jse.setBody("this is a message, hahhahhahhahhahah".getBytes());

        Gson gson = new Gson();
        List<Object> events = new ArrayList<Object>();
        events.add(jse);
        String jsonstr = gson.toJson(events);

        post("http://192.168.1.22:50000", jsonstr);
    }

    public static void post(String urlstr, String json) {
        try {
            URL url = new URL(urlstr);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setRequestMethod("POST");
            connection.setUseCaches(false);
            connection.setInstanceFollowRedirects(true);
            connection.setRequestProperty("Content-Type", "application/json;charset=utf-8");
            connection.connect();

            DataOutputStream out = new DataOutputStream(connection.getOutputStream());
            out.writeBytes(json);
            out.flush();
            out.close();

            // Read back the response; flume answers 200 with an empty body on success
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), "utf-8"));
            String line;
            StringBuilder sb = new StringBuilder();
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
            System.out.println(sb);
            reader.close();
            connection.disconnect();
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
One thing to note: a Flume event has its own specific JSON layout. Payloads produced by serializing arbitrary objects with fastjson or the like will be rejected with an error; you must build them from JSONEvent (or match its format exactly).
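A minimal plain-JDK sketch of that layout (no Gson; the helper name is hypothetical and no JSON escaping is done) — this is the string shape the HTTP source wants to see:

```java
// Sketch of the JSON layout Flume's JSONHandler accepts:
// a top-level ARRAY of events, each an object with "headers" and "body".
public class JsonEventShape {

    // Hypothetical helper: wraps one body string in a single-event array
    // with an empty header map (illustration only, no escaping).
    static String toFlumeJson(String body) {
        return "[{\"headers\":{},\"body\":\"" + body + "\"}]";
    }

    public static void main(String[] args) {
        System.out.println(toFlumeJson("this is a message"));
        // prints: [{"headers":{},"body":"this is a message"}]
    }
}
```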
Start the program.
You can see the result in the storm program's output:
Done!
What if you want to send header information (for example, file metadata) along with the data? That's revealed in the next article.