Sometimes you may try to aggregate data from different sources and dump it into a common location, say your HDFS. In such a scenario it will be useful to create a directory inside the HDFS corresponding to each host machine. To do thisFLEME-NG
provide a suitable escape sequence, the%{host}
. Unfortunately it was not working with early releases of FLUME-NG. In such case the only solution was to create a custom interceptor that adds a host header key to each event, along with the corresponding hostname as the header value.
But, luckily guys at Clouders did a great job and contributedan Interceptor to provide this feature out of the box. Now we just have to add few lines in our configuration file and we are good to go. For example, suppose we are collecting Apache web server logs from different hosts into a directory calledflume
inside theHDFS
. It would be quite fussy to figure out which log is coming from which host. So we''ll use%{host}
in our agentconfiguration files
for
the
agents
running on each machine. This will create aseparate directory
for each host inside the flumedirectory
and store the logs from that host there itself. A simple configuration file may look like this :
agent1.sources = tail
agent1.channels = MemoryChannel-2
agent1.sinks = HDFS
agent1.sources.tail.type = exec
agent1.sources.tail.command = tail -F /var/log/apache2/access.log.1
agent1.sources.tail.channels = MemoryChannel-2
agent1.sources.tail.interceptors = hostint
agent1.sources.tail.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.tail.interceptors.hostint.preserveExisting = true
agent1.sources.tail.interceptors.hostint.useIP = false
agent1.sinks.HDFS.channel = MemoryChannel-2
agent1.sinks.HDFS.type = hdfs
agent1.sinks.HDFS.hdfs.path = hdfs://localhost:9000/flume/%{host}
agent1.sinks.HDFS.hdfs.file.Type = DataStream
agent1.sinks.HDFS.hdfs.writeFormat = Text
agent1.channels.MemoryChannel-2.type = memory
Pay attention to the bold lines. And if you want to name the directories with theIP address
of each host instead of its hostname then do this :
agent1.sources.tail.interceptors.hostint.useIP = true