Flume with Cassandra Integration

Introduction

This is the second part in a multi-part series on integrating Flume with Cassandra. In the prior part, we discussed aggregating log files and storing them in an HDFS sink. Now we'll see how the HDFS sink can be replaced with a Cassandra sink, which stores the data in a distributed, reliable, and highly available way.

Please read the first part in the series, Flume – Data Collection Framework, to get a better understanding of this blog.

Use Case

Aggregate multiple log files and store them through a custom Cassandra sink.

What we need to do:

  • Ensure Cassandra is set up
  • Design a data model to store log information
  • Create custom Cassandra sink
  • Configure Flume agent
  • Start an agent
  • Scrutinize the output

Solution

Ensure Cassandra is set up

  • Refer to our blog, Cassandra Data Model – Part 1, for installation steps:

http://www.treselle.com/blog/cassandra-data-model-for-twitter-part-1/#Install_Cassandra


Design a data model to store log information:

[Figure: data model used to store log information]
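The original figure is not reproduced here. The following CQL is a minimal sketch of one possible model, assuming a keyspace1 keyspace and a log_data table (both names are illustrative): the hour of the log entry acts as the partition key and the log timestamp as a clustering column, which matches the key pair discussed in the Challenges section below.

CREATE KEYSPACE IF NOT EXISTS keyspace1
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS keyspace1.log_data (
  time_hour text,
  log_time  text,
  log_msg   text,
  PRIMARY KEY (time_hour, log_time)
);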

Create custom Cassandra sink:

  • The purpose of a sink is to extract events from the channel and either forward them to the next Flume agent in the flow or store them in an external repository.
  • A sink is associated with exactly one channel, as configured in the Flume properties (configuration) file.
  • To create a custom sink, override the configure(Context) method from the Configurable interface and the process() method from the AbstractSink class (a sketch of such a sink follows this list).
  • Since the configure() method runs before the start() method, it is used to set Cassandra configuration details such as the nodes (hosts), keyspace name, and ports.
  • The start() method is used to establish the connection to Cassandra.
  • The process() method takes each event from the channel, with the help of the take() method, and stores it in Cassandra.
  • The stop() method is used to close the connection.
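Below is a minimal sketch of such a sink, not the original implementation. It assumes the DataStax cassandra-driver-core API, reads the hosts and keyspace from the agent context (defaulting to the values in the config.properties shown below), and writes to the assumed log_data table from the data model sketch above; the hourOf() and timeOf() helpers are hypothetical.

CassandraSink.java

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Illustrative sketch of a custom Cassandra sink; class, table, and helper names are assumptions.
public class CassandraSink extends AbstractSink implements Configurable {

    private String hosts;
    private String keyspace;
    private Cluster cluster;
    private Session session;

    @Override
    public void configure(Context context) {
        // configure() runs before start(), so read the Cassandra connection details here.
        hosts = context.getString("hosts", "127.0.0.1");
        keyspace = context.getString("keyspace", "keyspace1");
    }

    @Override
    public synchronized void start() {
        // Establish the Cassandra connection once, before any event is processed.
        cluster = Cluster.builder().addContactPoints(hosts.split(",")).build();
        session = cluster.connect(keyspace);
        super.start();
    }

    @Override
    public Status process() {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            // take() pulls one event from the channel; null means the channel is empty.
            Event event = channel.take();
            if (event == null) {
                status = Status.BACKOFF;
            } else {
                String line = new String(event.getBody(), StandardCharsets.UTF_8);
                // Assumed log_data table from the data model sketch above.
                session.execute(
                        "INSERT INTO log_data (time_hour, log_time, log_msg) VALUES (?, ?, ?)",
                        hourOf(line), timeOf(line), line);
            }
            txn.commit();
        } catch (Exception e) {
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }

    @Override
    public synchronized void stop() {
        // Close the Cassandra connection when the agent shuts down.
        if (session != null) session.close();
        if (cluster != null) cluster.close();
        super.stop();
    }

    // Hypothetical helpers that assume each log line starts with "yyyy-MM-dd HH:mm:ss,SSS".
    private String hourOf(String line) {
        return line.length() >= 13 ? line.substring(0, 13) : line;
    }

    private String timeOf(String line) {
        return line.length() >= 23 ? line.substring(0, 23) : line;
    }
}

Returning BACKOFF when the channel is empty or an insert fails tells Flume to pause briefly instead of calling process() again in a tight loop.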

A simple Cassandra sink requires a properties file with parameters such as the following:

config.properties
hosts: 127.0.0.1
keyspace: keyspace1
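If the sink loads these values from a standalone config.properties on its classpath rather than from the agent context, the configure() method could read them with java.util.Properties. The fragment below is a sketch, and the file location is an assumption:

// Inside configure(); requires java.util.Properties, java.io.InputStream, java.io.IOException.
Properties props = new Properties();
try (InputStream in = getClass().getClassLoader().getResourceAsStream("config.properties")) {
    if (in != null) {
        props.load(in);  // "hosts: 127.0.0.1" style entries are valid Properties syntax
    }
} catch (IOException e) {
    throw new RuntimeException("Unable to read config.properties", e);
}
hosts = props.getProperty("hosts", "127.0.0.1");
keyspace = props.getProperty("keyspace", "keyspace1");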

Configure Flume Agent:

This configuration aggregates two log files and stores the events in the Cassandra database.
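The original configuration is not reproduced here; the following is a sketch of one possible agent file. The two exec sources tailing /var/log/input1.log and /var/log/input2.log, the memory channel, and the fully qualified sink class name are all assumptions that match the sketches above.

cassandra-agent.conf

agent.sources  = src1 src2
agent.channels = memChannel
agent.sinks    = cassandraSink

# Tail each log file with an exec source (paths are assumptions).
agent.sources.src1.type = exec
agent.sources.src1.command = tail -F /var/log/input1.log
agent.sources.src1.channels = memChannel

agent.sources.src2.type = exec
agent.sources.src2.command = tail -F /var/log/input2.log
agent.sources.src2.channels = memChannel

# Buffer events in memory between the sources and the sink.
agent.channels.memChannel.type = memory
agent.channels.memChannel.capacity = 10000

# Custom sink; the class name is illustrative.
agent.sinks.cassandraSink.type = com.example.flume.CassandraSink
agent.sinks.cassandraSink.hosts = 127.0.0.1
agent.sinks.cassandraSink.keyspace = keyspace1
agent.sinks.cassandraSink.channel = memChannel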

Start an agent:

Note:

  • Before starting an agent, make sure that the log files are present in the specified location.
  • In this use case, we aggregate the input1.log and input2.log files from the /var/log directory.
  • The required dependencies, which should be placed in the lib folder of the Flume installation directory, are given below:
    • apache-cassandra
    • cassandra-driver-core
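With the log files and dependencies in place, the agent can be started from the Flume installation directory. The command below is a sketch; the configuration file name and the agent name match the assumed cassandra-agent.conf above.

$ bin/flume-ng agent --conf conf --conf-file conf/cassandra-agent.conf --name agent -Dflume.root.logger=INFO,console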

Scrutinize the output:

Follow the steps given below to examine the log information stored in the Cassandra database (a sample cqlsh session follows this list):

  • Connect to the local Cassandra instance from the Cassandra installation directory.
  • Switch to the keyspace that holds the log information.
  • Query the table to fetch the log information we inserted.
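The session below is a sketch of these three steps; keyspace1 and log_data are the assumed names from the earlier sketches.

$ bin/cqlsh 127.0.0.1

cqlsh> USE keyspace1;
cqlsh:keyspace1> SELECT * FROM log_data;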

Challenges

  • While storing log information for a particular hour, the data gets overwritten because the row key repeats, as shown below:

[Figure: row key repetition causing overwritten data]

To overcome this problem, change the column family so that one more column, with timestamp as its data type, is part of the key, and fill it with the timestamp value in milliseconds from the dateOf(now()) function. This ensures that the key combination (time_hour, log_time, internal_time) is never repeated, so all the data for an hour is inserted without overwriting existing rows.
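In CQL terms this means adding the internal_time column to the primary key. Since a table's primary key cannot be altered in place, the sketch below recreates the assumed log_data table and shows an insert that supplies dateOf(now()) for the new column; the sample values are illustrative.

DROP TABLE IF EXISTS keyspace1.log_data;

CREATE TABLE keyspace1.log_data (
  time_hour     text,
  log_time      text,
  internal_time timestamp,
  log_msg       text,
  PRIMARY KEY (time_hour, log_time, internal_time)
);

INSERT INTO keyspace1.log_data (time_hour, log_time, internal_time, log_msg)
VALUES ('2015-06-17 10', '2015-06-17 10:15:23,456', dateOf(now()), 'sample log line');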

[Figure: row key repetition resolved after adding internal_time]

Conclusion

  • In Flume, Cassandra is a strong storage destination for large amounts of log data.
  • This blog showed how to create a custom sink in Flume.
  • The next blog will show how to create a custom source in Flume.
