/*
 * NOTE(review): this looks like an excerpt pasted from an online source viewer:
 * the leading numbers (50-64) and the "More ..." text appear to be viewer residue,
 * and the snippet sits ahead of the package declaration, so it is not compilable
 * as written. Presumably it is kept as a reference for the two-argument deepCopy
 * helper declared in ReadArbiKV -- confirm, and consider removing it.
 *
 * What the snippet does: copies `source` by writing it into an in-memory byte
 * buffer and reading that buffer back into a new instance obtained from
 * `writableClass` (declared outside this excerpt). Any exception raised along
 * the way is rethrown wrapped in a CrunchRuntimeException.
 */
public T More ...deepCopy(T source) {
50 ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
51 DataOutputStream dataOut = new DataOutputStream(byteOutStream);
52 T copiedValue = null;
53 try {
// Serialize the source into the byte buffer.
54 source.write(dataOut);
55 dataOut.flush();
// Re-read the serialized bytes into a fresh instance.
56 ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
57 DataInput dataInput = new DataInputStream(byteInStream);
58 copiedValue = writableClass.newInstance();
59 copiedValue.readFields(dataInput);
60 } catch (Exception e) {
// Every failure (I/O or reflective instantiation) is reported as one unchecked exception.
61 throw new CrunchRuntimeException("Error while deep copying " + source, e);
62 }
63 return copiedValue;
64 }
package mahout.fansy.utils.read;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.types.writable.WritableDeepCopier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
public class ReadArbiKV {
// Shared static state used to read an arbitrary Hadoop sequence file.

/** Hadoop configuration used for every file-system and reader call in this class. */
public static Configuration conf = new Configuration();

/** Deep-copier handle; it is not assigned anywhere in the visible part of this file. */
public static WritableDeepCopier wdc;

/** Location of the data file that main reads. */
static String fPath = "hdfs://ubuntu:9000/home/mahout/mahout-work-mahout/labelindex";

/** Second input location; left empty here. */
static String trainPath = "";

static {
  conf.set("mapred.job.tracker", "ubuntu:9001");
}
/**
 * Entry point: reads the sequence file located at the static {@code fPath}.
 * The map returned by {@code readFromFile} is not used here.
 *
 * @param args command-line arguments (not used)
 * @throws IOException if reading the sequence file fails
 */
public static void main(String[] args) throws IOException {
  final String dataFile = fPath;
  readFromFile(dataFile);
  // trainPath could be read the same way: readFromFile(trainPath)
}
/**
 * Reads a sequence file and collects all of its records into a map.
 *
 * <p>The same key and value instances are handed to every {@code next} call,
 * so each record is deep-copied before it is stored in the map.
 *
 * @param fPath URI of the sequence file to read
 * @return a map from each copied key to its copied value; a key that occurs
 *         more than once keeps the value that was read last
 * @throws IOException if the file system or the sequence file cannot be opened or read
 */
public static Map readFromFile(String fPath) throws IOException {
  final FileSystem fileSystem = FileSystem.get(URI.create(fPath), conf);
  final Path seqFile = new Path(fPath);
  final Map<Writable, Writable> records = new HashMap<Writable, Writable>();

  SequenceFile.Reader seqReader = null;
  try {
    seqReader = new SequenceFile.Reader(fileSystem, seqFile, conf);

    // Holder objects that are passed to the reader for each record.
    final Class<?> keyType = seqReader.getKeyClass();
    final Writable reusedKey = (Writable) ReflectionUtils.newInstance(keyType, conf);
    final Class<?> valueType = seqReader.getValueClass();
    final Writable reusedValue = (Writable) ReflectionUtils.newInstance(valueType, conf);

    while (seqReader.next(reusedKey, reusedValue)) {
      // Store copies, not the holders themselves, which are reused on the next iteration.
      records.put(deepCopy(reusedKey, keyType), deepCopy(reusedValue, valueType));
    }
  } finally {
    // Runs even when opening the reader failed, in which case seqReader is still null.
    IOUtils.closeStream(seqReader);
  }
  return records;
}
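/**
* Deep copy of a Writable.
* Adapted from WritableDeepCopier.
* @param source the Writable to copy
* @param writableClass class associated with the copy (presumably the class that is instantiated to hold it -- confirm)
* @return the copied Writable
*/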
public static Writable deepCopy(Writable source,Class writableClass) {
ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
DataOut