版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Simon_09010817/article/details/79981584
Storm集成Kafka和Redis
一、新建Storm项目
二、引入pom依赖
<properties>
    <!-- Character encoding of the project's source files. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Within this pom fragment, only the commented-out Spring/ActiveMQ dependencies use this version. -->
    <spring.version>2.5.6</spring.version>
</properties>
<dependencies>
    <!-- Unit testing (test classpath only). -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>

    <!-- Storm core. Enable the "provided" scope below when packaging the jar for a real cluster. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
        <!--<scope>provided</scope>-->
    </dependency>

    <!-- Redis integration. -->
    <!-- NOTE(review): version 1.1.0 does not match storm-core 0.9.6 above; the bolt only uses the
         Jedis client, which is presumably pulled in transitively by this artifact. Confirm. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>1.1.0</version>
    </dependency>

    <!-- Kafka start -->
    <!-- ZooKeeper and log4j are excluded from the Kafka client; presumably to avoid clashing with
         the versions Storm brings along. Confirm. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.2.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Storm spout/bolt for Kafka; same version as storm-core. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.6</version>
    </dependency>
    <!-- Kafka end -->

    <!-- ActiveMQ start (disabled: the whole section below is commented out) -->
    <!-- <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-jms</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.4.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>-->
    <!-- ActiveMQ end -->
</dependencies>
<build>
    <plugins>
        <!-- Builds an extra "jar-with-dependencies" archive during the package phase. -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Optionally names the main class of the assembled jar; adjust the
                             package and class name to match your own project. -->
                        <!--<mainClass>com.simon.storm.StormKafkaTopo</mainClass>-->
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <!-- Compile for Java 8. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <!-- Runs the topology straight from Maven. -->
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <executable>java</executable>
                <includeProjectDependencies>true</includeProjectDependencies>
                <includePluginDependencies>true</includePluginDependencies>
                <!-- Topology entry point of this project (was left empty). -->
                <mainClass>com.simon.storm.StormKafkaTopo</mainClass>
                <systemProperties>
                    <systemProperty>
                        <key>log4j.configuration</key>
                        <value>file:./src/main/resources/log4j.properties</value>
                    </systemProperty>
                </systemProperties>
            </configuration>
            <dependencies>
                <!-- Kept at the same version as the project's storm-core dependency (0.9.6).
                     Plugin dependencies are added to the run classpath here, so a different
                     Storm version (previously 1.1.0) would put two Storm cores side by side. -->
                <dependency>
                    <groupId>org.apache.storm</groupId>
                    <artifactId>storm-core</artifactId>
                    <version>0.9.6</version>
                    <type>jar</type>
                </dependency>
            </dependencies>
        </plugin>
    </plugins>
</build>
三、定义Scheme 将从kafka获取的消息字节解码为字符串。
package com.simon.storm;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* Created by Administrator on 2018/1/10.
* 对kafka出来的数据转换成字符串,
* 接下来我们想办法来处理strom清洗之后的数据
*/
public class MessageScheme implements Scheme {
public List<Object> deserialize(byte[] bytes) {
try {
String msg = new String(bytes, "UTF-8");
return new Values(msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
public Fields getOutputFields() {
return new Fields("msg");
}
}
四、定义bolt
将数据处理并保存到redis中
package com.simon.storm;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
/**
 * Created by Administrator on 2018/1/10.
 *
 * Bolt that upper-cases each incoming message, counts it in a Redis sorted set
 * and forwards the upper-cased text in a field named "message".
 */
public class SenqueceBolt extends BaseBasicBolt {

    /** Key of the Redis sorted set whose member scores count the words seen. */
    private static final String WORDS_KEY = "words";

    /**
     * Redis connection, opened in {@link #prepare}. Marked transient because it is
     * created per task at runtime and must never be part of the serialized bolt.
     */
    private transient Jedis jedis;

    /**
     * Opens the Redis connection used by {@link #execute}.
     *
     * @param stormConf topology configuration (passed through to the superclass)
     * @param context   topology context (passed through to the superclass)
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        jedis = new Jedis("master", 6379);
    }

    /**
     * Prints the first tuple value, increments its upper-cased form in the
     * "words" sorted set by one and emits the upper-cased text.
     *
     * @param tuple                incoming tuple; value 0 is read as a String
     * @param basicOutputCollector collector the upper-cased word is emitted to
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String word = (String) tuple.getValue(0);
        System.out.println(word);
        if (word == null) {
            // Nothing to count, and forwarding a null message downstream serves no purpose.
            return;
        }
        // Locale.ROOT keeps the casing independent of the worker's default locale.
        String upper = word.toUpperCase(java.util.Locale.ROOT);
        // zincrby arguments: sorted-set key, score increment, member.
        jedis.zincrby(WORDS_KEY, 1, upper);
        basicOutputCollector.emit(new Values(upper));
    }

    /**
     * Closes the Redis connection opened in {@link #prepare}, if any.
     */
    @Override
    public void cleanup() {
        if (jedis != null) {
            jedis.close();
            jedis = null;
        }
    }

    /**
     * Declares the single output field, "message".
     *
     * @param outputFieldsDeclarer declarer receiving the field list
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}
五、定义Topology 提交本地或集群
package com.simon.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import java.util.HashMap;
import java.util.Map;
/**
 * Builds the Kafka spout -> {@link SenqueceBolt} -> Kafka bolt topology and runs it
 * either on a cluster (when a topology name is given) or in an in-process local cluster.
 */
public class StormKafkaTopo {

    /** Name the topology is registered under in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "Topotest1121";

    /** How long the local cluster is left running before it is torn down, in milliseconds. */
    private static final long LOCAL_RUN_MILLIS = 1000000L;

    /**
     * Entry point.
     *
     * @param args when non-empty, {@code args[0]} is the topology name to submit to the
     *             cluster; otherwise the topology runs in local mode
     * @throws AlreadyAliveException    if a topology with that name is already running
     * @throws InvalidTopologyException if the cluster rejects the topology
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        Config conf = newConfig();
        TopologyBuilder builder = newTopologyBuilder();

        if (args != null && args.length > 0) {
            // Cluster mode. Failures propagate so the process exits with a non-zero status
            // instead of printing a stack trace and reporting success.
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            return;
        }

        // Local mode.
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
        } finally {
            // Always release the in-process cluster, even if submit or kill failed.
            cluster.shutdown();
        }
    }

    /** Creates the topology configuration carrying the Kafka producer settings. */
    private static Config newConfig() {
        Map<String, String> producerProps = new HashMap<String, String>();
        producerProps.put("metadata.broker.list", "10.10.34.36:9092");
        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");

        Config conf = new Config();
        conf.put("kafka.broker.properties", producerProps);
        // NOTE(review): no output topic is configured for the Kafka bolt; the line below was
        // left disabled in the original. Confirm the intended target topic before enabling.
        //conf.put("topic", "topic2");
        return conf;
    }

    /** Wires spout -> "bolt" -> "kafkabolt", each link using shuffle grouping. */
    private static TopologyBuilder newTopologyBuilder() {
        BrokerHosts brokerHosts = new ZkHosts("master:2181,slave01:2181,slave02:2181");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "hello", "/kafka", "kafkaspout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
        // Messages are Strings (see serializer.class above), so the value type is String,
        // not Integer as previously declared.
        builder.setBolt("kafkabolt", new KafkaBolt<String, String>()).shuffleGrouping("bolt");
        return builder;
    }
}
六、开启集群
1.开启zookeeper
1)先同步时间
2)开启zookeeper集群
2.开启storm
1)开启nimbus
2)开启ui
3)开启supervisor
3.开启kafka
1)开启kafka服务
2)另起窗口开启kafka主题
3)开启kafka发布者
4)开启kafka消费者
5)测试生产者发送消息
6)测试消费者接收消息
4.开启redis客户端
七、运行Topology程序
八、启动完成
九、发送生产者消息测试
十、控制台输出
十一、查看redis
查看列表:
再次发送kafka消息:
十二、打包运行storm集群
放开storm-core的provided作用域(取消pom中 <scope>provided</scope> 的注释)
十三、上传jar包
十四、运行storm jar
十五、由于版本冲突会造成很多难以解决的问题
十六、重新安装匹配的kafka和Storm的版本
apache-storm-0.9.6.tar.gz 和 kafka_2.9.2-0.8.2.2.tgz
十七、打包到集群
十八、上传
十九、运行storm jar
二十、测试kafka发送消息
二十一、查看redis缓存消息