的Writer对象
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
builder.inDirectory(outputDir).forTable(SCHEMA).using(INSERT_STMT).withPartitioner(new Murmur3Partitioner());
CQLSSTableWriter writer = builder.build();
//读取大文件,写入到Writer对象,最终会生成SSTable文件
while ((line = reader.readLine()) != null) {
writer.addRow(line.split(",")[1],line.split(",")[0]);
}
单独地遍历文件,不做任何事情,耗时100s=2min. 则读取6T的文件,耗时2000min=33hour.
Driver API
List<Statement> statementList = new ArrayList();
while ((line = reader.readLine()) != null) {
BoundStatement bound = insert.bind(line.split(",")[1],line.split(",")[0]);
statementList.add(bound);
if(statementList.size() >= 65535){
flush(statementList);
statementList.clear();
}
}
// 批量写入
public static void flush(List<Statement> buffer) {
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
for (Statement bound : buffer) {
batch.add(bound);
}
client.execute(batch);
}
KV DataBase
其实我们的业务中只是KeyValue,最适合的不是列式数据库,而是KV数据库.常见的KV数据库有:MemCache,Redis,LevelDB/RocksDB,Riak.
LevelDB
一个数据库一次只能被一个进程打开。leveldb的实现要求使用来自操作系统的锁来阻止对数据库的滥用。在单进程中,同一个leveldb::DB对象可以被多个并发线程安全地共享。即,针对同一个数据库,在没有任何外部同步措施的前提下(leveldb实现本身将会自动去做所需要的同步过程),不同的线程可以写入迭代器或者获取迭代器或者调用Get方法。但是,其它的对象(比如Iterator和WriteBatch)可能需要外部的同步过程。如果两个线程共享一个这样的对象,这俩线程必须通过它们各自的加锁协议(locking protocol)来保护对这个对象的访问。
-rw-r--r--. 1 qihuang.zheng users 0 12月 24 11:44 000003.log
-rw-r--r--. 1 qihuang.zheng users 16 12月 24 11:44 CURRENT
-rw-r--r--. 1 qihuang.zheng users 0 12月 24 11:44 LOCK
-rw-r--r--. 1 qihuang.zheng users 57 12月 24 11:44 LOG
-rw-r--r--. 1 qihuang.zheng users 65536 12月 24 11:44 MANIFEST-000002
?????????????????????????????????????????
-rw-r--r--. 1 qihuang.zheng users 2116214 12月 24 11:49 000408.sst
...
-rw-r--r--. 1 qihuang.zheng users 3080192 12月 24 11:55 001210.sst
-rw-r--r--. 1 qihuang.zheng users 16 12月 24 11:44 CURRENT
-rw-r--r--. 1 qihuang.zheng users 0 12月 24 11:44 LOCK
-rw-r--r--. 1 qihuang.zheng users 215845 12月 24 11:55 LOG
-rw-r--r--. 1 qihuang.zheng users 196608 12月 24 11:55 MANIFEST-000002
可以看到旧的sst(SSTable)不断被删除,并用新的sst文件代替. 但是速度在处理大文件时依旧很慢.
结论: 涉及到要读取原始文件,遍历每一行,然后调用存储的写入方式即使采用批量,也会很慢.
而HBase的BulkLoad会开启多个Map任务读取大文件,因此速度会比遍历读取大文件要快.
happybase
既然读取大文件很慢,能不能在生成md5数据的时候不写文件, 直接写到目标数据库.
import happybase
connection = happybase.Connection('192.168.47.213')
table = connection.table('data.md5_id2')
def write_data(li):
batch = table.batch(wal=False)
for ele in li:
#wf.write(','.join(ele) + '\n')
#wf.flush()
batch.put(ele[0], {'id:id': ele[1]})
batch.send()
运行一个省份(35,记录数34亿)耗时:
2015-12-29 09:53:38 350100 19550229 999 60000
2015-12-31 02:35:38 359002 20011119 999 3457560000
其他
删除文件名长度=4的所有文件(不包括文件名后缀)
find . -type f | grep -P '/.{8}$' | xargs rm
a=($(ls | grep -E '[0-9a-f]{4}.txt')) && for i in "${a[@]}";do rm -rf "$i";done
查看进程的文件句柄数量(开了两个进程在跑,每个进程用了16^4=65535)
[qihuang.zheng@192-168-47-248 version2]$ lsof -n|awk '{print $2}'|sort|uniq -c |sort -nr|head -2
65562 6516
65562 10230
[qihuang.zheng@192-168-47-248 version2]$ jps
6516 GenIdCardRawFile
10230 GenIdCardRawFile
Final:Cassandra
数据存储
建表,列名统一为md5和id
CREATE KEYSPACE data WI