HBase Coprocessors (Part 2)
2019-02-12 13:36:00

3.7 The RegionObserver Class


Used at the region level. Note that the first parameter of every callback method in this class is
ObserverContext<RegionCoprocessorEnvironment> ctx, which provides access to the context instance.


The operations can be divided into two groups: region life-cycle changes and client API calls. Both types share a set of generic callbacks:

enum Operation {
  ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION,
  MERGE_REGION, BATCH_MUTATE, REPLAY_BATCH_MUTATE, COMPACT_REGION
}

postStartRegionOperation(..., Operation operation)
postCloseRegionOperation(..., Operation operation)

These RegionObserver methods are called whenever any of the possible Operations occurs.
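
To show how the two generic callbacks pair up, here is a minimal stand-alone sketch that tracks how many operations of each type are
currently in flight. It deliberately avoids the HBase classes so that it runs on a plain JDK: the Operation enum is copied from the
listing above, while OperationTracker and its onStart()/onClose() methods are hypothetical names standing in for what an observer
might do inside postStartRegionOperation() and postCloseRegionOperation().

```java
import java.util.EnumMap;
import java.util.Map;

// Copy of the Operation enum shown above, so the sketch compiles without HBase.
enum Operation {
  ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION,
  MERGE_REGION, BATCH_MUTATE, REPLAY_BATCH_MUTATE, COMPACT_REGION
}

// Hypothetical helper, not part of HBase.
final class OperationTracker {
  private final Map<Operation, Integer> inFlight = new EnumMap<>(Operation.class);

  // Would be called from postStartRegionOperation(..., operation).
  synchronized void onStart(Operation op) {
    inFlight.merge(op, 1, Integer::sum);
  }

  // Would be called from postCloseRegionOperation(..., operation).
  // Removes the entry once the count drops to zero; unmatched closes are ignored.
  synchronized void onClose(Operation op) {
    inFlight.computeIfPresent(op, (k, v) -> v > 1 ? v - 1 : null);
  }

  // Number of operations of the given type that have started but not yet closed.
  synchronized int inFlight(Operation op) {
    return inFlight.getOrDefault(op, 0);
  }
}
```

The methods are synchronized because a region serves many requests concurrently, so the callbacks can arrive from several threads.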


3.7.1 Handling Region Life-Cycle Events
-----------------------------------------------------------------------------------------------------------------------------------------
An observer can hook into the pending open, open, and pending close state changes. Each hook is called implicitly by the framework.


● State: pending open
-------------------------------------------------------------------------------------------------------------------------------------
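A region is in this state when it is about to be opened. Observing coprocessors can either piggyback on this process or make it fail. To that
end, the following callbacks are invoked in this order: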

postLogReplay(...)

preOpen(...)
preStoreFileReaderOpen(...)
postStoreFileReaderOpen(...)
preWALRestore(...) / postWALRestore(...)
postOpen(...)

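These methods are called just before the region is opened, before and after the store files are opened, when the WAL is about to be
replayed, and after the region has been opened. Your own coprocessor implementation can use these callbacks, for example, to tell the
framework in preOpen() to abort the open process, or to hook into postOpen() to trigger a cache warm-up.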

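When the first event, postLogReplay(), is triggered depends on the configured WAL recovery mode: distributed log splitting or log
replay, selected with the hbase.master.distributed.log.replay property. The former runs before a region is opened and therefore triggers
this call first. The latter opens the region and then replays the edits, triggering the callback after the region-open event.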

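In both modes, though at a point that depends on which one is active, the region server applies records from the write-ahead log (WAL). This
in turn invokes the observer's preWALRestore() and postWALRestore() methods. With distributed log splitting this happens after pending open
but before the open state. Otherwise the calls come after the open event, while the log is being replayed. Hooking into the WAL calls gives
you control over which mutations are applied during log replay. You also get access to the edit record, which you can use to inspect what is
being applied.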


● State: open
-------------------------------------------------------------------------------------------------------------------------------------
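A region is considered to be in the open state once it is deployed to a region server and fully operational. In this state, for example, the
region's in-memory store can be flushed to disk, or the region can be split when it has grown too large. The possible hooks are: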

preFlushScannerOpen(...)
preFlush(...) / postFlush(...)
preCompactSelection(...) / postCompactSelection(...)
preCompactScannerOpen(...)
preCompact(...) / postCompact(...)
preSplit(...)
preSplitBeforePONR(...)
preSplitAfterPONR(...)
postSplit(...)
postCompleteSplit(...) / preRollBackSplit(...) / postRollBackSplit(...)

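The method names are fairly self-explanatory: the pre calls are invoked before the respective operation and the post calls after it. For
example, with the preSplit() hook you can effectively disable the built-in region splitting process and perform splits manually. Some calls
have only a pre hook, others only a post hook.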

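The hooks for flush, compact, and split are wired directly to the matching region-internal functions. There are also more specialized hooks
that are called as part of these three functions. For example, preFlushScannerOpen() is called when the scanner for the memstore is set up,
which is just before the actual flush takes place.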

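Similarly, for compactions, the server first selects the files to include, and this step is wrapped in coprocessor callbacks. After that the
store scanner is opened and, finally, the actual compaction takes place.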

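For splits, several callbacks reflect the current stage, with a particular point-of-no-return (PONR) in between. The PONR occurs after the
split process has started but before any final action has taken place. A split is handled like an internal transaction: when the transaction
is about to be committed, preSplitBeforePONR() is invoked, and preSplitAfterPONR() right after. There are also calls for the eventual
completion or rollback, informing you about the outcome of the split transaction.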


● State: pending close
-------------------------------------------------------------------------------------------------------------------------------------
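The last group of hooks for observers applies to regions entering the pending close state, which happens when a region transitions from open
to closed. Just before and just after the region is closed, the following hooks are executed: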

preClose(..., boolean abortRequested)
postClose(..., boolean abortRequested)

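The abortRequested parameter indicates why the region was closed. Usually regions are closed during normal operation, for example when a
region is moved to a different region server for load-balancing reasons. But it is also possible that a region server has run into trouble
and is aborted to avoid any side effects. When that happens, all regions hosted on that server are aborted as well, and the parameter tells
you that this was the case.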

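In addition, this class inherits the start() and stop() methods, which let you allocate and release resources tied to the coprocessor's life cycle.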


3.7.2 Handling Client API Events
-----------------------------------------------------------------------------------------------------------------------------------------
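In contrast to the life-cycle events, all client API calls are sent explicitly from a client application to the region server. You can hook
into these calls and be called back just before or just after they are applied. The following table lists the available API calls and their
callback methods: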

Callbacks for client API functions
+-------------------------------+-------------------------------------------+--------------------------------------------
| API Call | Pre-Hook | Post-Hook
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.put() | prePut(...) | postPut(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.checkAndPut() | preCheckAndPut(...) | postPut(...)
| | preCheckAndPutAfterRowLock(...) | postCheckAndPut(...)
| | prePut(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.get() | preGetOp(...) | postGetOp(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.delete() | preDelete(...) | postDelete(...)
| Table.batch() | prePrepareTimeStampForDeleteVersion(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.checkAndDelete() | preCheckAndDelete(...) | postDelete(...)
| | preCheckAndDeleteAfterRowLock(...) | postCheckAndDelete(...)
| | preDelete(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.mutateRow() | preBatchMutate(...) | postBatchMutate(...)
| | prePut(...)/preGetOp(...) | postPut(...)/postGetOp(...)
| | | postBatchMutateIndispensably()
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.append() | preAppend(...) | postMutationBeforeWAL(...)
| | preAppendAfterRowLock() | postAppend(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.batch() | preBatchMutate(...) | postPut(...)/postGetOp(...)
| | prePut(...)/preGetOp(...)/preDelete(...) | postBatchMutate(...)
| | prePrepareTimeStampForDeleteVersion(...) |
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.checkAndMutate() | preBatchMutate(...) | postBatchMutate(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.getScanner() | preScannerOpen(...) | postInstantiateDeleteTracker(...)
| | preStoreScannerOpen(...) | postScannerOpen(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| ResultScanner.next() | preScannerNext(...) | postScannerFilterRow(...)
| | | postScannerNext(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| ResultScanner.close() | preScannerClose(...) | postScannerClose(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.increment(), | preIncrement(...) | postMutationBeforeWAL(...)
| Table.batch() | preIncrementAfterRowLock(...) | postIncrement(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.incrementColumnValue() | preIncrementColumnValue(...) | postIncrementColumnValue(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.getClosestRowBefore() | preGetClosestRowBefore(...) | postGetClosestRowBefore(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| Table.exists() | preExists(...) | postExists(...)
+-------------------------------+-------------------------------------------+--------------------------------------------
| completebulkload (tool) | preBulkLoadHFile(...) | postBulkLoadHFile(...)
+-------------------------------+-------------------------------------------+--------------------------------------------


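The events in the table are listed in order: when a call triggers several events, they fire from top to bottom. A slash ("/") means that only
one of the listed events fires, and which one depends on the operations contained in the call. For details on each callback, see the API
documentation or the source code.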

Example: Observer collecting invocation statistics

// Package names as in the HBase 1.x API.
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
// ObserverStatisticsProtos is the protobuf-generated class for this endpoint;
// import it from the package it was generated into.

@SuppressWarnings("deprecation") // because of API usage
public class ObserverStatisticsEndpoint
    extends ObserverStatisticsProtos.ObserverStatisticsService
    implements Coprocessor, CoprocessorService, RegionObserver {

  private RegionCoprocessorEnvironment env;
  // Call counts keyed by callback name; every access is guarded by synchronized (stats).
  private final Map<String, Integer> stats = new LinkedHashMap<>();

  // Lifecycle methods
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }
  ...

  // Endpoint methods
  @Override
  public void getStatistics(RpcController controller,
      ObserverStatisticsProtos.StatisticsRequest request,
      RpcCallback<ObserverStatisticsProtos.StatisticsResponse> done) {

    ObserverStatisticsProtos.StatisticsResponse response = null;
    try {
      ObserverStatisticsProtos.StatisticsResponse.Builder builder =
          ObserverStatisticsProtos.StatisticsResponse.newBuilder();
      ObserverStatisticsProtos.NameInt32Pair.Builder pair =
          ObserverStatisticsProtos.NameInt32Pair.newBuilder();
      // Hold the lock while reading, so that concurrent callbacks cannot
      // modify the map in the middle of the iteration.
      synchronized (stats) {
        for (Map.Entry<String, Integer> entry : stats.entrySet()) {
          pair.setName(entry.getKey());
          pair.setValue(entry.getValue().intValue());
          builder.addAttribute(pair.build());
        }
        // optionally clear out stats
        if (request.hasClear() && request.getClear()) {
          stats.clear();
        }
      }
      response = builder.build();
    } catch (Exception e) {
      ResponseConverter.setControllerException(controller, new IOException(e));
    }
    done.run(response);
  }

  /**
   * Internal helper to keep track of call counts.
   *
   * @param call The name of the call.
   */
  private void addCallCount(String call) {
    synchronized (stats) {
      // Start at 1 for a new name, otherwise add 1 to the existing count.
      stats.merge(call, 1, Integer::sum);
    }
  }

  // All Observer callbacks follow here
  @Override
  public void preOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext) throws IOException {
    addCallCount("preOpen");
  }

  @Override
  public void postOpen(ObserverContext<RegionCoprocessorEnvironment> observerContext) {
    addCallCount("postOpen");
  }
  ...
}
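
The counting logic at the heart of this example can be tried out without a cluster. The following is a minimal sketch on a plain JDK, not
the endpoint's actual implementation: CallStats and snapshot() are hypothetical names, and the lock around the map is swapped for a
ConcurrentHashMap of LongAdder counters, a common alternative when many threads increment at once.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-alone variant of the example's addCallCount() helper.
final class CallStats {
  private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

  // Record one invocation of the named callback, e.g. "preOpen".
  void addCallCount(String call) {
    counts.computeIfAbsent(call, k -> new LongAdder()).increment();
  }

  // Copy the current counts, sorted by name. With clear == true each counter is
  // reset to zero after being read, similar to the "clear" flag of the request.
  Map<String, Long> snapshot(boolean clear) {
    Map<String, Long> copy = new TreeMap<>();
    for (Map.Entry<String, LongAdder> e : counts.entrySet()) {
      LongAdder adder = e.getValue();
      copy.put(e.getKey(), clear ? adder.sumThenReset() : adder.sum());
    }
    return copy;
  }
}
```

Two differences from the example are worth noting: the snapshot is sorted by name rather than by first call, and clearing keeps the names
with a count of zero instead of removing them.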



3.7.3 The RegionCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
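The environment instance handed to coprocessors that implement the RegionObserver interface is based on the RegionCoprocessorEnvironment
class, which in turn implements the CoprocessorEnvironment interface. In addition to the methods already provided, it offers a few more
specific, region-oriented methods, listed in the following table: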

Specific methods provided by the RegionCoprocessorEnvironment class
+---------------------------+---------------------------------------------------------------------------------------+
| Method | Description |
+---------------------------+---------------------------------------------------------------------------------------+
| getRegion() | Returns a reference to the region the current observer is associated with. |
+---------------------------+---------------------------------------------------------------------------------------+
| getRegionInfo() | Get information about the region associated with the current coprocessor instance. |
+---------------------------+---------------------------------------------------------------------------------------+
| getRegionServerServices() | Provides access to the shared RegionServerServices instance. |
+---------------------------+---------------------------------------------------------------------------------------+
| getSharedData() | All the shared data between the instances of this coprocessor |
+---------------------------+---------------------------------------------------------------------------------------+

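The getRegion() call returns a reference to the hosted HRegion instance, on which you can invoke the methods that class provides. If you need
general information about the region, call getRegionInfo() to retrieve an HRegionInfo instance. That class has useful methods for accessing
the contained key range, the name of the region, and status flags: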

byte[] getStartKey()
byte[] getEndKey()
byte[] getRegionName()
boolean isSystemTable()
int getReplicaId()
...
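
A typical use of getStartKey() and getEndKey() is checking whether a row key belongs to the current region. The sketch below shows that check
on a plain JDK (Java 9 or later, for Arrays.compareUnsigned). KeyRange and contains() are hypothetical names; the sketch relies on row keys
being ordered as unsigned bytes and on an empty start or end key marking the first or last region of a table.

```java
import java.util.Arrays;

// Hypothetical helper; in a coprocessor the two keys would come from
// getRegionInfo().getStartKey() and getRegionInfo().getEndKey().
final class KeyRange {
  // True if row lies in [startKey, endKey): the start key is inclusive, the end key exclusive.
  // An empty start key means "from the first row", an empty end key "up to the last row".
  static boolean contains(byte[] startKey, byte[] endKey, byte[] row) {
    boolean afterStart = startKey.length == 0
        || Arrays.compareUnsigned(row, startKey) >= 0;
    boolean beforeEnd = endKey.length == 0
        || Arrays.compareUnsigned(row, endKey) < 0;
    return afterStart && beforeEnd;
  }
}
```

The unsigned comparison matters: with Java's signed bytes, a key starting with 0x80 would otherwise sort before one starting with 0x7f.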

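In addition, your code can access the shared region server services instance by calling getRegionServerServices(), which returns a
RegionServerServices instance. This class provides many advanced methods, listed in the following table: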

Methods provided by the RegionServerServices class
+-------------------------------+-----------------------------------------------------------------------------------------
| Method | Description
+-------------------------------+-----------------------------------------------------------------------------------------
| abort() | Allows aborting the entire server process, shutting down the instance with the given reason.
+-------------------------------+-----------------------------------------------------------------------------------------
| addToOnlineRegions() | Adds a given region to the list of online regions. This is used for internal bookkeeping.
+-------------------------------+-----------------------------------------------------------------------------------------
| getCompactionRequester() | Provides access to the shared CompactionRequestor instance. This can be used to initiate
| | compactions from within the coprocessor.
+-------------------------------+-----------------------------------------------------------------------------------------
| getConfiguration() | Returns the current server configuration.
+-------------------------------+-----------------------------------------------------------------------------------------
| getConnection() | Provides access to the shared connection instance.
+-------------------------------+-----------------------------------------------------------------------------------------
| getCoordinatedStateManager() | Access to the shared state manager, gives access to the TableStateManager, which in turn can
| | be used to check on the state of a table
+-------------------------------+-----------------------------------------------------------------------------------------
| getExecutorService() | Used by the master to schedule system-wide events.
+-------------------------------+-----------------------------------------------------------------------------------------
| getFileSystem() | Returns the filesystem instance used by the region server, giving access to the underlying storage.
+-------------------------------+-----------------------------------------------------------------------------------------
| getFlushRequester() | Provides access to the shared FlushRequester instance. This can be used to initiate memstore flushes.
+-------------------------------+-----------------------------------------------------------------------------------------
| getFromOnlineRegions() | Returns a HRegion instance for a given region, must be hosted by same server
+-------------------------------+-----------------------------------------------------------------------------------------
| getHeapMemoryManager() | Provides access to a manager instance, gives access to heap related information, such as occupancy.
+-------------------------------+-----------------------------------------------------------------------------------------
| getLeases() | Returns the list of leases, as acquired for example by client side scanners.
+-------------------------------+-----------------------------------------------------------------------------------------
| getMetaTableLocator() | The method returns a class providing system table related functionality.
+-------------------------------+-----------------------------------------------------------------------------------------
| getNonceManager() | Gives access to the nonce manager, which is used to generate unique IDs.
+-------------------------------+-----------------------------------------------------------------------------------------
| getOnlineRegions() | Lists all online regions on the current server for a given table.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRecoveringRegions() | Lists all regions that are currently in the process of replaying WAL entries.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRegionServerAccounting() | Provides access to the shared RegionServerAccounting instance. It allows you to check on what
| | the server currently has allocated—for example, the global memstore size.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRegionsInTransitionInRS() | List of regions that are currently in-transition.
+-------------------------------+-----------------------------------------------------------------------------------------
| getRpcServer() | Returns a reference to the low-level RPC implementation instance
+-------------------------------+-----------------------------------------------------------------------------------------
| getServerName() | The server name, which is unique for every region server process.
+-------------------------------+-----------------------------------------------------------------------------------------
| getTableLockManager() | Gives access to the lock manager. Can be used to acquire read and write locks for the entire table.
+-------------------------------+-----------------------------------------------------------------------------------------
| getWAL() | Provides access to the write-ahead log instance.
+-------------------------------+-----------------------------------------------------------------------------------------
| getZooKeeper() | Returns a reference to the ZooKeeper watcher instance.
+-------------------------------+-----------------------------------------------------------------------------------------
| isAborted() | Flag is true when abort() was called previously
+-------------------------------+-----------------------------------------------------------------------------------------
| isStopped() | Returns true when stop() (inherited from Stoppable) was called beforehand
+-------------------------------+-----------------------------------------------------------------------------------------
| isStopping() | Returns true when the region server is stopping.
+-------------------------------+-----------------------------------------------------------------------------------------
| postOpenDeployTasks() | Called by the region server after opening a region, does internal housekeeping work.
+-------------------------------+-----------------------------------------------------------------------------------------
| registerService() | Registers a new custom service. Called when server starts and coprocessors are loaded.
+-------------------------------+-----------------------------------------------------------------------------------------
| removeFromOnlineRegions() | Removes a given region from the internal list of online regions.
+-------------------------------+-----------------------------------------------------------------------------------------
| reportRegionStateTransition() | Triggers a report chain when a state change is needed for a region. Sent to the Master
+-------------------------------+-----------------------------------------------------------------------------------------
| stop() | Stops the server gracefully
+-------------------------------+-----------------------------------------------------------------------------------------


There is no need to implement the whole RegionObserver interface yourself. The BaseRegionObserver class builds on it and lets you implement
only the callbacks you need.


3.7.4 The BaseRegionObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
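This class can be used as the base class for observer-type coprocessors. It has empty implementations of all methods required by the
RegionObserver interface, so by default it does nothing. You have to override every callback you are interested in to add your own
functionality.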

Example: Region observer checking for special get requests

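package coprocessor;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionObserverExample extends BaseRegionObserver {
  public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");

  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
      throws IOException {
    // Check if the requested row key matches the well-known one.
    if (Bytes.equals(get.getRow(), FIXED_ROW)) {
      // Build a cell holding the server time; FIXED_ROW doubles as family and qualifier.
      Put put = new Put(get.getRow());
      put.addColumn(FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis()));
      CellScanner scanner = put.cellScanner();
      scanner.advance();
      Cell cell = scanner.current();
      results.add(cell);
    }
  }
}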
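
The cell added by this observer carries the server time as produced by Bytes.toBytes(long), that is, eight bytes in big-endian order. A
client that reads the special row has to turn those bytes back into a long; with HBase on the classpath, Bytes.toLong() does this. The sketch
below reproduces the same layout with java.nio only, so the round trip can be checked on a plain JDK. TimeValueCodec is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical helper mirroring the 8-byte big-endian layout of a serialized long.
final class TimeValueCodec {
  // Serialize a timestamp; ByteBuffer uses big-endian byte order by default.
  static byte[] encode(long millis) {
    return ByteBuffer.allocate(Long.BYTES).putLong(millis).array();
  }

  // Deserialize a cell value that is expected to hold exactly one long.
  static long decode(byte[] value) {
    if (value.length != Long.BYTES) {
      throw new IllegalArgumentException("expected 8 bytes, got " + value.length);
    }
    return ByteBuffer.wrap(value).getLong();
  }
}
```

The length check is there because a get on any other row returns ordinary cell values, which need not be eight bytes long.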


NOTE
-------------------------------------------------------------------------------------------------------------------------------------
Add the following property to the hbase-site.xml configuration file to enable this coprocessor:

<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>coprocessor.RegionObserverExample</value>
</property>




3.8 The MasterObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
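The second Coprocessor subclass discussed here handles all the callbacks that the master server can initiate. The operations it covers can be
classified as data definition operations, similar to DDL in relational database systems. The MasterObserver class provides the following hooks: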

Callbacks for master API functions
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| API Call | Shell Call | Pre-Hook | Post-Hook
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| createTable() | create | preCreateTable(...) | postCreateTable(...)
| | | preCreateTableHandler(...) |
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| deleteTable() | drop | preDeleteTable(...) | postDeleteTableHandler(...),
| deleteTables() | | preDeleteTableHandler(...) | postDeleteTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyTable() | alter | preModifyTable(...) | postModifyTableHandler(...),
| | | preModifyTableHandler(...) | postModifyTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyTable() | alter | preAddColumn(...) | postAddColumnHandler(...),
| | | preAddColumnHandler(...) | postAddColumn(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyTable() | alter | preDeleteColumn(...) | postDeleteColumnHandler(...),
| | | preDeleteColumnHandler(...) | postDeleteColumn(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyTable() | alter | preModifyColumn(...) | postModifyColumnHandler(...),
| | | preModifyColumnHandler(...) | postModifyColumn(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| enableTable() | enable | preEnableTable(...) | postEnableTableHandler(...),
| enableTables() | | preEnableTableHandler(...) | postEnableTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| disableTable() | disable | preDisableTable(...), | postDisableTableHandler(...),
| disableTables() | | preDisableTableHandler(...) | postDisableTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| flush() | flush | preTableFlush(...) | postTableFlush(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| truncateTable() | truncate | preTruncateTable(...) | postTruncateTableHandler(...),
| | | preTruncateTableHandler(...) | postTruncateTable(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| move() | move | preMove(...) | postMove(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| assign() | assign | preAssign(...) | postAssign(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| unassign() | unassign | preUnassign(...) | postUnassign(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| offline() | n/a | preRegionOffline(...) | postRegionOffline(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| balancer() | balancer | preBalance(...) | postBalance(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| setBalancerRunning() | balance_switch | preBalanceSwitch(...) | postBalanceSwitch(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| listTableNames() | list | preGetTableNames(...) | postGetTableNames(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| getTableDescriptors() | list | preGetTableDescriptors(...) | postGetTableDescriptors(...)
| listTables() | | |
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| createNamespace() | create_namespace | preCreateNamespace(...) | postCreateNamespace(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| deleteNamespace() | drop_namespace | preDeleteNamespace(...) | postDeleteNamespace(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| getNamespaceDescriptor() | describe_namespace | preGetNamespaceDescriptor(...) | postGetNamespaceDescriptor(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| listNamespaceDescriptors() | list_namespace | preListNamespaceDescriptors(...) | postListNamespaceDescriptors(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| modifyNamespace() | alter_namespace | preModifyNamespace(...) | postModifyNamespace(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| cloneSnapshot() | clone_snapshot | preCloneSnapshot(...) | postCloneSnapshot(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| deleteSnapshot(), | delete_snapshot | preDeleteSnapshot(...) | postDeleteSnapshot(...)
| deleteSnapshots() | delete_all_snapshot | |
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| restoreSnapshot() | restore_snapshot | preRestoreSnapshot(...) | postRestoreSnapshot(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| snapshot() | snapshot | preSnapshot(...) | postSnapshot(...)
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| shutdown() | n/a | preShutdown(...) | n/a
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| stopMaster() | n/a | preStopMaster(...) | n/a
+-------------------------------+-----------------------+-----------------------------------+-------------------------------
| n/a | n/a | preMasterInitialization(...) | postStartMaster(...)
+-------------------------------+-----------------------+-----------------------------------+------------------------------



3.8.1 The MasterCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
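Just as RegionCoprocessorEnvironment is tightly bound to RegionObserver coprocessors, MasterCoprocessorEnvironment wraps MasterObserver
instances. It also implements the CoprocessorEnvironment interface, so you can, for example, access data through the getTable() call in your
own implementation. Beyond the methods provided by the CoprocessorEnvironment interface, it offers a more specific, master-oriented method,
listed in the following table: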

Specific method provided by the MasterCoprocessorEnvironment class
+---------------------------+---------------------------------------------------------------------------------------+
| Method | Description |
+---------------------------+---------------------------------------------------------------------------------------+
| getMasterServices() | Provides access to the shared MasterServices instance |
+---------------------------+---------------------------------------------------------------------------------------+

Your code can access the shared master services instance, which exposes many functions of the Master admin API. They fall roughly into the
following groups:

Table-related methods:
createTable(HTableDescriptor, byte[][])
deleteTable(TableName)
modifyTable(TableName, HTableDescriptor)
enableTable(TableName)
disableTable(TableName)
getTableDescriptors()
truncateTable(TableName, boolean)
addColumn(TableName, HColumnDescriptor)
deleteColumn(TableName, byte[])
modifyColumn(TableName, HColumnDescriptor)

Namespace-related methods:
createNamespace(NamespaceDescriptor)
deleteNamespace(String)
modifyNamespace(NamespaceDescriptor)
getNamespaceDescriptor(String)
listNamespaceDescriptors()
listTableDescriptorsByNamespace(String)
listTableNamesByNamespace(String)

The following table lists the more specific methods with a short description of each:

Methods provided by the MasterServices class
+-----------------------------------+-------------------------------------------------------------------------------------
| Method | Description
+-----------------------------------+-------------------------------------------------------------------------------------
| abort() | Allows aborting the entire server process, shutting down the instance with the given reason.
+-----------------------------------+-------------------------------------------------------------------------------------
| checkTableModifiable() | Convenient to check if a table exists and is offline so that it can be altered.
+-----------------------------------+-------------------------------------------------------------------------------------
| dispatchMergingRegions() | Flags two regions to be merged, which is performed on the region servers.
+-----------------------------------+-------------------------------------------------------------------------------------
| getAssignmentManager() | Gives you access to the assignment manager instance. It is responsible for all region
| | assignment operations, such as assign, unassign, balance, and so on
+-----------------------------------+-------------------------------------------------------------------------------------
| getConfiguration() | Returns the current server configuration.
+-----------------------------------+-------------------------------------------------------------------------------------
| getConnection() | Provides access to the shared connection instance.
+-----------------------------------+-------------------------------------------------------------------------------------
| getCoordinatedStateManager() | Access to the shared state manager, gives access to the TableStateManager, which in turn
| | can be used to check on the state of a table
+-----------------------------------+-------------------------------------------------------------------------------------
| getExecutorService() | Used by the master to schedule system-wide events
+-----------------------------------+-------------------------------------------------------------------------------------
| getMasterCoprocessorHost() | Returns the enclosing host instance.
+-----------------------------------+-------------------------------------------------------------------------------------
| getMasterFileSystem() | Provides you with an abstraction layer for all filesystem-related operations the master is
| | involved in—for example, creating directories for table files and logfiles.
+-----------------------------------+-------------------------------------------------------------------------------------
| getMetaTableLocator() | The method returns a class providing system table related functionality.
+-----------------------------------+-------------------------------------------------------------------------------------
| getServerManager() | Returns the server manager instance. With it you have access to the list of servers, live or
| | considered dead, and more
+-----------------------------------+-------------------------------------------------------------------------------------
| getServerName() | The server name, which is unique for every region server process
+-----------------------------------+-------------------------------------------------------------------------------------
| getTableLockManager() | Gives access to the lock manager. Can be used to acquire read and write locks for the entire table.
+-----------------------------------+-------------------------------------------------------------------------------------
| getZooKeeper() | Returns a reference to the ZooKeeper watcher instance
+-----------------------------------+-------------------------------------------------------------------------------------
| isAborted() | Flag is true when abort() was called previously
+-----------------------------------+-------------------------------------------------------------------------------------
| isInitialized() | After the server process is operational, this call will return true.
+-----------------------------------+-------------------------------------------------------------------------------------
| isServerShutdownHandlerEnabled() | When an optional shutdown handler was set, this check returns true.
+-----------------------------------+-------------------------------------------------------------------------------------
| isStopped() | Returns true when stop() (inherited from Stoppable) was called beforehand.
+-----------------------------------+-------------------------------------------------------------------------------------
| registerService() | Registers a new custom service. Called when server starts and coprocessors are loaded
+-----------------------------------+-------------------------------------------------------------------------------------
| stop() | Stops the server gracefully
+-----------------------------------+-------------------------------------------------------------------------------------


3.8.2 The BaseMasterObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
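You can either implement the MasterObserver interface directly or extend the BaseMasterObserver class. BaseMasterObserver implements
MasterObserver, but every callback is an empty implementation, so using the class as-is produces no visible effect.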

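Overriding the appropriate event callbacks adds functionality: your code can hook into either the pre or the post calls. The following
example uses the post hook to perform an additional task after a table is created.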

Example: a master observer that creates a separate directory on the file system when a table is created.

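package coprocessor;

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;

public class MasterObserverExample extends BaseMasterObserver {
  @Override
  public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
      HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
    // Reach the master's file system through the coprocessor environment.
    TableName tableName = desc.getTableName();
    MasterServices services = ctx.getEnvironment().getMasterServices();
    MasterFileSystem masterFileSystem = services.getMasterFileSystem();
    FileSystem fileSystem = masterFileSystem.getFileSystem();
    // Create "<table>-blobs"; the relative path resolves against the file system's working directory.
    Path blobPath = new Path(tableName.getQualifierAsString() + "-blobs");
    fileSystem.mkdirs(blobPath);
  }
}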

NOTE
-------------------------------------------------------------------------------------------------------------------------------------
Add the following property to the hbase-site.xml configuration file so that the coprocessor is loaded when the master process starts:
<property>
<name>hbase.coprocessor.master.classes</name>
<value>coprocessor.MasterObserverExample</value>
</property>

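After the change, restart HBase for the configuration to take effect.

Once activated, the coprocessor listens for the events it is interested in and triggers your code automatically.

The following shell command triggers the event: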
hbase(main):001:0> create 'testtable3', 'colfam1'
0 row(s) in 0.6740 seconds

This creates the table, after which the coprocessor's postCreateTable() method is invoked. Verify the result with the Hadoop command-line tool:

$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x - larsgeorge supergroup 0 ... testtable3-blobs


3.8.3 The BaseMasterAndRegionObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
HBase provides one more related base class, BaseMasterAndRegionObserver. It combines the BaseRegionObserver class with the MasterObserver interface:

public abstract class BaseMasterAndRegionObserver
extends BaseRegionObserver implements MasterObserver {
...
}

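In effect, it behaves like BaseRegionObserver and BaseMasterObserver combined into a single class. Because this class provides both a region
server and a master implementation, it is only useful when running on the HBase Master. This helps when system tables are hosted directly on the master.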


3.9 The RegionServerObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
This class operates at the region server level. It exposes well-defined hooks tied to server functionality, that is, hooks that span multiple regions and tables. The hooks are:


● postCreateReplicationEndPoint(...)
-------------------------------------------------------------------------------------------------------------------------------------
Called after the server has created a replication endpoint (not to be confused with a coprocessor endpoint).


● preMerge(...), postMerge(...)
-------------------------------------------------------------------------------------------------------------------------------------
Called when two regions are merged.


● preMergeCommit(...), postMergeCommit(...)
-------------------------------------------------------------------------------------------------------------------------------------
Same as above, but with a narrower scope. Called after preMerge() and before postMerge().


● preRollBackMerge(...), postRollBackMerge(...)
-------------------------------------------------------------------------------------------------------------------------------------
Called when a region merge fails and the merge transaction is rolled back.


● preReplicateLogEntries(...), postReplicateLogEntries(...)
-------------------------------------------------------------------------------------------------------------------------------------
Tied into the WAL entry replay process; allows special handling of each log entry.


● preRollWALWriterRequest(...), postRollWALWriterRequest(...)
-------------------------------------------------------------------------------------------------------------------------------------
Wrap the rolling of WAL files, which happens based on file size, elapsed time, or a manual request.



● preStopRegionServer(...)
-------------------------------------------------------------------------------------------------------------------------------------
A pre-only hook, called when the stop() method inherited from the Stoppable interface is invoked.
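
The ordering of the merge-related hooks listed above can be pictured with a small model. This is only a sketch: MergeHookDemo and runMerge() are hypothetical names, the class does not use any HBase types, and the assumption that the rollback pair replaces the remaining post hooks after a failure is an illustration rather than a statement about HBase internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the merge hook order; it only records hook names
// and is NOT the HBase RegionServerObserver interface.
class MergeHookDemo {

    /** Simulates one merge attempt and returns the hook names in call order. */
    static List<String> runMerge(boolean commitFails) {
        List<String> calls = new ArrayList<>();
        calls.add("preMerge");
        try {
            // The commit pair sits inside the pre/post merge pair.
            calls.add("preMergeCommit");
            if (commitFails) {
                throw new IllegalStateException("simulated merge failure");
            }
            calls.add("postMergeCommit");
            calls.add("postMerge");
        } catch (IllegalStateException e) {
            // Assumption: a failed merge triggers the rollback pair instead.
            calls.add("preRollBackMerge");
            calls.add("postRollBackMerge");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runMerge(false));
        System.out.println(runMerge(true));
    }
}
```

A real observer would receive an ObserverContext in each hook; the model leaves that out to keep the ordering visible.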



3.9.1 The RegionServerCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
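This class also implements the CoprocessorEnvironment interface, so your own implementation can use getTable() to access data. In addition to
the methods provided by CoprocessorEnvironment, it offers a server-oriented one: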

Specific method provided by the RegionServerCoprocessorEnvironment class
+-----------------------------------+-------------------------------------------------------------------------------------
| Method | Description
+-----------------------------------+-------------------------------------------------------------------------------------
| getRegionServerServices() | Provides access to the shared RegionServerServices instance.
+-----------------------------------+-------------------------------------------------------------------------------------


3.9.2 The BaseRegionServerObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
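An empty implementation of the RegionServerObserver interface. Extend this class to concentrate on what actually needs handling: override only the necessary methods, which simplifies the implementation.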




3.10 The WALObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
The observer class related to the write-ahead log (WAL for short). It offers a manageable set of callbacks:

preWALWrite(...), postWALWrite(...)

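These wrap the writing of a log entry to the WAL and give access to the full edit record. Because the whole record is handed to both methods, your code can influence what is written to the log.
In some advanced use cases, for example, you could add extra cells to the edit record, so that during a log replay those cells help fine-tune the reconstruction of the data.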
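
The idea of adding an extra cell before the entry is written can be sketched with plain Java collections. Everything here is a stand-in: WalWriteHookDemo, the WalHook interface, the string "cells", and the in-memory log are hypothetical and only model the pre/post wrapping, not the actual WALObserver signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WalWriteHookDemo {

    // Hypothetical stand-in for a WAL observer; NOT the HBase WALObserver interface.
    interface WalHook {
        void preWalWrite(List<String> edit);
        void postWalWrite(List<String> edit);
    }

    /** "Writes" an edit to an in-memory log, calling the hook before and after. */
    static void append(List<String> log, List<String> cells, WalHook hook) {
        List<String> edit = new ArrayList<>(cells);
        hook.preWalWrite(edit);    // the hook may still change the edit here
        log.addAll(edit);          // the (possibly modified) edit is persisted
        hook.postWalWrite(edit);   // too late to change what was written
    }

    /** Appends one edit through a hook that adds an extra marker cell. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        WalHook tagger = new WalHook() {
            @Override public void preWalWrite(List<String> edit) {
                edit.add("meta:replay-hint");   // hypothetical extra cell
            }
            @Override public void postWalWrite(List<String> edit) { }
        };
        append(log, Arrays.asList("row1/colfam1:q1=v1"), tagger);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the pre hook sees the edit before it reaches the log, which is why the extra cell is added there.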


3.10.1 The WALCoprocessorEnvironment Class
-----------------------------------------------------------------------------------------------------------------------------------------
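The environment supplied as part of the callbacks is, in this case, a WALCoprocessorEnvironment instance. It also extends CoprocessorEnvironment,
so your implementation can likewise access data through the getTable() method.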

Specific method provided by the WALCoprocessorEnvironment class
+-----------------------------------+-------------------------------------------------------------------------------------
| Method | Description
+-----------------------------------+-------------------------------------------------------------------------------------
| getWAL() | Provides access to the shared WAL instance.
+-----------------------------------+-------------------------------------------------------------------------------------

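With a reference to the WAL you can roll the current writer, in other words, close the current log file and create a new one. You can also call
sync() to force the edit log to be written to the persistence layer. These are the methods provided by the WAL interface: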

void registerWALActionsListener(final WALActionsListener listener)
boolean unregisterWALActionsListener(final WALActionsListener listener)
byte[][] rollWriter() throws FailedLogCloseException, IOException
byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException
void shutdown() throws IOException
void close() throws IOException
long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)throws IOException
void sync() throws IOException
void sync(long txid) throws IOException
boolean startCacheFlush(final byte[] encodedRegionName)
void completeCacheFlush(final byte[] encodedRegionName)
void abortCacheFlush(byte[] encodedRegionName)
WALCoprocessorHost getCoprocessorHost()
long getEarliestMemstoreSeqNum(byte[] encodedRegionName)

These are all very low-level methods; see the API documentation for details.


3.10.2 The BaseWALObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
The BaseWALObserver class implements the WALObserver interface. You can extend this class or implement the interface directly.




3.11 The BulkLoadObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
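This observer class is used during bulk loading operations, as triggered by the completebulkload tool that ships with HBase. The tool is included in
the server JAR file. Using the Hadoop JAR support, you can list the available tools: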

$ bin/hadoop jar /usr/local/hbase-1.0.0-bin/lib/hbase-server-1.0.0.jar
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
verifyrep: Compare the data from tables in two different clusters.
WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.

Once the completebulkload tool runs, it attempts to load all of the staged bulk load files. During this operation, the available callbacks are triggered:


prePrepareBulkLoad(...)
-------------------------------------------------------------------------------------------------------------------------------------
Called before the bulk load operation takes place.


preCleanupBulkLoad(...)
-------------------------------------------------------------------------------------------------------------------------------------
Called when the bulk load is complete and the cleanup tasks are performed.



3.12 The EndpointObserver Class
-----------------------------------------------------------------------------------------------------------------------------------------
This class has no environment of its own; it uses the shared RegionCoprocessorEnvironment. That makes sense, because endpoints run in the context of a region.

The available callbacks are:

preEndpointInvocation(...), postEndpointInvocation(...)

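Triggered when an endpoint method is invoked from a client; these callbacks wrap the execution of that call.

The client can replace (in the pre hook) or modify (in the post hook) the given Message instance to change the outcome of the endpoint method. If an exception
is thrown during the pre hook, the server-side call is aborted completely.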
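
The wrapping behaviour described above can be modelled in a few lines. This is a sketch under stated assumptions: EndpointHookDemo, Hooks, invoke(), and the string-based request and response are hypothetical stand-ins for the Message-based EndpointObserver callbacks, chosen so the example needs no HBase classes.

```java
import java.util.function.UnaryOperator;

class EndpointHookDemo {

    // Hypothetical stand-in for the two observer hooks; NOT the HBase interface.
    interface Hooks {
        String pre(String request);    // may return a replacement request, or throw
        String post(String response);  // may return a modified response
    }

    /** Runs the endpoint between the two hooks, as a server-side host might. */
    static String invoke(String request, UnaryOperator<String> endpoint, Hooks hooks) {
        String effective = hooks.pre(request);   // an exception here skips the endpoint
        String response = endpoint.apply(effective);
        return hooks.post(response);
    }

    /** Wires a toy endpoint to toy hooks and reports the outcome as text. */
    static String demo(String request) {
        UnaryOperator<String> endpoint = table -> "count(" + table + ")";
        Hooks hooks = new Hooks() {
            @Override public String pre(String req) {
                String trimmed = req.trim();
                if (trimmed.isEmpty()) {
                    throw new IllegalArgumentException("empty request");
                }
                return trimmed;                  // replaces the original request
            }
            @Override public String post(String resp) {
                return resp + " [audited]";      // modifies the result
            }
        };
        try {
            return invoke(request, endpoint, hooks);
        } catch (IllegalArgumentException e) {
            return "ABORTED: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(" t1 "));
        System.out.println(demo("   "));
    }
}
```

The first call reaches the endpoint with the trimmed request and has its result amended by the post hook; the second never reaches the endpoint because the pre hook throws.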

(End of this part)

Series contents:

HBase Coprocessors (Part 1)

HBase Coprocessors (Part 2)

Reference:

"HBase: The Definitive Guide", 2nd Edition, Early Release (July 2015), Lars George


