
Introduction
This is the first part of a multi-part series about Apache Flume – a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of log data from many different sources to a centralized data store.
Flume versions:
- Flume OG (Old Generation or 0.X)
- Flume NG (New Generation or 1.X)
Flume OG was the first available Flume distribution; it was replaced by a complete rewrite called Flume NG.
Flume runs as an agent. An agent is built from the following components:
- Event: The payload of data (or a unit of data) transported by Flume
- Source: Receives data and writes events to one or more channels
- Channel: The holding area where events are buffered as they pass from a source to a sink
- Sink: Receives events from a single channel and delivers them to their destination
Flume channels data between sources and sinks, both of which can be built-in or custom.
Using Flume, we can aggregate log files from many machines across the network and store them centrally. This aggregation is achieved by chaining agents together: the sink of one agent sends data to the source of another. The standard way to move data between agents over the network is Avro.
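As a quick illustration of chaining, here is a minimal sketch of a two-agent setup. The agent names, hostname, and port below are illustrative assumptions, not values used later in this series: the first-tier agent tails a log file and forwards events through an Avro sink, and the collector agent receives them on an Avro source and logs them.

# Hypothetical first-tier agent: tails a local log and forwards it via Avro
tier1.sources = s1
tier1.channels = c1
tier1.sinks = k1
tier1.sources.s1.type = exec
tier1.sources.s1.command = tail -F /var/log/app.log
tier1.sources.s1.channels = c1
tier1.channels.c1.type = memory
tier1.sinks.k1.type = avro
tier1.sinks.k1.hostname = collector.example.com
tier1.sinks.k1.port = 4141
tier1.sinks.k1.channel = c1

# Hypothetical collector agent: receives Avro events and logs them
collector.sources = s1
collector.channels = c1
collector.sinks = k1
collector.sources.s1.type = avro
collector.sources.s1.bind = 0.0.0.0
collector.sources.s1.port = 4141
collector.sources.s1.channels = c1
collector.channels.c1.type = memory
collector.sinks.k1.type = logger
collector.sinks.k1.channel = c1

Each agent would typically run on its own machine with its own configuration file; the second use case below shows this pattern in full.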
Data flows through an agent from source to channel to sink.
The focus of this series is to understand how Flume can be used for log data aggregation and for transporting massive quantities of social media-generated data, or data from any similar source. The series is broken down into the following parts:
- Flume – Data Collection Framework (this blog)
- Flume with Cassandra
- Flume with Kafka
Use Cases
We will walk through two use cases to understand how Flume works. Let's start with the basics and build on them in the second use case.
First use case
Let's treat the first use case as a "Hello World" that describes a single-node Flume deployment. In this use case, Flume receives events and logs them to the console.
What we need to do:
- Configure the Flume agent
- Start an agent
- Send events into Flume agent’s source
Second use case
In the second use case, we will collect 3 log files and store them in HDFS using Flume. The same procedure can be followed to collect any number of log files.
What we need to do:
- Ensure Hadoop is set up
- Configure the Flume agent
- Start an agent
Solutions
Solution for the first use case: "Hello World"
Before solving the use case, let's satisfy some prerequisites.
Prerequisites:
- JDK 1.6+
- This blog series uses Flume 1.4.0
Flume Downloads Page: https://flume.apache.org/download.html
- Verify the installation:
- Download apache-flume-1.4.0-bin.tar.gz from the link above and extract it into a directory
- Run the help command from the Flume installation directory, as follows:
# ./bin/flume-ng help
Usage: ./bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --conf-file,-f <file>     specify a config file (required)
  --name,-n <name>          the name of this agent (required)
  --help,-h                 display help text
Configure the Flume Agent
- The configuration file defines the Flume agent's sources, sinks, and channels. The agent hosts the flow of data.
- This configuration defines a single agent named hello_agent.
- hello_agent consists of a source that listens for data on port 12345, a channel that buffers event data in memory, and a sink that logs event data to the console.
# A single-node Flume configuration

# Name the components on this agent
hello_agent.sources = r1
hello_agent.sinks = k1
hello_agent.channels = c1

# Describe/configure the source
hello_agent.sources.r1.type = netcat
hello_agent.sources.r1.bind = localhost
hello_agent.sources.r1.port = 12345

# Describe the sink
hello_agent.sinks.k1.type = logger

# Use a channel which buffers events in memory
hello_agent.channels.c1.type = memory

# Bind the source and sink to the channel
hello_agent.sources.r1.channels = c1
hello_agent.sinks.k1.channel = c1
Start an agent
- Start the agent using the flume-ng shell script, located in the bin directory of the Flume distribution. Specify the agent name, the config directory, and the config file on the command line.
To start an agent, run the following command
# bin/flume-ng agent -n hello_agent -c conf -f conf/helloworld.conf -Dflume.root.logger=INFO,console
The -Dflume.root.logger property overrides the root logger in the conf/log4j.properties file to use the console appender. If we don't override the root logger, everything will still work, but the output will go to the log/flume.log file instead of the console. Alternatively, we can edit the conf/log4j.properties file and change the flume.root.logger property there.
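As a minimal sketch of that alternative (the exact appender names in the default file can vary by Flume version, so treat this as an assumption), the change amounts to editing one line in conf/log4j.properties:

# conf/log4j.properties (excerpt)
# Default: route the root logger to the rolling log file
#flume.root.logger=INFO,LOGFILE
# Log to the console instead
flume.root.logger=INFO,console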
- Once the configuration is parsed and the agent starts, we can see a message like this:
2014-02-18 17:11:00,246 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:12345]
Send events into Flume agent’s source
- Open a second terminal and run the following command to send the string "Hello World!" as an event:
# telnet localhost 12345
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello World!
OK
2014-02-18 17:25:11,115 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 0D Hello World!. }
- If we send another line as follows:
Hi How are you? What are you doing?
OK
2014-02-18 17:28:21,873 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 48 69 20 48 6F 77 20 61 72 65 20 79 6F 75 3F 20 Hi How are you? }
Solution for the second use case: log data aggregation
Using the HDFS sink requires a Hadoop installation, because Flume uses the Hadoop jars to communicate with the HDFS cluster.
Ensure Hadoop is set up
There are many blogs and articles explaining how to install Hadoop; two of them are linked below. For this use case, we use Ubuntu and Hadoop 2.2.0.
http://tecadmin.net/steps-to-install-hadoop-on-centosrhel-6/#
http://codesfusion.blogspot.in/2013/10/setup-hadoop-2x-220-on-ubuntu.html
Configure the Flume agent
- This configuration aggregates 3 log files and stores them via an HDFS sink.
- In this configuration we use an Avro source and Avro sinks to send the data across the network.
- To use the Avro source, we specify the type property with the value avro, and provide a bind address and a port to listen on. The corresponding hostname and port are set on the Avro sinks.
- The HDFS sink writes events into the Hadoop Distributed File System (HDFS).
agent.sources = source1 source2 source3 source4
agent.channels = channel1 channel2 channel3 channel4
agent.sinks = sink1 sink2 sink3 sink4

agent.sources.source1.type = exec
agent.sources.source1.command = tail -F /var/log/input1.log
agent.sources.source1.channels = channel1
agent.channels.channel1.type = memory
agent.sinks.sink1.type = avro
agent.sinks.sink1.hostname = test.example.com
agent.sinks.sink1.port = 54345
agent.sinks.sink1.channel = channel1

agent.sources.source2.type = exec
agent.sources.source2.command = tail -F /var/log/input2.log
agent.sources.source2.channels = channel2
agent.channels.channel2.type = memory
agent.sinks.sink2.type = avro
agent.sinks.sink2.hostname = test.example.com
agent.sinks.sink2.port = 54345
agent.sinks.sink2.channel = channel2

agent.sources.source3.type = exec
agent.sources.source3.command = tail -F /var/log/input3.log
agent.sources.source3.channels = channel3
agent.channels.channel3.type = memory
agent.sinks.sink3.type = avro
agent.sinks.sink3.hostname = test.example.com
agent.sinks.sink3.port = 54345
agent.sinks.sink3.channel = channel3

agent.sources.source4.type = avro
agent.sources.source4.bind = test.example.com
agent.sources.source4.port = 54345
agent.sources.source4.channels = channel4
agent.channels.channel4.type = memory
# The maximum number of events stored in the channel
agent.channels.channel4.capacity = 1000
# The maximum number of events the channel will take from a source per transaction
agent.channels.channel4.transactionCapacity = 100

agent.sinks.sink4.type = hdfs
# HDFS directory path
agent.sinks.sink4.hdfs.path = /tmp/flume
# Name prefixed to files created by Flume in the HDFS directory
agent.sinks.sink4.hdfs.filePrefix = LogData
# Number of seconds to wait before rolling the current file
agent.sinks.sink4.hdfs.rollInterval = 60
# Format for sequence file records
agent.sinks.sink4.hdfs.writeFormat = Text
# File format
agent.sinks.sink4.hdfs.fileType = DataStream
agent.sinks.sink4.channel = channel4
Start an agent
Before starting the agent, make sure that the log files are present in the specified locations.
In this use case we aggregate the input1.log, input2.log, and input3.log files from the /var/log directory.
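If the files do not exist yet, here is a minimal sketch for creating them with some test data (the file names match the configuration above; the contents are arbitrary and writing to /var/log requires root):

# touch /var/log/input1.log /var/log/input2.log /var/log/input3.log
# echo "sample log line 1" >> /var/log/input1.log
# echo "sample log line 2" >> /var/log/input2.log
# echo "sample log line 3" >> /var/log/input3.log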
To start the agent, run the following command:
# bin/flume-ng agent -n agent -c conf -f conf/MultipleLogFiles.conf -Dflume.root.logger=INFO,console
Once the configuration is parsed, the agent logs messages confirming that the sources, channels, and sinks have started.
Log aggregation with Flume can create many files in HDFS; how the files are rolled and split is controlled by the HDFS sink properties.
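For example, the roll behaviour can be tuned with the hdfs.rollInterval, hdfs.rollSize, and hdfs.rollCount properties (the values below are illustrative; setting a trigger to 0 disables it):

# Roll a new file every 60 seconds
agent.sinks.sink4.hdfs.rollInterval = 60
# Or roll once the file reaches roughly 128 MB
agent.sinks.sink4.hdfs.rollSize = 134217728
# Or roll after 10000 events have been written
agent.sinks.sink4.hdfs.rollCount = 10000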
Use the following command to examine the files created in HDFS:
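A sketch of the commands (the path and prefix come from the configuration above; the exact file names depend on when the sink rolls files):

# hadoop fs -ls /tmp/flume
# hadoop fs -cat /tmp/flume/LogData.*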
Challenges
- Flume expects the component names and the configuration file name to be correct when running an agent; otherwise we will get an org.apache.commons.cli.ParseException. For example, if we create a helloworld.conf file in the conf folder but try to run helloword.conf (note the typo), the following exception is thrown:
2014-02-18 17:34:27,076 (main) [ERROR - org.apache.flume.node.Application.main(Application.java:307)] A fatal error occurred while running. Exception follows.
org.apache.commons.cli.ParseException: The specified configuration file does not exist: /usr/apache-flume-1.4.0-bin/conf/helloword.conf
        at org.apache.flume.node.Application.main(Application.java:275)
- If we miss the -c parameter of the command used to start the agent, we will get the following message:
[conf-file-poller-0] INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider - Reloading configuration file:conf/helloworld.conf
- To avoid the following exception, use the same host name and port for both the Avro source and the Avro sink in the configuration:
2014-02-19 20:07:43,916 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: test.example.com, port: 543 }: RPC connection error
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
        at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
        at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
        at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
        at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
        at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:269)
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:339)
Conclusion
- We have successfully configured and deployed a Flume agent.
- We can aggregate log data from multiple sources (hundreds to thousands) at high velocity and volume using Flume.
- We can aggregate hundreds of logs by configuring a number of first-tier agents with Avro sinks, all pointing to the Avro source of a single collector agent.
- The next blog will show how to create a custom source and a custom sink in Flume.
References
Websites:
- Apache Flume User Guide: https://flume.apache.org/FlumeUserGuide.html
- Apache Flume Developer Guide: http://flume.apache.org/FlumeDeveloperGuide.html
- Avro source and sink: http://www.packtpub.com/article/avro-source-sink
- Hadoop analysis of Apache logs: http://cuddletech.com/blog/?p=795
Book:
- Apache Flume: Distributed Log Collection for Hadoop by Steve Hoffman