HBase put过程客户端+服务端初步解析
本文将对HBase采用客户端put的方式,结合源码对整个过程进行解析。
对于服务端的解析,本文并没有说的很详细,只是阐述了整个流程,在后面的一片文章将会详细说明服务端的具体过程
PUT 客户端
- HBase的client写入过程都是先创建put对象,然后将调用HTable的put方法,如下:
Put put = new Put("rowkey".getBytes();
put.addColumn("family".getBytes(),"qualifier".getBytes(),"value".getBytes());
htable.put(put);
调用HTable的put方法后,数据看起来就会被写入到HBase中。当然HTable的put还可以放一个put的list。
- HTable.put
HTable会维护一个buffer,put都会往这个buffer里放,一旦这个buffer中的数量达到一定的值,就会把这个buffer发给服务器,当然,客户端也可以显示调用,强制每一次的刷新都发送出去。HBase的表操作,默认情况下客户端写缓冲区是关闭的,即table.isAutoFlush() = true,
这种情况下,对表的单行操作会实时发送到服务端完成。因此,对于海量数据插入,修改,RPC通信频繁,效率比较低。这种场景下,可以通过激活客户端缓冲区,批量提交操作请求,提高操作效率。
其次客户端在提交请求的时候,会将所有的请求进行一个分组,按照regionServer进行分组。至于如何得到每一个请求所在的regionserver就应该是借助了Zookeeper了。
下面看源码:
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put);
if (autoFlush) {
flushCommits();
}
}
public synchronized void mutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
doMutate(m);
}
private void doMutate(Mutation m) {
currentWriteBufferSize += m.heapSize();
writeAsyncBuffer.add(m);
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false);
}
}
private void backgroundFlushCommits(boolean synchronous) {
try {
if (!synchronous) {
ap.submit(tableName, writeAsyncBuffer, true, null, false);
}
}
public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
List< extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
boolean needResults) {
do {
Iterator< extends Row> it = rows.iterator();
while (it.hasNext()) {
Row r = it.next();
HRegionLocation loc;
try {
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
loc = locs.getDefaultRegionLocation();
}
if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
}
}
} while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
locationErrors, locationErrorRows, actionsByServer, pool);
}
private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
ServerName server = e.getKey();
MultiAction<Row> multiAction = e.getValue();
Collection< extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,numAttempt);
for (Runnable runnable : runnables) {
if ((--actionsRemaining == 0) && reuseThread) {
runnable.run();
} else {
try {
pool.submit(runnable);
}
responseProto = getStub().multi(controller, requestProto);
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put);
if (autoFlush) {
flushCommits();
}
}
Server端
当请求发送到server端之后,涉及到的操作就比较复杂了,会涉及到写入WAL,MemStore,以及MemStore刷新等过程。我们就源码对这些过程进行描述。
-
RSRpcServices.java multi方法:
客户端的调用应该都会到这个方法进行处理,这个方法的参数是一个request,在客户端也提到了,一个请求应该是包含多个action的。
遍历这个request的action,按region进行遍历,每一个region的action得到一个RegionAction。 我理解的这里 一个request里面只会有一个regionserver里面的数据。
根据RegionAction得到HRegion,并且遍历这个region的所有action。这里他把一个RegionAction的所有action都放到了一个RowMutations。按道理来说这个RowMutations就应该指的是一行的数据,如果是这样的话 这个Regionaction就不应该是一个region的数据了,而是一行的数据,只有这种逻辑,才符合后面的操作
以下是源码部分:
public MultiResponse multi(final RpcController rpcc, final MultiRequest request){
for (RegionAction regionAction : request.getRegionActionList()) {
HRegion region;
region = getRegion(regionAction.getRegion());
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
cellScanner);
}
}
private ClientProtos.RegionLoadStats mutateRows(final HRegion region,
final List<ClientProtos.Action> actions,
final CellScanner cellScanner) {
RowMutations rm = null;
for (ClientProtos.Action action: actions) {
MutationType type = action.getMutation().getMutateType();
if (rm == null) {
rm = new RowMutations(action.getMutation().getRow().toByteArray());
}
switch (type) {
case PUT:
rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
break;
case DELETE:
rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
break;
default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}
region.mutateRow(rm);
return region.getRegionStats();
}
- HRegion.mutateRow(RowMutations rm)方法:
-
上诉得到的一个RowMutations将交到HReigion的这个方法来进行处理。会构建一个MultiRowMutationProcessor,这里面将包含所有的action。然后这个MultiRowMutationProcessor将交给processRowsWithLocks处理。在构建这个processor的时候同时还需要传入一个rowkey的集合,这个集合代码所有要锁的行。
-
但是我们这里虽然说是一个rowkey的集合,但是按照前面的说法,一个RowMutations只包含一行的数据,那么这个集合里面也就只有一个元素。
-
会遍历所有要上锁的行,获取全部的锁。(感觉这里也可以理解哈 集合里面只有一行)
-
这里还会获取这个region的读锁,是updateLoag的readloag。 意味着这个时候不可读?
-
获取一个mvccNum。这个mvccNum是根据这个时候,这个region的sequenceID获取的。这个值的获取感觉很奇特,是在sequenceid的基础上加了10亿。按照这个方法的说法是10亿是一个足够大的数,可以保证在这个mvcc写完成之前没有scanner可以达到这个数。
-
现在,我们前面提到过,这个时候所有的action都是在MultiRowMutationProcessor里面的,把actiob拿出来放到两个地方。一个是List mutations ,一个是WALEdit walEdit = new WALEdit()。前者代表了要写入到Memstore中的,后者代表要写入到WAL中的。在放入到WALEdit中的时候,需要进行判断,并不是所有的action都要写入到WAL中。
-
写入到memstore
-
写入到wal
这里插入一个问题,关于是先写入mem还是先写入wal。按照wal的定义,write ahead logging,预写日志,是应该先写入wal的。
在0.94版本之前,Region中的写入顺序是先写WAL再写MemStore,这与WAL的定义也相符。
但在0.94版本中,将这两者的顺序颠倒了,当时颠倒的初衷,是为了使得行锁能够在WAL sync之前先释放,从而可以提升针对单行数据的更新性能。详细问题单,请参考HBASE-4528。
在2.0版本中,这一行为又被改回去了,原因在于修改了行锁机制以后(下面章节将讲到),发现了一些性能下降,而HBASE-4528中的优化却无法再发挥作用,详情请参考HBASE-15158。改动之后的逻辑也更简洁了。
选自https://mp.weixin.qq.com/s__biz=MzI4Njk3NjU1OQ==&mid=2247483748&idx=1&sn=37b1ce75ef45f7fbb76a7092ad22ce5f&chksm=ebd5fe24dca27732e7deae4abf1409599d6d01d76df4b479ae0b1a05259461d30d04c11402db&scene=21#wechat_redirect
下面我们看源码
public void mutateRow(RowMutations rm) throws IOException {
mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
}
public void mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce){
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
processRowsWithLocks(proc, -1, nonceGroup, nonce);
}
public void processRowsWithLocks(RowProcessor<,> processor, long timeout,
long nonceGroup, long nonce){
WALEdit walEdit = new WALEdit();
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
boolean locked;
boolean walSyncSuccessful = false;
List<RowLock> acquiredRowLocks;
List<Mutation> mutations = new ArrayList<Mutation>();
List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
WALKey walKey = null;
try {
acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
for (byte[] row : rowsToLock) {
acquiredRowLocks.add(getRowLock(row));
}
lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 1 : acquiredRowLocks.size());
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
try {
doProcessRowWithTimeout(
processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
processor.preBatchMutate(this, walEdit);
for (Mutation m : mutations) {
rewriteCellTags(m.getFamilyCellMap(), m);
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
CellUtil.setSequenceId(cell, mvccNum);
Store store = getStore(cell);
if (store == null) {
checkFamily(CellUtil.cloneFamily(cell));
}
Pair<Long, Cell> ret = store.add(cell);
addedSize += ret.getFirst();
memstoreCells.add(ret.getSecond());
}
}
long txid = 0;
if (!walEdit.isEmpty()) {
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, getSequenceId(), true, memstoreCells);
}
if (locked) {
this.updatesLock.readLock().unlock();
locked = false;
}
releaseRowLocks(acquiredRowLocks);
if (txid != 0) {
syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
}
walSyncSuccessful = true;
processor.postBatchMutate(this);
}
} finally {
} finally {
closeRegionOperation();
if (!mutations.isEmpty() &&
isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
requestFlush();
}
}