设为首页 加入收藏

TOP

读取HDFS写入HBase
【2018-12-11 17:16:59】 浏览:60
Tags:读取 HDFS 写入 HBase
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011596455/article/details/70139054






import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.MasterNotRunningException;

/**
 * Reads a '|'-delimited text file from HDFS, keeps the rows whose filter column
 * satisfies a comparison against a threshold, projects a list of columns, removes
 * duplicate projections and writes the result to the HBase table "Result"
 * (column family "res", qualifiers "R&lt;column&gt;", row keys "0", "1", ...).
 *
 * <p>Command line, as parsed by {@link #main(String[])}:
 * {@code R=<path> select:R<col>,<op>,<value> distinct:R<col>,R<col>,...}
 * where {@code <op>} is one of gt, ge, eq, lt, le, ne.
 *
 * @author:HuFeiHu
 * @version: April 1, 2017
 */
public class Hw1Grp5
{

    /** Joins projected fields into the distinct key; fields cannot contain it because lines are split on it. */
    private static final String FIELD_SEPARATOR = "|";

    /** Comparison operators accepted by {@link #ReadHdfs}. */
    private static final String[] OPERATORS = {"gt", "ge", "eq", "lt", "le", "ne"};

    /**
     * Distinct projected records keyed by their joined field values.
     * Insertion-ordered so that HBase row keys follow the order of first occurrence in the file.
     */
    private static Map<String,ArrayList<String>> OriginalData=new java.util.LinkedHashMap<String,ArrayList<String>>();

    /**
     * Reads {@code files} line by line, filters on column {@code seNum} and stores the distinct
     * projections of the matching rows in the class-level record map.
     *
     * @param files    full URI of the input file
     * @param Operator comparison to apply: gt, ge, eq, lt, le or ne
     * @param seNum    zero-based index of the column compared against {@code Value}
     * @param Value    threshold the filter column is compared with
     * @param discount number of leading entries of {@code number} that are used
     * @param number   zero-based indexes of the columns to project
     * @throws IOException           if the file cannot be opened or read
     * @throws URISyntaxException    declared for compatibility; not thrown by this implementation
     * @throws NumberFormatException if the filter column of a row is not numeric
     */
    public static void ReadHdfs(String files,String Operator,int seNum,double Value,int discount,int number[]) throws URISyntaxException, IOException
    {
        // Validate once, before touching HDFS, so a bad operator is reported even for an empty file.
        if (!isKnownOperator(Operator)) {
            System.out.println("Operator error");
            System.exit(1);
        }

        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(files), configuration);
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(files))));
            try {
                String line;
                while ((line = in.readLine()) != null) {
                    if (line.length() == 0) {
                        // A blank line has no columns to filter or project.
                        continue;
                    }
                    String[] items = line.split("\\|");
                    // Parse as double: the threshold is a double and the column may hold decimals.
                    if (!matches(Operator, Double.parseDouble(items[seNum]), Value)) {
                        continue;
                    }

                    StringBuilder key = new StringBuilder();
                    ArrayList<String> record = new ArrayList<String>(discount);
                    for (int i = 0; i < discount; i++) {
                        String field = items[number[i]];
                        if (i > 0) {
                            // Without a separator ("1","23") and ("12","3") would share a key.
                            key.append(FIELD_SEPARATOR);
                        }
                        key.append(field);
                        record.add(field);
                    }

                    // Look up by String: a StringBuilder never equals a String map key.
                    String distinctKey = key.toString();
                    if (!OriginalData.containsKey(distinctKey)) {
                        OriginalData.put(distinctKey, record);
                    }
                }
            } finally {
                in.close();
            }
        } finally {
            fs.close();
        }
    }

    /** Returns whether {@code operator} is one of the supported comparison names. */
    private static boolean isKnownOperator(String operator) {
        for (String known : OPERATORS) {
            if (known.equals(operator)) {
                return true;
            }
        }
        return false;
    }

    /** Evaluates {@code fieldValue <operator> threshold}; an unknown operator never matches. */
    private static boolean matches(String operator, double fieldValue, double threshold) {
        if ("gt".equals(operator)) {
            return fieldValue > threshold;
        }
        if ("ge".equals(operator)) {
            return fieldValue >= threshold;
        }
        if ("eq".equals(operator)) {
            return fieldValue == threshold;
        }
        if ("lt".equals(operator)) {
            return fieldValue < threshold;
        }
        if ("le".equals(operator)) {
            return fieldValue <= threshold;
        }
        if ("ne".equals(operator)) {
            return fieldValue != threshold;
        }
        return false;
    }

    /**
     * Recreates the HBase table "Result" (dropping it first if it exists) and writes every stored
     * record as one row: row key is a running counter, each projected column goes to
     * {@code res:R<column index>}.
     *
     * @param number   zero-based indexes of the projected columns, used to name the qualifiers
     * @param discount number of leading entries of {@code number} that are used
     * @throws IOException                  if an HBase operation fails
     * @throws MasterNotRunningException    if the HBase master is not reachable
     * @throws ZooKeeperConnectionException if ZooKeeper is not reachable
     */
    public static void WriteHbase(int[] number,int discount) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
        String table_Name="Result";
        HTableDescriptor descriptor=new HTableDescriptor(TableName.valueOf(table_Name));
        descriptor.addFamily(new HColumnDescriptor("res"));

        Configuration HBconfiguration=HBaseConfiguration.create();
        HBaseAdmin hbaseAdm=new HBaseAdmin(HBconfiguration);
        try {
            // Start from an empty table on every run.
            if(hbaseAdm.tableExists(table_Name)){
                hbaseAdm.disableTable(table_Name);
                hbaseAdm.deleteTable(table_Name);
            }
            hbaseAdm.createTable(descriptor);
        } finally {
            hbaseAdm.close();
        }

        HTable table=new HTable(HBconfiguration,table_Name);
        try {
            byte[] family="res".getBytes();
            int count=0;
            for(ArrayList<String> record:OriginalData.values()){
                Put put=new Put(String.valueOf(count).getBytes());
                for(int i=0;i<discount;i++){
                    put.add(family,("R"+number[i]).getBytes(),record.get(i).getBytes());
                }
                // Send the row once, after all of its cells were added; skip rows without cells.
                if(discount>0){
                    table.put(put);
                }
                count++;
            }
        } finally {
            table.close();
        }
    }

    /** Reports a malformed command line and terminates with a non-zero status. */
    private static void exitWithArgumentError() {
        System.out.println("the args are error");
        System.exit(1);
    }

    /**
     * Entry point: parses the three command-line arguments, runs the select/distinct pass over
     * the HDFS file and writes the result to HBase.
     *
     * @param args {@code R=<path>}, {@code select:R<col>,<op>,<value>}, {@code distinct:R<col>,...}
     * @throws IOException                  if HDFS or HBase access fails
     * @throws URISyntaxException           declared for compatibility with {@link #ReadHdfs}
     * @throws MasterNotRunningException    if the HBase master is not reachable
     * @throws ZooKeeperConnectionException if ZooKeeper is not reachable
     */
    public static void main(String[] args) throws IOException,URISyntaxException,MasterNotRunningException, ZooKeeperConnectionException {
        if(args.length!=3 || args[0].length()<2)
        {
            exitWithArgumentError();
            return;
        }
        // The first two characters of args[0] are a prefix in front of the HDFS path and are dropped.
        String files="hdfs://localhost:9000"+args[0].substring(2);
        String queryParam=args[1];
        String distinct=args[2];

        int seNum = 0;
        String Operator = null;
        double Value = 0;
        int[] number = new int[0];
        int discount = 0;
        try {
            // Everything after the first 'R' is "<col>,<op>,<value>"; splitting on ',' (instead of
            // fixed character offsets) also handles column numbers with more than one digit.
            int marker = queryParam.indexOf('R');
            String[] condition = marker < 0
                    ? new String[0]
                    : queryParam.substring(marker + 1).split(",", 3);
            if (condition.length != 3) {
                exitWithArgumentError();
                return;
            }
            seNum = Integer.parseInt(condition[0].trim());
            Operator = condition[1].trim();
            Value = Double.parseDouble(condition[2].trim());

            // The column list follows the ':' (or is the whole argument when there is none).
            String[] columns = distinct.substring(distinct.indexOf(':') + 1).split(",");
            number = new int[columns.length];
            for (String column : columns) {
                int r = column.indexOf('R');
                if (r >= 0) {
                    number[discount++] = Integer.parseInt(column.substring(r + 1).trim());
                }
            }
        } catch (NumberFormatException e) {
            exitWithArgumentError();
            return;
        }

        // read data from hdfs and operate
        ReadHdfs(files, Operator, seNum, Value, discount, number);
        // write data to hbase
        WriteHbase(number,discount);

    }

}


    import java.io.BufferedReader;  
    import java.io.IOException;  
    import java.io.InputStreamReader;  
    import java.net.URI;  
    import java.net.URISyntaxException;  
    import java.util.HashMap;  
    import java.util.Map;  
      
    import org.apache.commons.lang.StringUtils;  
    import org.apache.hadoop.conf.Configuration;  
    import org.apache.hadoop.fs.FSDataInputStream;  
    import org.apache.hadoop.fs.FileSystem;  
    import org.apache.hadoop.fs.Path;  
    import org.apache.hadoop.hbase.HBaseConfiguration;  
    import org.apache.hadoop.hbase.HColumnDescriptor;  
    import org.apache.hadoop.hbase.HTableDescriptor;  
    import org.apache.hadoop.hbase.MasterNotRunningException;  
    import org.apache.hadoop.hbase.TableName;  
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;  
    import org.apache.hadoop.hbase.client.HBaseAdmin;  
    import org.apache.hadoop.hbase.client.HTable;  
    import org.apache.hadoop.hbase.client.Put;  
      
    /**
     * Groups the rows of a '|'-delimited HDFS text file by one column, computes the requested
     * aggregates (count, avg, max, min, sum) per group and writes them to the HBase table
     * "Result" under column family "res", using the group value as row key.
     *
     * <p>Command line, as parsed by {@link #main(String[])}:
     * {@code <x>=<path> <x>R<groupColumn> <x>:<agg>[,<agg>...]} where each {@code <agg>} is
     * {@code count} or {@code <name>(R<column>)}.
     */
    public class Hw1Grp2 {

        /** Name of the HBase table that receives the result. */
        private static final String TABLE_NAME = "Result";
        /** Name of the single column family of the result table. */
        private static final String COLUMN_FAMILY = "res";

        /** Table the aggregates are written to; must be set before {@link #insert} or {@link #handleData}. */
        private HTable table;

        public HTable getTable() {
            return table;
        }

        public void setTable(HTable table) {
            this.table = table;
        }

        /**
         * Opens {@code file} through the Hadoop file system API.
         *
         * @param file path or URI of the file to read
         * @return a buffered reader over the file; the caller is responsible for closing it
         * @throws IOException        if the file cannot be opened
         * @throws URISyntaxException declared for compatibility; not thrown by this implementation
         */
        public BufferedReader readHdfs(String file) throws IOException, URISyntaxException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(file), conf);
            FSDataInputStream inStream = fs.open(new Path(file));
            return new BufferedReader(new InputStreamReader(inStream));
        }

        /**
         * Creates {@code tableName} with the single column family "res", dropping an existing
         * table of the same name first.
         *
         * @param tableName name of the table to (re)create
         * @return an open handle to the new table; the caller is responsible for closing it
         * @throws IOException                  if an HBase operation fails
         * @throws MasterNotRunningException    if the HBase master is not reachable
         * @throws ZooKeeperConnectionException if ZooKeeper is not reachable
         */
        public HTable createTable(String tableName) throws MasterNotRunningException,
                ZooKeeperConnectionException, IOException {
            Configuration configuration = HBaseConfiguration.create();
            HBaseAdmin hAdmin = new HBaseAdmin(configuration);
            try {
                if (hAdmin.tableExists(tableName)) {
                    System.out.println("table is exists, delete exists table");
                    hAdmin.disableTable(tableName);
                    hAdmin.deleteTable(tableName);
                } else {
                    System.out.println("table not exists");
                }
                HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
                htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
                hAdmin.createTable(htd);
            } finally {
                // Release the admin connection even when table creation fails.
                hAdmin.close();
            }
            System.out.println("table create");
            return new HTable(configuration, tableName);
        }

        /**
         * Writes a single cell to the current table.
         *
         * @param rowKey    row key of the cell
         * @param family    column family of the cell
         * @param qualifier column qualifier of the cell
         * @param value     cell value
         * @throws IOException if the write fails
         */
        public void insert(String rowKey, String family, String qualifier, String value) throws IOException {
            Put put = new Put(rowKey.getBytes());
            put.add(family.getBytes(), qualifier.getBytes(), value.getBytes());
            table.put(put);
        }

        /** Returns the column index registered for {@code name} in {@code args}, or -1 if absent. */
        private static int columnFor(Map<String, Integer> args, String name) {
            Integer column = args.get(name);
            return column == null ? -1 : column.intValue();
        }

        /**
         * Reads {@code file}, groups its rows by column {@code rowKey}, computes the aggregates
         * named in {@code args} and writes one HBase row per group to the current table.
         *
         * <p>Qualifiers are {@code count}, {@code avg(R<n>)}, {@code max(R<n>)}, {@code min(R<n>)}
         * and {@code sum(R<n>)}; averages are rounded to two decimals.
         *
         * @param file   path or URI of the input file
         * @param rowKey zero-based index of the group-by column
         * @param args   aggregate name ("count", "avg", "max", "sum", "min") mapped to the
         *               zero-based column it applies to; other names are ignored
         * @throws IOException           if reading the file or writing to HBase fails
         * @throws URISyntaxException    declared for compatibility with {@link #readHdfs}
         * @throws NumberFormatException if an aggregated column holds a non-numeric value
         */
        public void handleData(String file, int rowKey, Map<String, Integer> args) throws IOException, URISyntaxException {
            // Which aggregates were requested, and on which column (-1 = not requested).
            int countCol = columnFor(args, "count");
            int avgCol = columnFor(args, "avg");
            int maxCol = columnFor(args, "max");
            int sumCol = columnFor(args, "sum");
            int minCol = columnFor(args, "min");

            // Per-group accumulators, keyed by the value of the group-by column.
            Map<String, Integer> mapCount = new HashMap<String, Integer>();
            // long: an int total overflows silently on large inputs.
            Map<String, Long> mapSum = new HashMap<String, Long>();
            // Running total for the average, kept in double to avoid float precision loss.
            Map<String, Double> mapAvg = new HashMap<String, Double>();
            Map<String, Integer> mapMax = new HashMap<String, Integer>();
            Map<String, Integer> mapMin = new HashMap<String, Integer>();

            BufferedReader buffer = readHdfs(file);
            try {
                String str;
                while ((str = buffer.readLine()) != null) {
                    if (str.length() == 0) {
                        // A blank line carries no columns.
                        continue;
                    }
                    String[] col = str.split("\\|");
                    String group = col[rowKey];

                    Integer seen = mapCount.get(group);
                    mapCount.put(group, seen == null ? 1 : seen + 1);

                    if (sumCol != -1) {
                        long value = Long.parseLong(col[sumCol]);
                        Long total = mapSum.get(group);
                        mapSum.put(group, total == null ? value : total + value);
                    }
                    if (avgCol != -1) {
                        double value = Double.parseDouble(col[avgCol]);
                        Double total = mapAvg.get(group);
                        mapAvg.put(group, total == null ? value : total + value);
                    }
                    if (maxCol != -1) {
                        int value = Integer.parseInt(col[maxCol]);
                        Integer best = mapMax.get(group);
                        if (best == null || value > best) {
                            mapMax.put(group, value);
                        }
                    }
                    if (minCol != -1) {
                        int value = Integer.parseInt(col[minCol]);
                        Integer best = mapMin.get(group);
                        if (best == null || value < best) {
                            mapMin.put(group, value);
                        }
                    }
                }
            } finally {
                buffer.close();
            }

            // Write one Put per group carrying all of its cells, instead of one RPC per cell.
            boolean anyAggregate = countCol != -1 || avgCol != -1 || maxCol != -1
                    || minCol != -1 || sumCol != -1;
            if (anyAggregate) {
                byte[] family = COLUMN_FAMILY.getBytes();
                for (Map.Entry<String, Integer> entry : mapCount.entrySet()) {
                    String key = entry.getKey();
                    int rows = entry.getValue();
                    Put put = new Put(key.getBytes());
                    if (countCol != -1) {
                        put.add(family, "count".getBytes(), String.valueOf(rows).getBytes());
                    }
                    if (avgCol != -1) {
                        // Round to two decimals.
                        double avg = Math.round(mapAvg.get(key) / rows * 100) / 100.0;
                        put.add(family, ("avg(R" + avgCol + ")").getBytes(), String.valueOf(avg).getBytes());
                    }
                    if (maxCol != -1) {
                        put.add(family, ("max(R" + maxCol + ")").getBytes(), String.valueOf(mapMax.get(key)).getBytes());
                    }
                    if (minCol != -1) {
                        put.add(family, ("min(R" + minCol + ")").getBytes(), String.valueOf(mapMin.get(key)).getBytes());
                    }
                    if (sumCol != -1) {
                        put.add(family, ("sum(R" + sumCol + ")").getBytes(), String.valueOf(mapSum.get(key)).getBytes());
                    }
                    table.put(put);
                }
            }
            System.out.println("handle data success");
        }

        /** Reports a malformed argument and terminates with a non-zero status. */
        private static void exitWithArgsError() {
            System.out.println("args error");
            System.exit(1);
        }

        /**
         * Entry point: parses the file name, the group-by column and the aggregate list from the
         * command line, recreates the result table and runs the aggregation.
         *
         * @param args {@code <x>=<path>}, {@code <x>R<groupColumn>}, {@code <x>:<agg>[,<agg>...]}
         * @throws IOException        if HDFS or HBase access fails
         * @throws URISyntaxException declared for compatibility with {@link #handleData}
         */
        public static void main(String[] args) throws IOException, URISyntaxException {
            if (args.length != 3) {
                System.out.println("input args length error");
                System.exit(1);
                return;
            }

            // substringAfter yields "" (not null) when the separator is missing, so test for empty.
            String file = StringUtils.substringAfter(args[0], "=");
            String keyNum = StringUtils.substringAfter(args[1], "R");
            String colsName = StringUtils.substringAfter(args[2], ":");
            if (StringUtils.isEmpty(file) || StringUtils.isEmpty(keyNum) || StringUtils.isEmpty(colsName)) {
                exitWithArgsError();
                return;
            }

            int rowKey;
            Map<String, Integer> cmd = new HashMap<String, Integer>();
            try {
                rowKey = Integer.parseInt(keyNum);
                for (String aggregate : colsName.split(",")) {
                    if (aggregate.equals("count")) {
                        cmd.put(aggregate, rowKey);
                        continue;
                    }
                    // "<name>(R<column>)": substringBetween is null when 'R' or ')' is missing.
                    String column = StringUtils.substringBetween(aggregate, "R", ")");
                    if (column == null) {
                        exitWithArgsError();
                        return;
                    }
                    cmd.put(StringUtils.substringBefore(aggregate, "("), Integer.parseInt(column));
                }
            } catch (NumberFormatException e) {
                exitWithArgsError();
                return;
            }

            System.out.println("file:" + file);
            for (String key : cmd.keySet()) {
                System.out.println(key + ":" + cmd.get(key));
            }

            Hw1Grp2 h = new Hw1Grp2();
            h.setTable(h.createTable(TABLE_NAME));
            try {
                h.handleData(file, rowKey, cmd);
            } finally {
                // Release the table handle whether or not the aggregation succeeded.
                h.getTable().close();
            }
            System.out.println("program is over");
        }
    }


【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇hbase的认证 下一篇C++读写HBase代码

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目