Flume with Kafka Integration

Introduction

This is the final part in a multi-part series on integrating Flume with Kafka. Apache Flume is not restricted to log data aggregation; it also supports customizable data sources. Flume can transport massive quantities of event data, including social media data, email messages, and pretty much any other data source imaginable.

Please check the previous blogs in this series, Flume – Data Collection Framework and Flume with Cassandra Integration, for a better understanding of this one.

Data flow diagram:

[Data flow diagram: Twitter stream → custom Flume Twitter source → Flume channel → custom Kafka sink → Kafka cluster → Kafka consumer]

Use Case

Flume listens for Twitter tweets and, as new tweets come in, sends them to Kafka through a custom Kafka sink (the Kafka producer).

What we need to do:

  • Ensure Kafka is set up
  • Create custom Twitter Source to get tweets from Twitter account
  • Create custom Kafka Sink to store tweets on cluster
  • Configure Flume agent
  • Start an agent
  • Scrutinize the output

Solution

Ensure Kafka is set up:

There are many blogs and articles explaining how to install Kafka; a few of them are referenced below. For this use case, we used Red Hat Linux and Kafka 0.8.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.8+Quick+Start

http://kafka.apache.org/documentation.html#introduction

http://kafka.apache.org/downloads.html
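
Once Kafka is downloaded and extracted, a quick way to bring up a single-node setup for this use case (assuming the default configuration files shipped with Kafka 0.8) is to start ZooKeeper and then a broker from the Kafka installation directory:

    # Terminal 1: start the bundled ZooKeeper instance
    bin/zookeeper-server-start.sh config/zookeeper.properties

    # Terminal 2: start a Kafka broker
    bin/kafka-server-start.sh config/server.properties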

Create custom Twitter Source to get tweets from Twitter account:

  • As there is no predefined Twitter source in Flume, we create a custom Twitter source (a sketch is shown after this list).
  • We use the Twitter4J library to integrate our Java application with the Twitter service.
  • In order to connect to the Twitter APIs, we need access to some application-specific secrets. The Twitter source has variables such as the consumer key and consumer secret, which are used to set up the Twitter stream. These secret keys can be generated from the link given below.
  • https://dev.twitter.com/apps
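
Below is a minimal sketch of such a source, assuming Twitter4J 3.x and Flume NG on the classpath. The class name and the property names (consumerKey, consumerSecret, accessToken, accessTokenSecret) are illustrative and must match whatever the agent configuration file supplies:

    import org.apache.flume.Context;
    import org.apache.flume.EventDrivenSource;
    import org.apache.flume.channel.ChannelProcessor;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.source.AbstractSource;
    import twitter4j.Status;
    import twitter4j.StatusAdapter;
    import twitter4j.TwitterStream;
    import twitter4j.TwitterStreamFactory;
    import twitter4j.conf.ConfigurationBuilder;

    public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {

        private String consumerKey, consumerSecret, accessToken, accessTokenSecret;
        private TwitterStream twitterStream;

        @Override
        public void configure(Context context) {
            // OAuth secrets are read from the Flume agent configuration file.
            consumerKey = context.getString("consumerKey");
            consumerSecret = context.getString("consumerSecret");
            accessToken = context.getString("accessToken");
            accessTokenSecret = context.getString("accessTokenSecret");
        }

        @Override
        public void start() {
            ConfigurationBuilder cb = new ConfigurationBuilder();
            cb.setOAuthConsumerKey(consumerKey)
              .setOAuthConsumerSecret(consumerSecret)
              .setOAuthAccessToken(accessToken)
              .setOAuthAccessTokenSecret(accessTokenSecret)
              .setUseSSL(true); // api.twitter.com accepts TLS/SSL connections only
            twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

            final ChannelProcessor channel = getChannelProcessor();
            twitterStream.addListener(new StatusAdapter() {
                @Override
                public void onStatus(Status status) {
                    // Wrap each tweet's text in a Flume event and hand it to the channel.
                    channel.processEvent(EventBuilder.withBody(status.getText().getBytes()));
                }
            });
            twitterStream.sample(); // start consuming the public sample stream
            super.start();
        }

        @Override
        public void stop() {
            twitterStream.shutdown();
            super.stop();
        }
    }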
     

Create custom Kafka Sink to store tweets on cluster:

  • As there is no predefined Kafka sink in Flume, we create a custom Kafka sink (a sketch is shown after this list).
  • The Kafka sink takes messages from the channel and sends them to the Kafka cluster.
  • Kafka maintains feeds of messages in categories called topics.
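
Below is a minimal sketch of such a sink, assuming the Kafka 0.8 producer API (kafka.javaapi.producer.Producer). The brokerList and topic property names and their default values are illustrative and must match the agent configuration:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;

    public class KafkaSink extends AbstractSink implements Configurable {

        private Producer<String, String> producer;
        private String brokerList;
        private String topic;

        @Override
        public void configure(Context context) {
            // Broker list and topic come from the Flume agent configuration file.
            brokerList = context.getString("brokerList", "localhost:9092");
            topic = context.getString("topic", "twitter");
        }

        @Override
        public void start() {
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            producer = new Producer<String, String>(new ProducerConfig(props));
            super.start();
        }

        @Override
        public Status process() {
            Status status = Status.READY;
            Channel channel = getChannel();
            Transaction txn = channel.getTransaction();
            txn.begin();
            try {
                Event event = channel.take();
                if (event == null) {
                    status = Status.BACKOFF; // nothing in the channel right now
                } else {
                    // Publish the tweet taken from the channel to the Kafka topic.
                    producer.send(new KeyedMessage<String, String>(topic, new String(event.getBody())));
                }
                txn.commit();
            } catch (Exception e) {
                txn.rollback();
                status = Status.BACKOFF;
            } finally {
                txn.close();
            }
            return status;
        }

        @Override
        public void stop() {
            producer.close();
            super.stop();
        }
    }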

Configure Flume agent:

The configuration file defines the properties of each source, sink, and channel in an agent and wires them together to form the data flow.
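
A sketch of such a configuration file for this use case is shown below. The fully qualified class names (the com.example.flume package is purely illustrative), the OAuth placeholders, the broker list, and the topic name must be adjusted to your environment; the property keys match the source and sink sketches above:

    # TwitterKafkaAgent: custom Twitter source -> memory channel -> custom Kafka sink
    TwitterKafkaAgent.sources  = TwitterSrc
    TwitterKafkaAgent.channels = MemChannel
    TwitterKafkaAgent.sinks    = KafkaSnk

    TwitterKafkaAgent.sources.TwitterSrc.type = com.example.flume.TwitterSource
    TwitterKafkaAgent.sources.TwitterSrc.consumerKey = <consumer key>
    TwitterKafkaAgent.sources.TwitterSrc.consumerSecret = <consumer secret>
    TwitterKafkaAgent.sources.TwitterSrc.accessToken = <access token>
    TwitterKafkaAgent.sources.TwitterSrc.accessTokenSecret = <access token secret>
    TwitterKafkaAgent.sources.TwitterSrc.channels = MemChannel

    TwitterKafkaAgent.channels.MemChannel.type = memory
    TwitterKafkaAgent.channels.MemChannel.capacity = 10000

    TwitterKafkaAgent.sinks.KafkaSnk.type = com.example.flume.KafkaSink
    TwitterKafkaAgent.sinks.KafkaSnk.brokerList = localhost:9092
    TwitterKafkaAgent.sinks.KafkaSnk.topic = twitter
    TwitterKafkaAgent.sinks.KafkaSnk.channel = MemChannel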

Start an agent:

To start the agent, run the following command from the Flume installation directory (the configuration file name conf/flume-twitter-kafka.conf is illustrative; point it at wherever you saved the agent configuration):
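
    bin/flume-ng agent --conf conf --conf-file conf/flume-twitter-kafka.conf --name TwitterKafkaAgent -Dflume.root.logger=INFO,console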

The TwitterKafkaAgent now runs the Twitter source and the Kafka sink.

Note: Before running the Flume agent, make sure the conditions given below are met:

  • Make sure Kafka is running.
  • Create jar files for the Twitter source and Kafka sink, and move them into the lib folder of the Flume installation directory.
  • The following dependency jars should also be placed in the lib folder of the Flume installation directory:
    • twitter4j-core
    • twitter4j-stream
    • kafka_2.10-0.8.0
    • scala-library
    • metrics-core

Scrutinize the output:

The Kafka sink (Kafka producer) sends messages to the Kafka cluster, which in turn serves them up to consumers; a consumer reads the messages from the Kafka cluster.

To examine the tweets, start a Kafka console consumer with the following command from the Kafka installation directory (the ZooKeeper address and the topic name twitter are illustrative and must match your setup):
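
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic twitter --from-beginning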

Challenges

  • As of January 14th, 2014, connections to api.twitter.com are restricted to TLS/SSL connections only. If our application still uses plaintext HTTP connections, we need to update it to use HTTPS, otherwise it will stop functioning. This SSL requirement is enforced on all api.twitter.com URLs, including all steps of OAuth and all REST API resources, so we must use SSL to connect to api.twitter.com. Any well-established HTTP client library already supports connecting to an SSL-enabled server, and the required change is usually just a matter of updating a few lines of code or configuration.

We can enable SSL for connections to api.twitter.com in two ways:

  • Set http.useSSL=true in twitter4j.properties (or pass the equivalent twitter4j.http.useSSL=true JVM system property)
  • Call twitter4j.conf.ConfigurationBuilder.setUseSSL(true) when building the stream configuration, as shown in the Twitter source sketch above

If you don't use SSL, the following exception will be thrown:

sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target.

  • To avoid the exception below, add a host entry for the Kafka broker on the machine running the producer, so that the Kafka producer can resolve and identify the cluster (see the example after the stack trace).

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
    at kafka.producer.Producer.send(Unknown Source)
    at kafka.javaapi.producer.Producer.send(Unknown Source)
    at com.demo.test.KafkaProducer.main(KafkaProducer.java:34)
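
For example, if the broker advertises itself with the hostname kafka-broker1 and runs at 192.168.1.10 (both purely illustrative), the machine running the Flume agent would need an entry such as the following in its /etc/hosts file:

    192.168.1.10    kafka-broker1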

Conclusion

  • In this blog we have seen how effectively Flume moves large amounts of real-time data (Twitter tweets) and stores it in a high-throughput distributed messaging system like Kafka.
  • Along the way we developed a custom Twitter source to process events from Twitter and a custom Kafka sink to store events taken from the channel.

