flume + kafka + storm Integration (Part 1)
Copyright notice: this is an original post by the author and may not be reproduced without permission. https://blog.csdn.net/chenguangchun1993/article/details/79474350

I spent the last few days on message-collection work and ran through the flume + kafka + storm pipeline. I hit a few problems along the way, but in the end everything worked. There are plenty of articles about this online; I'm writing it up mainly as notes for my own future reference, and if it helps anyone who has fallen into the same pits, even better. Enough talk, let's get started.

I won't go into the concepts behind Flume, Kafka and Storm here; if you need background, go straight to the official documentation, or read the source code if that still isn't enough. Here are the official links:
flume: http://flume.apache.org/FlumeUserGuide.html
kafka: http://kafka.apache.org/
storm: http://storm.apache.org/releases/2.0.0-SNAPSHOT/index.html

Step 1: Configure Flume

Straight to the configuration file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.bind = 192.168.1.22
a1.sources.r1.port = 50000
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = base64
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.22:6667,192.168.1.23:6667
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sinks.k1.useFlumeEventFormat = false

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

I'm sending data to Flume through an HTTP source, and the source settings basically follow the official docs. The part that needs attention is the Kafka sink: my cluster is managed by Ambari, so the port in kafka.bootstrap.servers is 6667; with a vanilla Flume/Kafka setup you would configure 9092 here instead. useFlumeEventFormat defaults to false, which means only the Event body is written to Kafka; if set to true, the whole Event (headers plus body) is sent. I'll test the true case in the next part.
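
For reference, on a vanilla (non-Ambari) installation the broker list would normally point at the default Kafka port instead:

a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.22:9092,192.168.1.23:9092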

Start Flume:

bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console

2018-03-07 16:53:27,201 (lifecycleSupervisor-1-4) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26.hwx
2018-03-07 16:53:27,263 (lifecycleSupervisor-1-0) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)] Kafka version : 0.10.1.2.6.4.0-91
2018-03-07 16:53:27,264 (lifecycleSupervisor-1-0) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)] Kafka commitId : ba0af6800a08d2f8
2018-03-07 16:53:27,266 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2018-03-07 16:53:27,267 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2018-03-07 16:53:27,380 (lifecycleSupervisor-1-4) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@10.0.13.73:50000
2018-03-07 16:53:27,382 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2018-03-07 16:53:27,382 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started

Once Flume is up, you can leave it alone for now and move on to Kafka.

Step 2: Create the topic

According to the sink configuration, Flume delivers the data to a specific Kafka topic, so we need to create that topic ahead of time.
The command:

bin/kafka-topics.sh --create --zookeeper 192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181 --replication-factor 1 --partitions 3 --topic base64

# check that the topic exists

bin/kafka-topics.sh --list --zookeeper 192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181

ambari_kafka_service_check
base64
storm
test

The topic was created successfully, so we're done with Kafka for now.
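
As a quick sanity check later on, once Flume starts pushing events, you can tail the topic with the console consumer. This is just a sketch using the same ZooKeeper addresses and the old-style consumer that ships with Kafka 0.10:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181 --topic base64 --from-beginning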

Step 3: Consume the Kafka data with Storm

There isn't much to explain here; Storm's programming model isn't the focus of this article, so straight to the code.

KafkaTopology.java

package com.cgc.kafka;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
/**
 * Created by chenguangchun on 2018/2/28
 */

/**
 * Kafka 0.10.1
 *
 */
public class KafkaTopology
{
    private static final String KAFKA_SPOUT_ID = "kafka-stream";
    private static final String PARSE_BOLT_ID = "parse-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        String zkHosts = "192.168.1.22:2181,192.168.1.23:2181,192.168.1.24:2181";
        String topicName="base64";
        String zkRoot = "";
        String zkSpoutId = "mytopic"; // offsets are stored under /zkRoot/id, so the id works like a consumer group

        BrokerHosts hosts = new ZkHosts(zkHosts);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, zkSpoutId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        List<String> servers = new ArrayList<String>();
        servers.add("192.168.1.22");
        servers.add("192.168.1.23");
        servers.add("192.168.1.24");
        spoutConfig.zkServers = servers; // ZooKeeper hosts used to record the spout's read offsets (required, otherwise progress is not saved)
        spoutConfig.zkPort = 2181; // ZooKeeper port used to record offsets (required, otherwise progress is not saved)
        KafkaSpout spout = new KafkaSpout(spoutConfig);
        ParseBolt parseBolt = new ParseBolt();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, spout);
        builder.setBolt(PARSE_BOLT_ID, parseBolt).shuffleGrouping(
                KAFKA_SPOUT_ID);

        Config config = new Config();
        config.setNumWorkers(1);
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

        } else {
            StormSubmitter.submitTopology(args[0], config,
                    builder.createTopology());
        }
    }
}

ParseBolt.java

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;
/**
 * Created by chenguangchun on 2018/2/28
 */
public class ParseBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;
    }

    public void execute(Tuple input) {
        // just print the message body; that's enough for this demo
        String word = input.getString(0);
        System.out.println(word);
        // ack so the KafkaSpout knows the tuple is processed (offsets are only committed for acked tuples)
        this.collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

Start the topology.
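
Run main() with no arguments and the topology starts in a LocalCluster, which is all we need for this test. To submit it to a real cluster instead, package the project into a jar and use the storm client, passing the topology name as the first argument (the jar name here is only a placeholder):

bin/storm jar kafka-storm-demo.jar com.cgc.kafka.KafkaTopology my-kafka-topology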

Step 4: Send data to Flume

Since we're using the HTTP source, one option is to send data to the port with curl (it has to be a POST request, otherwise it fails). Here I'll do it in code instead.
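
For completeness, if you do want to try curl first, a POST along these lines should work (just a sketch: the body is a JSON array of events in the format JSONHandler expects, and the host/port match the source configuration above):

curl -X POST -H "Content-Type: application/json" -d '[{"headers":{},"body":"hello from curl"}]' http://192.168.1.22:50000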

import com.google.gson.Gson;
import org.apache.flume.event.JSONEvent;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
 * Created by chenguangchun on 2018/3/6
 */
public class ImageToJson {
    public static void main(String[] args) {
        JSONEvent jse = new JSONEvent();
        jse.setBody("this is a message, hahhahhahhahhahah".getBytes());
        Gson gson = new Gson();
        List<Object> events1 = new ArrayList<Object>();
        events1.add(jse);
        String jsonstr = gson.toJson(events1);
        post("http://192.168.1.22:50000", jsonstr);
    }

    public static void post(String urlstr, String json){
        try{
            // open the connection
            URL url = new URL(urlstr);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setDoOutput(true);
            connection.setDoInput(true);
            connection.setRequestMethod("POST");
            connection.setUseCaches(false);
            connection.setInstanceFollowRedirects(true);
            connection.setRequestProperty("Content-Type",
                    "application/x-www-form-urlencoded");
            connection.connect();
            // write the POST body
            DataOutputStream out = new DataOutputStream(
                    connection.getOutputStream());
            out.writeBytes(json);
            out.flush();
            out.close();
            // read the response
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String lines;
            StringBuffer sb = new StringBuffer("");
            while ((lines = reader.readLine()) != null) {
                lines = new String(lines.getBytes(), "utf-8");
                sb.append(lines);
            }
            System.out.println(sb);
            reader.close();
            // disconnect
            connection.disconnect();
        } catch (MalformedURLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

One thing to note: Flume events have their own specific JSON format. Payloads generated with fastjson or similar libraries will cause errors on the Flume side; you have to build the request body from JSONEvent objects as above.
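
For reference, the string produced by gson.toJson(events1) above looks roughly like this, which is exactly the array-of-events structure JSONHandler can parse (my guess is that other libraries trip up because they serialize the byte[] body as a number array or base64 string rather than plain text):

[{"body":"this is a message, hahhahhahhahhahah"}]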

Run the program.

You can see the message printed in the Storm topology's output.

And that's it, all working!

What if you also want to carry header information along with the data (for example file metadata)? That will be covered in the next article.
