Storm Integration with Kafka and Redis
Copyright notice: this is an original post by the author; do not republish without permission. Source: https://blog.csdn.net/Simon_09010817/article/details/79981584


1. Create a New Storm Project

2. Add the POM Dependencies

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <spring.version>2.5.6</spring.version>
</properties>

<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>

  <!--storm -->
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.6</version>
    <!--<scope>provided</scope>-->
  </dependency>

  <!--redis -->
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>1.1.0</version>
  </dependency>

  <!--Kafka  start -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.6</version>
  </dependency>
  <!--Kafka  end -->

  <!--ActiveMQ  start-->
 <!-- <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jms</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.4.0</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
      </exclusion>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
      </exclusion>
    </exclusions>
  </dependency>-->
  <!--ActiveMQ    end-->
</dependencies>

<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <!-- Specify the main class to run; adjust the package name below for your own project -->
            <!--<mainClass>com.simon.storm.URLCountTopology</mainClass>-->
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>true</includePluginDependencies>
        <mainClass></mainClass>
        <systemProperties>
          <systemProperty>
            <key>log4j.configuration</key>
            <value>file:./src/main/resources/log4j.properties</value>
          </systemProperty>
        </systemProperties>
      </configuration>
      <dependencies>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-core</artifactId>
          <!-- keep this in step with the project's storm-core version above -->
          <version>0.9.6</version>
          <type>jar</type>
        </dependency>
      </dependencies>
    </plugin>
  </plugins>
</build>

3. Define a Scheme to Process the Messages Read from Kafka

package com.simon.storm;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * Created by Administrator on 2018/1/10.
 * Converts the raw bytes coming out of Kafka into a UTF-8 string,
 * so that the downstream bolts can work with the cleaned-up data.
 */
public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] bytes) {
        try {
            String msg = new String(bytes, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
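
To sanity-check the scheme outside of a running topology, a throwaway sketch along these lines shows the byte-to-tuple conversion (the MessageSchemeCheck class is illustrative only, not part of the project):

package com.simon.storm;

public class MessageSchemeCheck {
    public static void main(String[] args) throws Exception {
        MessageScheme scheme = new MessageScheme();
        // Simulate the raw byte payload of a Kafka message
        byte[] payload = "hello storm".getBytes("UTF-8");
        // deserialize wraps the decoded string in a one-element tuple
        System.out.println(scheme.deserialize(payload));      // [hello storm]
        // the single output field is named "msg"
        System.out.println(scheme.getOutputFields().get(0));  // msg
    }
}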

4. Define the Bolt

The bolt processes each message and saves the result to Redis.

package com.simon.storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

import java.util.Map;

/**
 * Created by Administrator on 2018/1/10.
 *
 * Writes the processed messages into a Redis cache.
 */
public class SenqueceBolt extends BaseBasicBolt {

    private Jedis jedis;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        jedis = new Jedis("master",6379);
    }

    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String word = (String) tuple.getValue(0);
        System.out.println(word);

        if (word != null) {
            // Count the word: bump its score by 1 in the "words" sorted set,
            // keyed by the upper-cased word
            word = word.toUpperCase();
            jedis.zincrby("words", 1, word);
        }

        basicOutputCollector.emit(new Values(word));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}
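
To verify what the bolt has written, a small read-back sketch is handy. RedisCheck below is a hypothetical helper (assuming the same "master":6379 Redis instance and the Jedis client already on the classpath), equivalent to running ZREVRANGE words 0 -1 WITHSCORES in redis-cli:

package com.simon.storm;

import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

public class RedisCheck {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("master", 6379);
        try {
            // Highest-scored (most frequently seen) words first
            Set<Tuple> words = jedis.zrevrangeWithScores("words", 0, -1);
            for (Tuple entry : words) {
                System.out.println(entry.getElement() + " -> " + entry.getScore());
            }
        } finally {
            jedis.disconnect();
        }
    }
}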

5. Define the Topology and Submit It Locally or to the Cluster

package com.simon.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import java.util.HashMap;
import java.util.Map;

public class StormKafkaTopo {

    public static void main(String[] args) {
        BrokerHosts brokerHosts = new ZkHosts("master:2181,slave01:2181,slave02:2181");
        // Topic "hello", spout offsets stored under zkRoot "/kafka", consumer id "kafkaspout"
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "hello", "/kafka", "kafkaspout");

        Config conf = new Config();
        // Producer settings handed to the KafkaBolt
        Map<String, String> map = new HashMap<String, String>();
        map.put("metadata.broker.list", "10.10.34.36:9092");
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        //conf.put("topic", "topic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
        builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");

        if (args != null && args.length > 0) {
            // Submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }
    }
}
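
One caveat: in storm-kafka 0.9.6 the KafkaBolt looks up its output topic under the "topic" key of the topology configuration, so with the conf.put("topic", ...) line commented out the "kafkabolt" stage has nowhere to publish. A minimal sketch of the missing line ("hello-out" is a placeholder; substitute your actual output topic, which should differ from the input topic "hello"):

// Placeholder topic name: point the KafkaBolt at your output topic
conf.put("topic", "hello-out");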

6. Start the Cluster

1. Start ZooKeeper

1) Synchronize the system clocks across the nodes first

2) Start the ZooKeeper ensemble (e.g., zkServer.sh start on each node)

2. Start Storm

1) Start Nimbus (storm nimbus)

2) Start the UI (storm ui)

3) Start the Supervisors (storm supervisor, on each worker node)

3. Start Kafka

1) Start the Kafka broker (kafka-server-start.sh)

2) In another terminal, create the Kafka topic (kafka-topics.sh --create)

3) Start a Kafka producer (kafka-console-producer.sh)

4) Start a Kafka consumer (kafka-console-consumer.sh)

5) Send a test message from the producer

6) Confirm the consumer receives the message

4. Start the Redis client (redis-cli)

7. Run the Topology

8. Startup Complete

9. Send a Test Message from the Producer

10. Console Output

11. Inspect Redis

View the sorted set (e.g., ZREVRANGE words 0 -1 WITHSCORES in redis-cli):

Now send another Kafka message and check that the scores update:

12. Package and Run on the Storm Cluster

Uncomment the <scope>provided</scope> on the storm-core dependency (the cluster supplies storm-core at runtime), then build the jar-with-dependencies (mvn clean package).

13. Upload the Jar

14. Run storm jar (storm jar <the-assembled-jar> com.simon.storm.StormKafkaTopo <topology-name>)

15. Version Conflicts Cause Many Hard-to-Resolve Problems

16. Reinstall Matching Versions of Kafka and Storm

apache-storm-0.9.6.tar.gz and kafka_2.9.2-0.8.2.2.tgz

17. Package for the Cluster

18. Upload

19. Run storm jar

20. Test Sending Messages via Kafka

21. Check the Cached Messages in Redis
