设为首页 加入收藏

TOP

hdfs上小文件文件合并,
2018-11-29 00:09:10 】 浏览:699
Tags:hdfs 文件 合并

一个java类,一个spark object实现

package cn.smartstep.extract.tables
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.{Level,Logger}

import unicom.util.LoggerUtil;

/**
* Created by wqj on 2018/2/8.
* hdfs小文件合并,文件被合并在同级目录下的'*_b'
$SPARK_SUBMIT \
--class cn.smartstep.extract.tables.MergeHDFSFiles \
--deploy-mode cluster \
--master $MASTER \
--driver-memory $DRIVER_MEM \
--driver-java-options "$JAVA_OPTS" \
$EXECUTOR_MEM_COMMAND \
$SMARTSTEPS_SPARK_SETUP \
hdfs:///user/ss_deploy/tools/smartStep-tables.jar \
beijing \ //输入的中间目前名称
beijing_b \ //输出的中间目前名称
hdfs://xxx:8020/user/ss_deploy/ \
$@

*/

object MergeHDFSFiles {
def main(args: Array[String]) {

val conf: SparkConf = new SparkConf().setAppName(this.getClass().getName().replace("$", "") + "_" + args(0))
val sc: SparkContext = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
var basepath = "hdfs://xxx:8020/user/ss_deploy/"
var inputp = args(0)
var outputp = args(1)
if(args(2)!=""){//如果有basepath就以参数提供的为准
basepath=args(2)
}

var inputbasepath = basepath+inputp
var hashMap = HdfsFile.getContents(inputbasepath)

var iter = hashMap.keySet().iterator();

while (iter.hasNext()) {
var keyInput = iter.next().toString();
var arrkv = keyInput.split(inputp,2)
var valueFileCount = hashMap.get(keyInput).toString().toInt;
var dataOutPath = basepath+outputp+arrkv(1)

if( ! HdfsFile.getContentExist(dataOutPath)){

//输出采用gz压缩

sc.textFile(keyInput).coalesce(valueFileCount).saveAsTextFile(dataOutPath,classOf[GzipCodec])
}else{
LoggerUtil.warn("The Path already exist :" + dataOutPath)
}
//println(dataOutPath+ "->" + valueFileCount.toString().toInt)
}
sc.stop()
}

}

---------------------

package cn.smartstep.extract.tables;

import java.io.FileNotFoundException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

import unicom.util.LoggerUtil;

/**
* Created by wqj on 2018/2/8.
*/
public class HdfsFile {
public static Logger logger = Logger.getLogger("info");
public static ArrayList<String> strArray = new ArrayList<String>();
public static Map<String, String> treeMap = new TreeMap<String, String>(
new Comparator<String>() {
public int compare(String obj1, String obj2) {
// 降序排序,升序排列
return obj1.compareTo(obj2);
}
});

public static void main(String[] args) throws Exception {
String basePath = "hdfs://xxx:8020/user/ss_deploy/030";

//Map hm = getContents(basePath);

}

/*
* 根据基路径,递归遍历子目录,获取叶子目录的和对应的文件大小
*/
public static Map<String, String> getContents(String basePath) throws Exception,
URISyntaxException, FileNotFoundException {
FileSystem fileSystem = FileSystem.get(new URI(basePath),new Configuration(), "ss_deploy");
FileStatus[] listStatus = fileSystem.listStatus(new Path(basePath));

for (FileStatus fileStatus : listStatus) {
Date time = new Date(fileStatus.getModificationTime());
FsPermission pm = fileStatus.getPermission();
short rp = fileStatus.getReplication();
String owner = fileStatus.getOwner();
String group = fileStatus.getGroup();
String name = fileStatus.getPath().toString();
if (fileStatus.isDirectory()) {
// System.out.println("d"+pm+"\t"+rp+"\t"+owner+"\t"+group+"\t"+time+"\t"+name);
try {
recursiveContent(fileSystem, fileStatus, fileStatus.getPath());
} catch (Exception e) {
e.printStackTrace();
}
} else {
//LoggerUtil.error("-" + pm + "\t" + rp + "\t" + owner + "\t"+ group + "\t" + time + "\t" + name);
}
}

/*
* 降序排列目录
*/
Set set = new HashSet();
List newList = new ArrayList();
for (String cd : strArray) {
if (set.add(cd)) {
newList.add(cd);
}
}

/*
* 根据目录获取数据量大小,如何计算出一个合理的输出文件数量
*/
for (int i = 0; i < newList.size(); i++) {

String namepath = newList.get(i).toString();

//输出采用gz压缩

long count = fileSystem.getContentSummary(new Path(newList.get(i).toString())).getLength()/(1024 * 2048 * 512);
System.out.println("SIZE : " + namepath + " |" + (count+4)+"");
treeMap.put(namepath, (count+4)+"");
}
return treeMap;
}

/*
* 递归遍历目录,过滤数包含"csv_"并且不包含"metadata"目录,然后添加在一个ArrayList中
*/
public static void recursiveContent(FileSystem fileSystem, FileStatus fileStatus,
Path path) throws Exception {
FileStatus[] subList = fileSystem.listStatus(path);
for (int i = 0; i < subList.length; i++) {
if (subList[i].isDirectory()) {
recursiveContent(fileSystem, subList[i], subList[i].getPath());
} else {
String zname = subList[i].getPath().getParent().toString();
if (zname.contains("csv_") && !zname.contains("metadata")) {
strArray.add(zname);
}else if (!zname.contains("metadata")){
Date time = new Date(subList[i].getModificationTime());
FsPermission pm = subList[i].getPermission();
short rp = subList[i].getReplication();
String owner = subList[i].getOwner();
String group = subList[i].getGroup();
String name = subList[i].getPath().toString();
System.out.println("-" + pm + "\t" + rp + "\t" + owner + "\t"+ group + "\t" + time + "\t" + name);
}

}
}

}

/*
* 判断目录是否存在,参数是需要判断的目录
*/
public static boolean getContentExist(String path) throws Exception {
FileSystem fileSystem = FileSystem.get(new URI(path),new Configuration(),"ss_deploy");
return fileSystem.exists(new Path(path));
}

}



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇(1).hdfs特性及适用场景 下一篇HDFS 启动与关闭

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目