ilepath);
BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));
BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024);// 用5M的缓冲读取文本文件
String line = "";
while((line = reader.readLine()) != null){
String[] data = line.split(",");
//根据前两位, 确定要使用哪个Writer. 相同2位前缀的记录写到同一个db文件里
String prefData = data[2];
maps.get(prefData).put(data[1], data[0]);
}
fis.close();
reader.close();
}
for (Map.Entry<String, StoreWriter> entry : maps.entrySet()) {
entry.getValue().close();
}
}
查询一条记录就很简单了, 首先解析出MD5的前两位, 找到对应的paldb文件, 直接读取:
System.out.println("QUERYING>>>>>>>>>");
String file = md5.substring(0,2) + ".paldb";
StoreReader reader = PalDB.createReader(new File(folder + file));
String id = reader.get(md5);
System.out.println(id);
sparkey@spotify
sparkey也声称对于read-heavy systems with infrequent large bulk inserts对于经常读,不经常(大批量)写的性能很好.
sparkey有两种文件:索引文件(index file)和日志文件(log file).
Spark BulkLoad
HBaseRDD: https://github.com/unicredit/hbase-rdd
SparkOnHBase在最新的HBase版本中已经合并到了hbase代码中.
建立一个columnfamily=id. 并且在这个cf下有一个column=id存储id数据(cf必须事先建立,column则是动态的).
create 'data.md5_id','id'
put 'data.md5_id','a9fdddddddddddddddddddddddddddde','id:id','111111111111'
get 'data.md5_id','a9fdddddddddddddddddddddddddddde'
scan 'data.md5_id'
Spark的基本思路是: 读取文本文件, 构造RowKey -> Map<CF -> Map<Column -> Value>>的RDD:
val rdd = sc.textFile(folder).map({ line =>
val data = line split ","
val content = Map(cf -> Map(column -> data(0)))
data(1) -> content
})
rdd.toHBaseBulk(table)
HBase BulkLoad
HBase的BulkLoad分为两个节点: 运行MapReduce生成HFile文件, 导入到HBase集群
数据存储: http://zqhxuyuan.github.io/2015/12/19/2015-12-19-HBase-BulkLoad/
Input |
Output |
Time |
multi |
26G |
87.3G |
20min |
3.3 |
806.5G |
2.6T |
10h |
3.3 |
6T |
18T |
… |
3 |
查询(多线程): http://zqhxuyuan.github.io/2015/12/21/2015-12-21-HBase-Query/
Data |
Storage |
Query |
Cost |
mob |
35亿 |
18万 |
15min |
id |
1000亿 |
存在的问题: 在生成HFile时,是对每个原始文件做MR任务的,即每个原始文件都启动一个MR作业生成HFile.
这样只保证了Reduce生成的HFile在这个原始文件是有序的.不能保证所有原始文件生成的HFile是全局有序的.
这样当只导入第一个文件夹时,BulkLoad是直接移动文件.但是导入接下来生成的文件夹时,就会发生Split操作!
虽然每个MapReduce生成的HFile在这个文件夹内是有序的. 但是不能保证所有MR作业的HFile是全局有序的!
MapReduce/importtsv completebulkload(mv)
txt1 -------------------> HFile(00-03) --------------------> Region
HFile(03-10) --------------------> Region
HFile(10-30) ? --------------------> Region
MapReduce/importtsv bulkload(split and copy!)
txt2 -------------------> HFile(01-04)
HFile(04-06)
HFile(06-15)
数据验证:
hbase(main):002:0> get 'data.md5_mob2','2774f8075a3a7707ddf6b3429c78c041'
COLUMN CELL
0 row(s) in 0.2790 seconds
hbase(main):003:0> get 'data.md5_mob2','695c52195b25cd74fef1a02f4947d2b5'
COLUMN CELL
mob:c1 timestamp=1450535656819, value=69
mob:c2 timestamp=1450535656819, value=5c
mob:mob timestamp=1450535656819, value=13829274666
3 row(s) in 0.0640 seconds
Cassandra
Cassandra和HBase都是列式数据库.HBase因为使用MapReduce,所以读取HDFS上的大文件时,会分成多个Map任务.
Cassandra导入数据不可避免的是需要读取原始的大文件,一种直接生成SSTable,一种是读取后直接写入到集群中.
SSTable Writer
//构造Cassandra