设为首页 加入收藏

TOP

Hadoop Writable深度复制及读取任意<key,value>序列文件(一)
2014-11-24 07:53:03 来源: 作者: 【 】 浏览:4
Tags:Hadoop Writable 深度 复制 读取 任意 <key,value> 序列 文件
 public T More ...deepCopy(T source) {
50    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
51    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
52    T copiedValue = null;
53    try {
54      source.write(dataOut);
55      dataOut.flush();
56      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
57      DataInput dataInput = new DataInputStream(byteInStream);
58      copiedValue = writableClass.newInstance();
59      copiedValue.readFields(dataInput);
60    } catch (Exception e) {
61      throw new CrunchRuntimeException("Error while deep copying " + source, e);
62    }
63    return copiedValue;
64  }
package mahout.fansy.utils.read;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class ReadArbiKV {
 
	/** Hadoop configuration shared by the reads in this class; the job tracker address is set in the static initializer. */
	public static Configuration conf=new Configuration();
	/** Declared here but not assigned or used in the visible part of this file. */
	public static WritableDeepCopier wdc;
	/** HDFS location of the data file that main reads. */
	static String fPath="hdfs://ubuntu:9000/home/mahout/mahout-work-mahout/labelindex";
	/** Left empty; only referenced by a commented-out call. */
	static String trainPath="";
	static{
		conf.set("mapred.job.tracker", "ubuntu:9001");
	}
	/**
	 * Entry point: reads the sequence file located at {@code fPath} and discards the resulting map.
	 *
	 * @param args command-line arguments (not used)
	 * @throws IOException propagated from {@code readFromFile}
	 */
	public static void main(String[] args) throws IOException {
		final String input = fPath;
		readFromFile(input);
		// Alternative input kept for reference: readFromFile(trainPath);
	}
	
	/**
	 * Reads every record of a Hadoop sequence file into an in-memory map.
	 * <p>
	 * The same key and value instances are handed to each {@code next} call, so
	 * every record is deep-copied before it is stored in the map.
	 *
	 * @param fPath location of the sequence file; used both as a URI and as a Hadoop path
	 * @return a map from each copied key to its copied value
	 * @throws IOException propagated from the file system and sequence file reader calls
	 */
	public static Map readFromFile(String fPath) throws IOException{
		FileSystem fileSystem = FileSystem.get(URI.create(fPath), conf);
		Path seqFile = new Path(fPath);
		Map<Writable, Writable> records = new HashMap<Writable, Writable>();
		SequenceFile.Reader reader = null;
		try {
			reader = new SequenceFile.Reader(fileSystem, seqFile, conf);

			// The key/value classes are taken from the reader, so any sequence file can be read.
			Class<?> keyType = reader.getKeyClass();
			Writable reusedKey = (Writable) ReflectionUtils.newInstance(keyType, conf);
			Class<?> valueType = reader.getValueClass();
			Writable reusedValue = (Writable) ReflectionUtils.newInstance(valueType, conf);

			while (reader.next(reusedKey, reusedValue)) {
				// Store copies: the reused instances are overwritten by the next call.
				records.put(deepCopy(reusedKey, keyType), deepCopy(reusedValue, valueType));
			}
		} finally {
			IOUtils.closeStream(reader);
		}
		return records;
	}
	
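	/**
	 * Deep-copies a Writable.
	 * Adapted from WritableDeepCopier.
	 * @param source the Writable to copy
	 * @param writableClass presumably the class instantiated for the copy, as in WritableDeepCopier (TODO confirm)
	 * @return the copied Writable
	 */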
	public static Writable deepCopy(Writable source,Class writableClass) {
		    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
		    DataOut
首页 上一页 1 2 下一页 尾页 1/2/2
【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
分享到: 
上一篇:Linux下Mongodb安装和sql语句 下一篇:MySQL数据库迁移到PostgreSQL

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·求navicat for mysql (2025-12-26 13:21:33)
·有哪位大哥推荐一下m (2025-12-26 13:21:30)
·MySQL下载与安装教程 (2025-12-26 13:21:26)
·Linux_百度百科 (2025-12-26 12:51:52)
·Shell 流程控制 | 菜 (2025-12-26 12:51:49)