Reposted from: http://www.cnblogs.com/mumuxinfei/p/3823266.html
For details, see the official user guide and developer guide:
http://flume.apache.org/FlumeDeveloperGuide.html
*) Purpose and a simple example
1). Flume-ng-sdk is the client SDK for writing clients that send data to a Flume agent.
2). A simple example
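import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

RpcClient client = null;
try {
  // Default (Avro) client talking to the agent's source at 127.0.0.1:41414
  client = RpcClientFactory.getDefaultInstance("127.0.0.1", 41414);
  Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
  client.append(event);
} catch (EventDeliveryException e) {
  // The agent did not accept the event
  e.printStackTrace();
} finally {
  // Always release the connection
  if (client != null) {
    client.close();
  }
}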
*) Event design and class hierarchy
1. Design of the Event class
In Flume, Event is an interface:
public interface Event {
public Map<String, String> getHeaders();
public void setHeaders(Map<String, String> headers);
public byte[] getBody();
public void setBody(byte[] body);
}
As the code shows, an Event consists of two parts: a map of headers and the message body (payload).
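The two-part structure can be made concrete with a minimal class that implements an interface with the same four methods. The names SketchEvent and SketchSimpleEvent are hypothetical. The class is written in the spirit of SimpleEvent, not copied from Flume's source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of the four-method Event interface quoted above.
interface SketchEvent {
    Map<String, String> getHeaders();
    void setHeaders(Map<String, String> headers);
    byte[] getBody();
    void setBody(byte[] body);
}

// Minimal mutable implementation: a header map plus an opaque byte[] payload.
class SketchSimpleEvent implements SketchEvent {
    private Map<String, String> headers = new HashMap<>();
    private byte[] body = new byte[0];

    @Override public Map<String, String> getHeaders() { return headers; }
    @Override public void setHeaders(Map<String, String> headers) { this.headers = headers; }
    @Override public byte[] getBody() { return body; }
    @Override public void setBody(byte[] body) { this.body = body; }

    @Override
    public String toString() {
        return "[Event headers = " + headers + ", body.length = " + body.length + "]";
    }
}
```

The headers usually carry routing metadata such as a host name or timestamp. The body is never interpreted by the transport.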
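2. The Builder design pattern
Under org.apache.flume.event there are two concrete Event implementations: SimpleEvent and JSONEvent.
As its name suggests, the EventBuilder class follows the Builder approach: it assembles the object's members and produces the final object.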
public class EventBuilder {
public static Event withBody(byte[] body, Map<String, String> headers) {
Event event = new SimpleEvent();
if(body == null) {
body = new byte[0];
}
event.setBody(body);
if (headers != null) {
event.setHeaders(new HashMap<String, String>(headers));
}
return event;
}
public static Event withBody(byte[] body) {
return withBody(body,null);
}
public static Event withBody(String body, Charset charset,
Map<String, String> headers) {
return withBody(body.getBytes(charset), headers);
}
public static Event withBody(String body, Charset charset) {
return withBody(body, charset, null);
}
}
Java access modifiers: public / default / protected / private. Default (package-private) means visible only within the same package.
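Surprisingly, however, the constructor of SimpleEvent is public, neither default nor protected. Callers can therefore bypass EventBuilder entirely, which partly defeats the purpose of introducing it.
Google Protocol Buffers (Java) takes the Builder pattern to its extreme: every PB object is assembled and produced by its corresponding Builder class. The point of this style is to separate modifying an object's fields completely from reading them. A PB object is then immutable from the moment it is created: its fields can be read but never modified.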
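For contrast, here is a minimal sketch of the Protocol Buffers style described above. The event class has no setters and a private constructor, so the only way to obtain one is through its nested Builder. Once build() returns, the object cannot change. ImmutableEvent and its methods are hypothetical names, not part of Flume or of the protobuf API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical PB-style event: read-only after construction, created only via its Builder.
final class ImmutableEvent {
    private final Map<String, String> headers;
    private final byte[] body;

    private ImmutableEvent(Builder b) {
        // Defensive copies: later changes to the builder cannot leak into this object.
        this.headers = Collections.unmodifiableMap(new HashMap<>(b.headers));
        this.body = b.body.clone();
    }

    Map<String, String> getHeaders() { return headers; }

    // Return a copy: handing out the internal array would let callers mutate it.
    byte[] getBody() { return body.clone(); }

    static Builder newBuilder() { return new Builder(); }

    static final class Builder {
        private final Map<String, String> headers = new HashMap<>();
        private byte[] body = new byte[0];

        Builder setHeader(String key, String value) {
            headers.put(key, value);
            return this;
        }

        Builder setBody(byte[] newBody) {
            this.body = (newBody == null) ? new byte[0] : newBody.clone();
            return this;
        }

        ImmutableEvent build() { return new ImmutableEvent(this); }
    }
}
```

Because the constructor is private, the Builder is the only entry point. A public SimpleEvent constructor leaves that door open.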
*) RpcClient design and class hierarchy
1. The RpcClient interface definition:
public interface RpcClient {
public int getBatchSize();
public void append(Event event) throws EventDeliveryException;
public void appendBatch(List<Event> events) throws EventDeliveryException;
public boolean isActive();
public void close() throws FlumeException;
}
2. The AbstractRpcClient abstract class definition:
public abstract class AbstractRpcClient implements RpcClient {
protected int batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
protected long connectTimeout = RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
protected long requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
@Override
public int getBatchSize(){
return batchSize;
}
protected abstract void configure(Properties properties) throws FlumeException;
}
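Compared with the interface, it adds three fields (batch size, connect timeout, request timeout) initialized from default constants in RpcClientConfigurationConstants. It also adds a new abstract method, configure(Properties properties).
3. Using the RpcClient factory class
Definition of RpcClientFactory (simplified):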
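The division of labor between the base class and its subclasses can be sketched as follows. Shared defaults live in the base class, and each concrete client overrides them from Properties in configure(). The class names are hypothetical. The keys and limits (batch-size >= 1, timeouts >= 1000 ms, defaults 100 / 20000 / 20000) come from the Properties section of this article. Falling back to the default for out-of-range values is an assumption of this sketch.

```java
import java.util.Properties;

// Hypothetical analogue of AbstractRpcClient: defaults in the base class, overrides in configure().
abstract class SketchAbstractClient {
    static final int DEFAULT_BATCH_SIZE = 100;
    static final long DEFAULT_CONNECT_TIMEOUT_MILLIS = 20000L;
    static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 20000L;

    protected int batchSize = DEFAULT_BATCH_SIZE;
    protected long connectTimeout = DEFAULT_CONNECT_TIMEOUT_MILLIS;
    protected long requestTimeout = DEFAULT_REQUEST_TIMEOUT_MILLIS;

    public int getBatchSize() { return batchSize; }

    protected abstract void configure(Properties properties);
}

class SketchConfiguredClient extends SketchAbstractClient {
    @Override
    protected void configure(Properties p) {
        // Assumed policy: an out-of-range value falls back to the default.
        int batch = Integer.parseInt(p.getProperty("batch-size", String.valueOf(DEFAULT_BATCH_SIZE)));
        batchSize = batch >= 1 ? batch : DEFAULT_BATCH_SIZE;

        long connect = Long.parseLong(
                p.getProperty("connect-timeout", String.valueOf(DEFAULT_CONNECT_TIMEOUT_MILLIS)));
        connectTimeout = connect >= 1000 ? connect : DEFAULT_CONNECT_TIMEOUT_MILLIS;

        long request = Long.parseLong(
                p.getProperty("request-timeout", String.valueOf(DEFAULT_REQUEST_TIMEOUT_MILLIS)));
        requestTimeout = request >= 1000 ? request : DEFAULT_REQUEST_TIMEOUT_MILLIS;
    }
}
```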
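public class RpcClientFactory {
  public static RpcClient getInstance(Properties properties) throws FlumeException {
    // 1). Read the concrete client type from the properties
    String type = properties.getProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE);
    // 2). Load the client class via reflection
    //     (exception handling for the reflective calls is omitted in this excerpt)
    Class<? extends AbstractRpcClient> clazz = (Class<? extends AbstractRpcClient>) Class.forName(...);
    // 3). Instantiate it (configure() is declared on AbstractRpcClient, not on RpcClient)
    AbstractRpcClient client = clazz.newInstance();
    // 4). Let the concrete client configure itself from the properties
    client.configure(properties);
    // 5). Return it through the RpcClient interface
    return client;
  }
}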
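RpcClientFactory's static getInstance method uses the key/value pairs in the Properties to decide which concrete class to instantiate and how to configure it. In addition, the constructors of the concrete RpcClient implementations are protected. This is more disciplined and clearer than the EventBuilder design discussed earlier.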
clazz = Class.forName(...);
client = clazz.newInstance();
client.configure(...);
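This three-step pattern is very good practice. It makes full use of object-oriented polymorphism, because the caller only ever deals with the RpcClient interface.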
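The same reflection-based factory idea can be sketched in a self-contained form, using hypothetical classes in place of Flume's. A short alias ("default", "thrift") maps to a class name, and any other value is treated as a class name itself. The factory loads the class, uses asSubclass() to check that it extends the abstract base, instantiates it, and calls configure(). It uses getDeclaredConstructor().newInstance() rather than Class.newInstance(), which is deprecated since Java 9.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for RpcClient.
interface SketchClient {
    String describe();
}

// Hypothetical stand-in for AbstractRpcClient: the factory only needs configure().
abstract class AbstractSketchClient implements SketchClient {
    protected int batchSize = 100;

    protected abstract void configure(Properties properties);
}

class SketchAvroClient extends AbstractSketchClient {
    SketchAvroClient() { }  // not public: only code in this package (the factory) creates it

    @Override
    protected void configure(Properties properties) {
        batchSize = Integer.parseInt(properties.getProperty("batch-size", "100"));
    }

    @Override
    public String describe() { return "avro:" + batchSize; }
}

class SketchThriftClient extends AbstractSketchClient {
    SketchThriftClient() { }

    @Override
    protected void configure(Properties properties) {
        batchSize = Integer.parseInt(properties.getProperty("batch-size", "100"));
    }

    @Override
    public String describe() { return "thrift:" + batchSize; }
}

final class SketchClientFactory {
    // Short aliases map to class names; any other value is used as a class name directly.
    private static final Map<String, String> ALIASES = new HashMap<>();
    static {
        ALIASES.put("default", SketchAvroClient.class.getName());
        ALIASES.put("thrift", SketchThriftClient.class.getName());
    }

    private SketchClientFactory() { }

    static SketchClient getInstance(Properties properties) {
        String type = properties.getProperty("client.type", "default");
        String className = ALIASES.getOrDefault(type, type);
        try {
            // Load the class by name and verify it really is a client subclass.
            Class<? extends AbstractSketchClient> clazz =
                    Class.forName(className).asSubclass(AbstractSketchClient.class);
            // Instantiate via the no-arg constructor, then let the instance configure itself.
            AbstractSketchClient client = clazz.getDeclaredConstructor().newInstance();
            client.configure(properties);
            return client;
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create client of type " + type, e);
        }
    }
}
```

Adding a new client type only requires a new subclass and, optionally, an alias. Calling code keeps working against the interface.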
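4. Concrete RpcClient implementations
The SDK provides two main concrete implementations: ThriftRpcClient and the Avro client (NettyAvroRpcClient).
4.1. A closer look at ThriftRpcClient
4.1.1 The Thrift IDL definition
The IDL file (src/main/thrift/flume.thrift) defines: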
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
}
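These definitions correspond to the classes Status, ThriftFlumeEvent and ThriftSourceProtocol in the source package org.apache.flume.thrift.
4.1.2 The ThriftRpcClient implementation
ThriftRpcClient is not just a thin wrapper around the ThriftSourceProtocol client: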
public class ThriftRpcClient extends AbstractRpcClient {
  private ConnectionPoolManager connectionManager;
  private final ExecutorService callTimeoutPool;
  private final AtomicLong threadCounter;
}
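Comment: a quick look at its members shows that it implements the RpcClient sending interface on top of a thread pool (ExecutorService) and a connection pool (ConnectionPoolManager). This makes append() and appendBatch() thread-safe, so a single client instance can be shared by multiple threads.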
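The sketch below assumes callTimeoutPool is used to bound how long a blocking RPC may take. The call is submitted to the pool, and the caller waits on the Future for at most the request timeout. TimedCaller is a hypothetical class, not Flume's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical "call timeout pool": the blocking call runs on a pool thread,
// and the caller waits at most requestTimeoutMillis for the result.
final class TimedCaller implements AutoCloseable {
    private final ExecutorService callTimeoutPool = Executors.newCachedThreadPool();
    private final long requestTimeoutMillis;

    TimedCaller(long requestTimeoutMillis) {
        this.requestTimeoutMillis = requestTimeoutMillis;
    }

    <T> T call(Callable<T> rpc)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> future = callTimeoutPool.submit(rpc);
        try {
            return future.get(requestTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);  // interrupt the stuck call so the pool thread is freed
            throw e;
        }
    }

    @Override
    public void close() {
        callTimeoutPool.shutdownNow();
    }
}
```

Many threads can call call() concurrently. Each call gets its own Future, so a slow request only blocks its own caller.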
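The Avro client (NettyAvroRpcClient) is structured much the same way, so we skip over it here.
5. Two important implementation classes
Source analysis of FailoverRpcClient:
This class uses the Decorator Pattern. FailoverRpcClient is itself an RpcClient (it extends AbstractRpcClient) and at the same time holds an actual RpcClient instance. On top of that real client it adds only the ability to retry after a failure.
Failover is a retry-after-failure mechanism, usually implemented as retrying up to a maximum number of attempts.
In its append(Event e) method: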
int tries = 0;
while (tries < maxTries) {
  try {
    tries++;
    localClient = getClient();
    localClient.append(event);
    return;
  } catch (EventDeliveryException e) {
    // Delivery failed: close and drop this client so the next attempt obtains a new one
    localClient.close();
    localClient = null;
  } catch (Exception e2) {
    // Any other error is wrapped and rethrown immediately, without retrying
    throw new EventDeliveryException("Failed to send event. Exception follows: ", e2);
  }
}
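The retry logic is deliberately simple. Each attempt obtains a client, and a client that fails is closed and discarded.
getNextClient() is implemented as follows: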
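The decorator-plus-retry structure can be sketched without Flume on the classpath. SketchSender stands in for RpcClient and SketchDeliveryException for EventDeliveryException. The Supplier passed to the constructor is a hypothetical replacement for getClient()/getNextClient(). All of these names are assumptions made for the illustration.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for EventDeliveryException: a checked "this send failed" signal.
class SketchDeliveryException extends Exception {
    SketchDeliveryException(String message) { super(message); }
    SketchDeliveryException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical stand-in for RpcClient, reduced to append() and close().
interface SketchSender {
    void append(String event) throws SketchDeliveryException;
    void close();
}

// Decorator: same interface as the clients it wraps, with retry-and-failover added on top.
final class SketchFailoverSender implements SketchSender {
    private final Supplier<SketchSender> nextClient;  // hypothetical: yields a client for the next host
    private final int maxTries;
    private SketchSender current;

    SketchFailoverSender(Supplier<SketchSender> nextClient, int maxTries) {
        this.nextClient = nextClient;
        this.maxTries = maxTries;
    }

    @Override
    public synchronized void append(String event) throws SketchDeliveryException {
        int tries = 0;
        while (tries < maxTries) {
            tries++;
            try {
                if (current == null) {
                    current = nextClient.get();
                }
                current.append(event);
                return;  // delivered
            } catch (SketchDeliveryException e) {
                // This client failed: close and forget it, so the next try fails over.
                current.close();
                current = null;
            } catch (RuntimeException e) {
                // Unexpected errors are not retried, like the catch (Exception e2) branch in the excerpt.
                throw new SketchDeliveryException("Failed to send event", e);
            }
        }
        throw new SketchDeliveryException("Failed to send event after " + maxTries + " tries");
    }

    @Override
    public synchronized void close() {
        if (current != null) {
            current.close();
            current = null;
        }
    }
}
```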
// First pass: hosts after the last one that worked
for (int count = lastCheckedhost + 1; count < limit; count++) {
  HostInfo hostInfo = hosts.get(count);
  try {
    setDefaultProperties(hostInfo, props);
    localClient = RpcClientFactory.getInstance(props);
    lastCheckedhost = count;
    return localClient;
  } catch (FlumeException e) {
    logger.info("Could not connect to " + hostInfo, e);
    continue;
  }
}
// Second pass: wrap around to the start, up to and including lastCheckedhost
for (int count = 0; count <= lastCheckedhost; count++) {
  HostInfo hostInfo = hosts.get(count);
  try {
    setDefaultProperties(hostInfo, props);
    localClient = RpcClientFactory.getInstance(props);
    lastCheckedhost = count;
    return localClient;
  } catch (FlumeException e) {
    logger.info("Could not connect to " + hostInfo, e);
    continue;
  }
}
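HostInfo encapsulates the address (host name and port) of one remote server.
Failover thus simply cycles through the server addresses in order. It starts from the host after the last one that worked and wraps around to the beginning of the list.
Source analysis of LoadBalancingRpcClient:
As its name suggests, LoadBalancingRpcClient implements a load-balancing strategy. It still traverses the host list (in round-robin or random order), but adds a feedback mechanism that dynamically adjusts the order in which candidate hosts are tried.
In the append(Event) method: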
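The two loops in getNextClient() amount to one circular scan. It starts just after the last host that worked and wraps around once. The sketch below uses a single modulo loop. WrapAroundHostScan is hypothetical, and the Predicate stands in for "creating a client for this host succeeded without a FlumeException".

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical equivalent of getNextClient()'s two loops, written as one circular scan.
final class WrapAroundHostScan {
    private int lastCheckedHost = -1;  // -1 so the very first scan starts at index 0

    synchronized int next(List<String> hosts, Predicate<String> tryConnect) {
        int n = hosts.size();
        // Visits lastCheckedHost+1 .. n-1, then 0 .. lastCheckedHost: every host exactly once.
        for (int step = 1; step <= n; step++) {
            int index = (lastCheckedHost + step) % n;
            if (tryConnect.test(hosts.get(index))) {
                lastCheckedHost = index;  // the next scan starts after this host
                return index;
            }
        }
        throw new IllegalStateException("Could not connect to any host");
    }
}
```

Remembering lastCheckedHost keeps the client on a working host. A failed host is only retried after all the others have been tried.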
boolean eventSent = false;
Iterator<HostInfo> it = selector.createHostIterator();
while (it.hasNext()) {
  HostInfo host = it.next();
  try {
    RpcClient client = getClient(host);
    client.append(event);
    eventSent = true;
    break;
  } catch (Exception ex) {
    // Report the failure so the selector can demote this host
    selector.informFailure(host);
    LOGGER.warn("Failed to send event to host " + host, ex);
  }
}
if (!eventSent) {
  throw new EventDeliveryException("Unable to send event to any host");
}
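selector.createHostIterator() takes a snapshot of the current list of candidate hosts and, at the same time, advances the rotation by one position.
selector.informFailure(host) demotes the failed host (backs off from it).
The HostSelector interface is defined as follows: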
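The loop that tries each host and reports failures can be sketched independently of Flume. SketchHostSelector and HostSender are hypothetical interfaces. For brevity, the sketch returns the host that accepted the event. It throws an unchecked IllegalStateException where Flume throws EventDeliveryException.

```java
import java.util.Iterator;

// Hypothetical, string-based stand-in for HostSelector.
interface SketchHostSelector {
    Iterator<String> createHostIterator();
    void informFailure(String failedHost);
}

// Hypothetical per-host send operation (getClient(host).append(event) in the excerpt).
interface HostSender {
    void send(String host, String event) throws Exception;
}

final class LoadBalancingSketch {
    private LoadBalancingSketch() { }

    static String append(SketchHostSelector selector, HostSender sender, String event) {
        Iterator<String> it = selector.createHostIterator();
        while (it.hasNext()) {
            String host = it.next();
            try {
                sender.send(host, event);
                return host;  // delivered: do not try the remaining hosts
            } catch (Exception ex) {
                // Feedback: let the selector demote this host for later calls.
                selector.informFailure(host);
            }
        }
        throw new IllegalStateException("Unable to send event to any host");
    }
}
```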
public interface HostSelector {
  void setHosts(List<HostInfo> hosts);
  Iterator<HostInfo> createHostIterator();
  void informFailure(HostInfo failedHost);
}
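Its concrete implementations:
#). RoundRobinHostSelector, which orders hosts round-robin
#). RandomOrderHostSelector, which orders hosts randomly
Both classes are built on implementations of OrderSelector<T>, which encapsulates the policy for blacklisting failed hosts.
The blacklisting policy works as follows:
After a failure, the host gets a restore time. Until that time is reached, its ip/port is not handed out.
To penalize repeated failures, so that such a host is handed out even less often, the backoff interval is 1000 * (1 << sequentialFails) milliseconds. Here sequentialFails is the number of consecutive failures, so the interval doubles with every consecutive failure (capped by maxBackoff).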
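The following simplified sketch combines the round-robin snapshot of createHostIterator() with the backoff rule above. BackoffRoundRobinSelector is hypothetical. It uses plain strings as hosts, caps the interval at a maxBackoff value, and takes an injectable clock so the timing can be tested. It omits the rules Flume uses to decide when the consecutive-failure counter is reset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical simplification of a round-robin selector with OrderSelector-style backoff.
final class BackoffRoundRobinSelector {
    private final List<String> hosts;
    private final long maxBackoffMillis;
    private final LongSupplier clock;  // injected so the behaviour can be tested
    private final Map<String, Integer> sequentialFails = new HashMap<>();
    private final Map<String, Long> restoreTime = new HashMap<>();
    private int nextHead = 0;

    BackoffRoundRobinSelector(List<String> hosts, long maxBackoffMillis, LongSupplier clock) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = new ArrayList<>(hosts);
        this.maxBackoffMillis = maxBackoffMillis;
        this.clock = clock;
    }

    // Snapshot of the hosts whose restore time has passed, rotated by one position per call.
    synchronized Iterator<String> createHostIterator() {
        long now = clock.getAsLong();
        int n = hosts.size();
        List<String> snapshot = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            String host = hosts.get((nextHead + i) % n);
            if (restoreTime.getOrDefault(host, 0L) <= now) {
                snapshot.add(host);
            }
        }
        nextHead = (nextHead + 1) % n;
        return snapshot.iterator();
    }

    // Backoff of 1000 * (1 << sequentialFails) ms, capped at maxBackoffMillis (shift capped to avoid overflow).
    synchronized void informFailure(String host) {
        int fails = sequentialFails.merge(host, 1, Integer::sum);
        long backoff = Math.min(maxBackoffMillis, 1000L * (1L << Math.min(fails, 16)));
        restoreTime.put(host, clock.getAsLong() + backoff);
    }
}
```

With this rule a host's first failure blocks it for 2 s, and the second consecutive failure blocks it for 4 s. Further failures keep doubling the interval until it reaches the cap.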
*) Properties configuration
Basic properties
client.type = default (for avro) or thrift (for thrift)
hosts = h1                            # default client accepts only 1 host
hosts.h1 = host1.example.org:41414    # host and port must both be specified
batch-size = 100                      # Must be >= 1 (default: 100)
connect-timeout = 20000               # Must be >= 1000 (default: 20000)
request-timeout = 20000               # Must be >= 1000 (default: 20000)
Properties supported by the failover client
client.type = default_failover
hosts = h1 h2 h3      # at least one is required, but 2 or more makes better sense
max-attempts = 3      # Must be >= 0 (default: number of hosts)
Properties supported by the load-balancing client
client.type = default_loadbalance
hosts = h1 h2 h3               # At least 2 hosts are required
backoff = false                # Specifies whether the client should back-off from a failed host
maxBackoff = 0                 # Max timeout in millis
host-selector = round_robin    # The host selection strategy used
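The same load-balancing settings can also be assembled in code with java.util.Properties. LbClientProps is a hypothetical helper, and the keys and values are those listed above. The actual RpcClientFactory.getInstance(props) call is left as a comment so the sketch compiles without flume-ng-sdk.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that builds load-balancing client properties from alias -> "host:port".
final class LbClientProps {
    private LbClientProps() { }

    static Properties loadBalancing(Map<String, String> hostsByAlias, boolean backoff, long maxBackoffMillis) {
        if (hostsByAlias.size() < 2) {
            throw new IllegalArgumentException("At least 2 hosts are required");
        }
        Properties props = new Properties();
        props.setProperty("client.type", "default_loadbalance");
        // "hosts" lists the aliases; each alias then gets its own hosts.<alias> entry.
        props.setProperty("hosts", String.join(" ", hostsByAlias.keySet()));
        for (Map.Entry<String, String> entry : hostsByAlias.entrySet()) {
            props.setProperty("hosts." + entry.getKey(), entry.getValue());
        }
        props.setProperty("backoff", String.valueOf(backoff));
        props.setProperty("maxBackoff", String.valueOf(maxBackoffMillis));
        props.setProperty("host-selector", "round_robin");
        // With flume-ng-sdk on the classpath, these would then be passed to:
        //   RpcClient client = RpcClientFactory.getInstance(props);
        return props;
    }
}
```

Pass an insertion-ordered map such as LinkedHashMap if the order of the aliases in "hosts" matters.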
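*) Exception classes
EventDeliveryException is a checked exception, thrown by append() and appendBatch() when an event cannot be delivered. FlumeException is an unchecked RuntimeException, thrown for example by close() and by RpcClientFactory.getInstance().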