目标:通过flume收集安卓用户产生数据,存储到服务器文件
由于目前我这边的局限性所以我用如下的思路来实现它:
首先通过UDP 的socket将用户产生的信息发送到中间的缓存服务器(因为TCP只提供端到端传输,多个客户同时发送数据时就会出现端口占用情况,所以这里我使用UDP)。
然后缓存服务器到达一定数据量将数据通过TCP的socket发送到flume服务器上
首先flume配置如下,当然还可以存到hdfs或hive等地只需改配置文件即可
#ie the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = XVII
a1.sources.r1.port = 12345
# Describe the sink
a1.sinks.k1.type = file_roll
#a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /home/hadoop/Documents
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动:
bin/flume-ng agent --conf conf --conf-file conf/android.conf --name a1 -Dflume.root.logger=INFO,console
你简单可以测试一下是否可用
使用如下命令向flume服务器发消息 。
telnet XVII 12345
缓存服务器代码如下。
public class AndroidMemServer {
public static void main(String[] args) throws IOException {
StringBuffer stringBuffer=new StringBuffer();
DatagramSocket serverSocket = new DatagramSocket(2333);
byte[] receiveData = new byte[1024];
//发
Socket clientSocket = new Socket("flume服务器公网IP",12345);
DataOutputStream dos = new DataOutputStream(clientSocket.getOutputStream());
System.out.println("像服务端发送中");
while (true) {
//接收数据收
DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
serverSocket.receive(receivePacket);
String sentence = new String(receivePacket.getData());
//发送到flume服务器
dos.writeUTF(sentence.toString().trim()+"\n");
}
}
}
接下来只需客户端端通过udp向缓存服务器发数据即可,例如Android,或者其他客户端。