起因:前段时间,我们把通过 happybase 向 HBase 写数据的 put() 操作换成了 batch(),结果发现性能并没有提升。
阅读代码后,我发现 put() 的实现本身使用的就是批量插入(batch):
table.py
def put(self, row, data, timestamp=None, wal=True):
    """Store data in the table.

    This method stores the data in the `data` argument for the row
    specified by `row`. The `data` argument is a dictionary that maps
    columns to values. Column names must include a family and qualifier
    part, e.g. `cf:col`, though the qualifier part may be the empty
    string, e.g. `cf:`.

    Note that, in many situations, :py:meth:`batch()` is a more
    appropriate method to manipulate data.

    .. versionadded:: 0.7
       `wal` argument

    :param str row: the row key
    :param dict data: the data to store
    :param int timestamp: timestamp (optional)
    :param bool wal: whether to write to the WAL (optional)
    """
    # A single put is itself implemented as a batch: a batch is opened
    # just for this call and the mutation is queued on it, so it is
    # handed off when the ``with`` block exits.
    with self.batch(timestamp=timestamp, wal=wal) as batch:
        batch.put(row, data)
batch.py
class Batch(object):
    """Batch mutation class.

    This class cannot be instantiated directly; use
    :py:meth:`Table.batch` instead.

    Mutations are buffered in memory, grouped by row key, and are only
    transmitted when :py:meth:`send` runs.
    """

    def __init__(self, table, timestamp=None, batch_size=None,
                 transaction=False, wal=True):
        """Initialise a new Batch instance.

        :param table: the table the mutations are sent to
        :param int timestamp: timestamp applied to the whole batch
            (optional)
        :param int batch_size: number of buffered mutations that
            triggers an automatic :py:meth:`send` (optional)
        :param bool transaction: stored on the instance; cannot be
            combined with `batch_size`
        :param bool wal: default write-to-WAL flag for the mutations

        :raises TypeError: if `timestamp` is neither an integer nor
            `None`, or if `transaction` is used with `batch_size`
        :raises ValueError: if `batch_size` is not greater than zero
        """
        if not (timestamp is None or isinstance(timestamp, Integral)):
            raise TypeError("'timestamp' must be an integer or None")

        if batch_size is not None:
            if transaction:
                raise TypeError("'transaction' cannot be used when "
                                "'batch_size' is specified")
            if not batch_size > 0:
                raise ValueError("'batch_size' must be > 0")

        self._table = table
        self._batch_size = batch_size
        self._timestamp = timestamp
        self._transaction = transaction
        self._wal = wal
        self._families = None
        self._reset_mutations()

    def _reset_mutations(self):
        """Reset the internal mutation buffer."""
        # Maps row key -> list of pending mutations for that row.
        self._mutations = defaultdict(list)
        self._mutation_count = 0

    def send(self):
        """Send the batch to the server.

        Does nothing when no mutations are buffered. After a successful
        call the internal buffer is emptied.
        """
        # ``items()`` rather than ``iteritems()``: the latter does not
        # exist on Python 3 dictionaries.
        bms = [BatchMutation(row, m) for row, m in self._mutations.items()]
        if not bms:
            return

        logger.debug("Sending batch for '%s' (%d mutations on %d rows)",
                     self._table.name, self._mutation_count, len(bms))
        if self._timestamp is None:
            self._table.connection.client.mutateRows(
                self._table.name, bms, {})
        else:
            self._table.connection.client.mutateRowsTs(
                self._table.name, bms, self._timestamp, {})

        self._reset_mutations()

    #
    # Mutation methods
    #

    def put(self, row, data, wal=None):
        """Store data in the table.

        See :py:meth:`Table.put` for a description of the `row`, `data`,
        and `wal` arguments. The `wal` argument should normally not be
        used; its only use is to override the batch-wide value passed to
        :py:meth:`Table.batch`.
        """
        if wal is None:
            wal = self._wal

        self._mutations[row].extend(
            Mutation(
                isDelete=False,
                column=column,
                value=value,
                writeToWAL=wal)
            for column, value in data.items())

        self._mutation_count += len(data)
        # The buffer is flushed automatically only once the number of
        # pending mutations reaches ``batch_size``; when no batch size
        # was given, this method never sends anything by itself.
        if self._batch_size and self._mutation_count >= self._batch_size:
            self.send()