Storm (07): Integrating Storm with HDFS
Tags: storm, hdfs, integration

Importing the dependencies

The main additions are the Hadoop client dependency and the storm-hdfs integration dependency. Note that the Hadoop artifacts are excluded from storm-hdfs so that the versions brought in by hadoop-client 2.7.5 are used instead, avoiding classpath conflicts:

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- use the new kafka spout code -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.41</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>1.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-auth</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-jdbc</artifactId>
        <version>1.1.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>

Writing the code

The main class:

package com.fgm.storm.stormToHdfs;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.topology.TopologyBuilder;

/**
 * @author fgm
 */
public class OrderMain {

    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        //source:  http://storm.apache.org/releases/1.1.2/storm-hdfs.html

        // use "|" instead of "," for field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        // Specify the HDFS path the files are written to. If the path does not
        // exist, create it yourself first and change its permissions so the user
        // running the topology can write to it (e.g. with hdfs dfs -mkdir -p
        // and hdfs dfs -chmod).
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/stormhdfs/");

        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://node01:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
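        // Optional extra, a sketch not in the original post: per the storm-hdfs
        // docs, a rotation action can move each file out of the write directory
        // once it rotates, e.g.:
        // bolt.addRotationAction(new MoveFileAction().toDestination("/stormhdfs/done/"));
        // (requires importing org.apache.storm.hdfs.common.rotation.MoveFileAction)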

        TopologyBuilder builder = new TopologyBuilder();
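        // Wire the topology: RandomOrderSpout -> CountMoneyBolt -> HdfsBolt
        // (sketches of the spout and the bolt are shown after this class).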
        builder.setSpout("randomOrderSpout", new RandomOrderSpout());
        builder.setBolt("countMoneyBolt", new CountMoneyBolt()).localOrShuffleGrouping("randomOrderSpout");
        builder.setBolt("hdfsBolt", bolt).localOrShuffleGrouping("countMoneyBolt");

        Config config = new Config();
        // number of acker threads (ack tracking is based on an XOR algorithm)
        config.setNumAckers(5);

        /*
         * If too many in-flight messages pile up without being acked, the code
         * probably has a problem. Cap the number of pending (un-acked) tuples
         * at 5000: if the spout kept emitting past that point, it could cause
         * an OOM exception.
         */
        config.setMaxSpoutPending(5000);


        if (null != args && args.length > 0) {
            // cluster mode: the first argument is the topology name
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // local mode for testing; runs until the process is killed
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("stormHdfs", config, builder.createTopology());
        }

    }
}
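
The RandomOrderSpout and CountMoneyBolt referenced above are not shown in this post; the full versions live in the GitHub repository linked below. As a minimal sketch of what they might look like (the field names and the once-per-second emit rate are assumptions, not the repository's actual code): the spout emits a random order, and the bolt keeps a running total that the HDFS bolt then writes out, with fields joined by "|".

package com.fgm.storm.stormToHdfs;

import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Hypothetical sketch of the spout: emits a random order id and amount once
 * per second.
 */
public class RandomOrderSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        String orderId = UUID.randomUUID().toString();
        double amount = random.nextInt(10000) / 100.0;
        // anchor the tuple with a message id so the ackers configured in OrderMain track it
        collector.emit(new Values(orderId, amount), orderId);
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("orderId", "amount"));
    }
}

package com.fgm.storm.stormToHdfs;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Hypothetical sketch of the bolt: keeps a running total of the order amounts
 * and forwards each order together with the total. BaseBasicBolt acks each
 * tuple automatically after execute() returns.
 */
public class CountMoneyBolt extends BaseBasicBolt {

    private double totalMoney = 0.0;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String orderId = input.getStringByField("orderId");
        double amount = input.getDoubleByField("amount");
        totalMoney += amount;
        // the downstream HdfsBolt writes these fields joined with "|"
        collector.emit(new Values(orderId, amount, totalMoney));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("orderId", "amount", "totalMoney"));
    }
}

To run on a real cluster, package the project into a jar and submit it with the storm jar command, passing the topology name as the first argument so that the StormSubmitter branch is taken.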

The full code is available at https://github.com/Fenggm/storm-study
