设为首页 加入收藏

TOP

HBase1.0.0源码分析之Client启动连接流程(一)
2015-11-21 01:45:38 来源: 作者: 【 】 浏览:2
Tags:HBase1.0.0 源码 分析 Client 启动 连接 流程
我们知道在使用HBase的过程中首要的是和服务器端取得链接,那么客户端是如何去链接的,它是怎么找到master和regionserver的? 参与该过程中的主要 组件又有哪些?这些组件之间是如何协同工作的呢? 今天就让我们来一起解析.
HBase的连接代码很简单,如下:
try (Connection connection = ConnectionFactory.createConnection(conf))
这里用到了工厂模式进行Connection实例的创建,需要传入的是配置参数管理类Configuration,在创建中首先需要把用户信息添加进去:
 if (user == null) {
      UserProvider provider = UserProvider.instantiate(conf);
      user = provider.getCurrent();
    }

    return createConnection(conf, false, pool, user);

 String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
      ConnectionManager.HConnectionImplementation.class.getName());
    Class clazz = null;
    try {
      clazz = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
    try {
      // Default HCM#HCI is not accessible; make it so before invoking.
      Constructor constructor =
        clazz.getDeclaredConstructor(Configuration.class,
          boolean.class, ExecutorService.class, User.class);
      constructor.setAccessible(true);
      return (Connection) constructor.newInstance(conf, managed, pool, user);

这里使用了反射技术进行类对象的构造,从代码中我们看到实际是调用了HConncetionImplementation的构造函数,这些类之间的相互关系如下图所示:

\

从途中可以看出,HConnectionImplementation是实际的Connction实现类,接下来我们去看看该类的实例化过程:

        HConnectionImplementation(Configuration conf, boolean managed,
                                  ExecutorService pool, User user) throws IOException {
            this(conf);
            this.user = user;
            this.batchPool = pool;
            this.managed = managed;
            this.registry = setupRegistry();
            retrieveClusterId();

            this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
            this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

            // Do we publish the status?
            boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
                    HConstants.STATUS_PUBLISHED_DEFAULT);
            Class listenerClass =
                    conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
                            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
                            ClusterStatusListener.Listener.class);
            if (shouldListen) {
                if (listenerClass == null) {
                    LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
                            ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
                } else {
                    clusterStatusListener = new ClusterStatusListener(
                            new ClusterStatusListener.DeadServerHandler() {
                                @Override
                                public void newDead(ServerName sn) {
                                    clearCaches(sn);
                                    rpcClient.cancelConnections(sn);
                                }
                            }, conf, listenerClass);
                }
            }
        }

好吧这看起来有点小复杂,它首先调用了另一个构造类
        protected HConnectionImplementation(Configuration conf) {
            this.conf = conf;
            this.tableConfig = new TableConfiguration(conf);
            this.closed = false;
            this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
                    HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
            this.numTries = tableConfig.getRetriesNumber();
            this.rpcTimeout = conf.getInt(
                    HConstants.HBASE_RPC_TIMEOUT_KEY,
                    HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
            if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
                synchronized (nonceGeneratorCreateLock) {
                    if (ConnectionManager.nonceGenerator == null) {
                        ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
                    }
                    this.nonceGenerator = ConnectionManager.nonceGenerator;
                }
            } else {
                this.nonceGenerator = new NoNonceGenerator();
            }
            stats = ServerStatisticTracker.create(conf);
            this.asyncProcess = createAsyncProcess(this.conf);
            this.interceptor = (new Retr
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇HBase1.0.0源码分析之请求处理流.. 下一篇分布式缓存GemFire架构介绍

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: