基于google protobuf的RPC engine,必须在服务器端和客户端都完成了初始化之后,才能开始通信。在《Hadoop 基于protobuf 的RPC的服务器端实现原理》 这篇博文中,我介绍了RPC 的服务器端实现,那么,客户端是如何基于预先定义的protobuf协议,来与远程的基于相同的protobuf协议的服务端进行通信的呢?
比如,NodeManger与远程的ResourceManager进行RPC通信,它们的通信基于ResourceTracker这个RPC协议,协议定义在ResourceTracker.proto文件中:
service ResourceTrackerService {
rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
}
协议中定义了两个通信接口,registerNodeManager
和nodeHeartbeat
。registerNodeManager
负责在节点启的NodeManager启动的时候向ResourceManger注册自己,而nodeHeartbeat
是通过定时心跳的方式,不断向ResourceManager报告自己的存在,并将自己的状态汇报给ResourceManager。
在Yarn的基于Master/Slave的设计模式中,register
思想是最核心的 设计思想。NodeManager被ResourceManager管理,那么NodeManager在启动的时候必须向ResourceManager注册,RPCEngine想要生效,Engine在初始化的时候也必须向ResourceManager注册。我认为这种设计思想的根本目的,是将主动权交给用户(Slave)而不是管理者(Master),这样,将Master从繁杂的管理工作中解脱出来,Master不需要关心、不需要轮训NodeManager什么时候来,什么时候启动,而是让NodeManager在启动的时候主动告知即可。
那么,这个客户端协议是怎么进行初始化并向远程的ResourceTracker发送消息的呢?既然NodeManager是该协议的客户端,我们从NodeManager代码进入,来看看初始化以及初始化以后基于协议进行通信的客户端过程。
Yarn代码设计的另外一个重要特点就是功能服务化,无论是NodeManager、ResourceManager还是MapReduce的ApplicationMaster(MRAppMaster),都抽象为服务,服务之间功能独立,服务的运行 被抽象为初始化、启动、运行和停止等基本过程,让整个代码逻辑非常清晰、封装性变得非常好。
在NodeManager的serviceInit()
方法中,我们看到:
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
protected NodeStatusUpdater createNodeStatusUpdater (Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics);
}
创建了一个运行时类型为NodeStatusUpdaterImpl
的状态更新器,其实,这个NodeStatusUpdater也是一个service
,启动ResourceTracker协议的客户端,就是在NodeStatusUpdaterImpl.serviceStart()
中进行:
@Override
protected void serviceStart () throws Exception {
try {
this .resourceTracker = getRMClient();
registerWithRM();
} catch (Exception e) {
}
}
跟踪代码,到ServerRMProxy.createRMProxy()
:
/**
* Create a proxy for the specified protocol. For non-HA,
* this is a direct connection to the ResourceManager address. When HA is
* enabled, the proxy handles the failover between the ResourceManagers as
* well.
* 对于ResourceTracker协议来说,这里的参数protocol就是是ResourceTracker.class, instanse参数是ServerRMProxy
*/
@Private
protected static <T> T createRMProxy (final Configuration configuration,
final Class<T> protocol, RMProxy instance) throws IOException {
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
(YarnConfiguration) configuration
: new YarnConfiguration(configuration);
RetryPolicy retryPolicy = createRetryPolicy(conf);
if (HAUtil.isHAEnabled(conf)) {
RMFailoverProxyProvider<T> provider =
instance.createRMFailoverProxyProvider(conf, protocol);
return (T) RetryProxy.create(protocol, provider, retryPolicy);
} else {
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
LOG.info("Connecting to ResourceManager at " + rmAddress);
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
}
}
在这里我们开始接触到proxy
, 如果大家对IPC(Inter-Process Communication,进程间通信)或者RMI(Remote Method Invocation,远程方法调用)不是很熟悉,也许对proxy的理解产生偏差。在这里,proxy指的就是调用者,即客户端。由于在进程间通信或者远程方法调用的时候,调用者只需要调用方法,不需要关心方法是在本地还是远程执行,因此存在一个代理者(即proxy,在java RMI中,也叫做stub程序),来负责将本地客户端的调用通过TCP等网络协议在远程服务器端进行调用,然后取回调用结果提供给调用者。这就是代理的含义。
是否开启HA模式与本文讨论的话题无关,因此我们选取开启HA模式的分支。继续跟踪代码,看看基于ResourceTracker协议的RPC 客户端是怎么创建的。跟踪·RMProxy.createRMFailoverProxyProvider()
方法:
/**
* Helper method to create FailoverProxyProvider.
* 同样,
*/
private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider (
Configuration conf, Class<T> protocol) {
Class< extends RMFailoverProxyProvider<T>> defaultProviderClass;
try {
defaultProviderClass = (Class< extends RMFailoverProxyProvider<T>>)
Class.forName( YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
} catch (Exception e) {
}
RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
defaultProviderClass, RMFailoverProxyProvider.class), conf);
provider.init(conf, (RMProxy<T>) this , protocol);
return provider;
}
然后,进入ConfiguredRMFailoverProxyProvider.init()
方法:
public void init (Configuration configuration, RMProxy<T> rmProxy, Class<T> protocol) {
this .rmProxy = rmProxy;
this .protocol = protocol;
this .rmProxy.checkAllowedProtocols(this .protocol);
}
由此可见,ConfiguredRMFailoverProxyProvider对我们的通信协议进行了HA的封装,在init
方法中,设置了它所代理的协议名称(ResourceTracker)和这个协议的代理对象RMProxy;在HA环境下,客户端只需要直接使用ConfiguredRMFailoverProxyProvider
给我们提供的代理对象,而不需要关心这个代理对象到底是指向了哪一个ResourceManager,这就是ConfiguredRMFailoverProxyProvider
的职责,负责隐藏HA环境下的FailOver细节。
再回到上面提到的代码片段RMProxy.createRMProxy
:
if (HAUtil.isHAEnabled(conf)) {
RMFailoverProxyProvider<T> provider = instance .createRMFailoverProxyProvider(conf, protocol);
return (T) RetryProxy.create(protocol, provider, retryPolicy);
} else {
}
RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);
负责创建和初始化ResourceTracker协议在HA环境下的代理ConfiguredRMFailoverProxyProvider
,那么,ConfiguredRMFailoverProxyProvider
是怎么创建真正的RPC客户端的呢?
我们继续跟踪下一行代码(T) RetryProxy.create(protocol, provider, retryPolicy);
,此时,protocol是ResourceTracker.class,provider是ConfiguredRMFailoverProxyProvider
:
public static <T> Object create (Class<T> iface,
FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<>[] { iface },
new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
);
}
可以看到,ConfiguredRMFailoverProxyProvider通过java 动态代理来代理了ResourceTracker协议里面方法的执行。熟悉java动态代理的都会明白,每一个动态代理proxy都需要有继承 java.lang.reflect.InvocationHandler
并实现其invoke()
方法,用来代替被代理类的执行,这里,这个InvocationHandler
就是RetryInvocationHandler。ConfiguredRMFailoverProxyProvider底层真正的RPC(已经说过,ConfiguredRMFailoverProxyProvider就是对真正的RPC封装了一层HA特性),就是RetryInvocationHandler来实现的,其实是在RetryInvocationHandler的构造方法里面进行的:
protected RetryInvocationHandler (FailoverProxyProvider<T> proxyProvider,
RetryPolicy defaultPolicy,
Map<String, RetryPolicy> methodNameToPolicyMap) {
this .currentProxy = proxyProvider.getProxy();
}
@Override
public synchronized ProxyInfo<T> getProxy () {
String rmId = rmServiceIds[currentProxyIndex];
T current = proxies.get(rmId);
if (current == null ) {
current = getProxyInternal();
proxies.put(rmId, current);
}
return new ProxyInfo<T>(current, rmId);
}
接着往下跟踪:
/**
* 负责创建客户端代理对象
* @return
*/
private T getProxyInternal () {
try {
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
return RMProxy.getProxy(conf, protocol, rmAddress);
} catch (IOException ioe) {
LOG.error("Unable to create proxy to the ResourceManager " +
rmServiceIds[currentProxyIndex], ioe);
return null ;
}
创建远程ResourceManager服务器端的地址对象,即ResourceTracker协议的服务器端地址信息
@InterfaceAudience .Private
@Override
protected InetSocketAddress getRMAddress (YarnConfiguration conf,
Class<> protocol) {
if (protocol == ResourceTracker.class) {
return conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
} else {
}
}
显然,getRMAddress()
方法就是通过读取配置文件来创建了一个InetSocketAddress
对象,然后,真正底层创建proxy的时刻到来,来看RMProxy.getProxy()
:
* Get a proxy to the RM at the specified address. To be used to create a
* RetryProxy.
* 对于ResourceTracker协议来说,这里的参数protocol就是ResourceTracker.class
*/
@Private
static <T> T getProxy(final Configuration conf, final Class<T> protocol, final InetSocketAddress rmAddress)
throws IOException {
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<T>() {
@Override
public T run () {
return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
}
});
}
YarnRPC是一个抽象类(Abstract Class),是Yarn对Hadoop RPC 的封装,基于历史原因和版本升级迭代,Hadoop RPC有基于多种序列化方式的RPC协议,但是由于Yarn是Hadoop 2.0之后才有的组件,是很新的component, 因此Yarn所有的RPC调用都是基于google protobuf序列化方式的RPC进行的实现。
我们一起来看YarnRPC的类图:
YarnRPC.create()
public static YarnRPC create (Configuration conf) {
LOG.debug("Creating YarnRPC for " +
conf.get(YarnConfiguration.IPC_RPC_IMPL));
String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
if (clazzName == null ) {
clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
}
try {
return (YarnRPC) Class.forName(clazzName).newInstance();
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
}
YarnRPC这个抽象类的实际实现类的名称是通过Yarn配置文件读取,默认是org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
这个类,
因此YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
实际上调用了HadoopYarnProtoRPC.getProxy()
方法:
public Object getProxy (Class protocol, InetSocketAddress addr, Configuration conf) {
LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1 , addr, conf);
}
进入RpcClientFactoryPBImpl.getClient()
:
public Object getClient (Class<> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf) {
Constructor<> constructor = cache.get(protocol);
if (constructor == null ) {
Class<> pbClazz = null ;
try {
pbClazz = localConf.getClassByName(getPBImplClassName(protocol));
} catch (ClassNotFoundException e) {
}
try {
constructor = pbClazz.getConstructor(Long.TYPE, InetSocketAddress.class, Configuration.class);
constructor.setAccessible(true );
cache.putIfAbsent(protocol, constructor);
} catch (NoSuchMethodException e) {
}
}
try {
Object retObject = constructor.newInstance(clientVersion, addr, conf);
return retObject;
} catch (InvocationTargetException e) {
}
}
ResourceTrackerPBClientImpl就是对ResourceTracker协议的最下层代理,来看ResourceTrackerPBClientImpl的构造函数:
public ResourceTrackerPBClientImpl (long clientVersion, InetSocketAddress addr, Configuration conf)
throws IOException {
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
proxy = (ResourceTrackerPB) RPC.getProxy(ResourceTrackerPB.class, clientVersion, addr, conf);
}
在这里,我们再次看到了hadoop中的注册 思想。我们的ResourceTrackerPBClientImpl协议要想使用,必须向对应的RPC Engine注册自己。所有基于protobuf协议的RPC都必须向ProtobufRpcEngine进行注册,注册完成以后,创建底层的客户端代理。
终于,经过繁杂但是设计良好的Protobuf RPC的初始化,我们终于拿到了ResourceTracker协议的客户端实现类。此后,ResourceTracker协议的客户端,我们的 NodeManager,就可以根据协议的定义,来进行协议中的registerNodeManager()
方法的调用。我们跟踪一下这个过程,试图搞清楚客户端在调用这个方法的时候,是如何不知不觉通过RPC变成了服务器端的调用的。
registerNodeManager()
是由NodeManager发起的,NodeManager实际上是委托NodeStatusUpdaterImpl
来与服务器端的ResourceManager进行沟通,看 NodeStatusUpdaterImpl.registerNodeManager
:
@Override
public RegisterNodeManagerResponse registerNodeManager (
RegisterNodeManagerRequest request) throws YarnException,
IOException {
RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
try {
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null , requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null ;
}
}
这个proxy对象,是 ResourceTrackerPBClientImpl构造函数执行的时候创建的:
由于Yarn RPC使用Protobuf ,因此RPC.getProxy
实际上调用的是ProtobufRpcEngine().getProxy()
方法:
@Override
@SuppressWarnings ("unchecked" )
public <T> ProtocolProxy<T> getProxy (Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false );
}
很明显,这里是通过java动态代理,来对ResourceTrackerPBClientImpl
的方法进行代理执行。再次重复上面关于java 动态代理的解释,所有java动态代理都必须实现java.lang.reflect.InvocationHandler
接口,实现其invoke()
方法,用来代替被代理类的执行,对于ProtobufRpcEngine
,这个InvocationHandler
就是ProtobufRpcEngine.Invoker
。我们看ProtobufRpcEngine.Invoker
是怎么代理这个客户端的registerNodeManager()方法的执行的:
@Override
public Object invoke (Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = 0 ;
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
Message theRequest = (Message) args[1 ];
final RpcResponseWrapper val;
try {
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} catch (Throwable e) {
} finally {
if (traceScope != null ) traceScope.close();
}
Message prototype = null ;
try {
prototype = getReturnProtoType(method);
} catch (Exception e) {
throw new ServiceException(e);
}
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
.mergeFrom(val.theResponseRead).build();
} catch (Throwable e) {
throw new ServiceException(e);
}
return returnMessage;
}
可以看到,ProtobufRpcEngine.Invokder.invoke()方法做的工作,就是提取客户端请求的方法以及方法的参数,将这些信息发送给远程服务器。远程服务器再通过解析,提取出方法和方法参数,在服务器端本地执行对应的代码,比如,服务器端从客户端请求中提取了方法名称为registerNodeManager()以及参数(包含了节点信息等等),会将节点信息进行注册和管理,然后返回注册成功信息。
在HA环境下,通过一层层代理封装,Yarn实现了HA环境下的ResourceManager协议客户端,ResourceTrackerPBClientImpl封装了该协议的客户端实现,属于下层代理,通过这个下层动态代理,将客户端对应方法的调用,转换成字节码信息发送给远端,而ConfiguredRMFailoverProxyProvider
也是通过动态代理,在ResourceTrackerPBClientImpl的上层进行了封装,以实现High Availability特性。在HA环境下,NodeManager作为ResourceTracker客户端,从ConfiguredRMFailoverProxyProvider的上层代理往下调用,到达ResourceTrackerPBClientImpl下层代理,然后ResourceTrackerPBClientImpl通过动态代理,将请求信息发送到RPC Server,实现了该协议的一次调用。