Hive Streaming with Kafka and Storm with Atlas

Overview

With the release of Hive 0.13.1 and HCatalog, a new Streaming API was introduced to support continuous data ingestion into Hive tables. The API is intended for streaming clients such as Flume and Storm, which can now write data continuously into Hive, whose storage has traditionally been batch oriented.
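Under the hood, the streaming API is organized around end points, streaming connections, and transaction batches. A minimal client sketch using class names from the hive-hcatalog-streaming module is shown below; the metastore URI, table, column names, and sample record are assumptions for illustration.

import java.util.Collections;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class StreamingClientSketch {
    public static void main(String[] args) throws Exception {
        // End point: metastore URI, database, table, and partition values (none here) -- all assumed.
        HiveEndPoint endPoint = new HiveEndPoint(
                "thrift://localhost:9083", "default",
                "loan_application_transactional_raw", Collections.<String>emptyList());
        StreamingConnection connection = endPoint.newConnection(true);

        // Writer that maps tab-delimited records onto table columns (assumed column names).
        DelimitedInputWriter writer = new DelimitedInputWriter(
                new String[]{"application_id", "loan_type", "amount"}, "\t", endPoint);

        // Fetch a batch of transactions, write a record, and commit; committed rows
        // become visible to other Hive clients immediately.
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        txnBatch.write("1001\tAUTO\t15000".getBytes());
        txnBatch.commit();
        txnBatch.close();
        connection.close();
    }
}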

In our use case, we are going to use Kafka with Storm to load streaming data into a bucketed Hive table. Multiple Kafka topics feed data to Storm, which ingests it into a transactional Hive table. Data committed in a transaction is immediately available to queries from other Hive clients. Apache Atlas tracks the lineage of the Hive transactional table, the Storm topology (spouts and bolts), and the Kafka topics, which helps us understand how data is ingested into the Hive table.

Pre-requisites 

To use streaming, the following are required:

  • Enable ACID support in Hive.
  • Store the Hive table in ORC format.
  • Set tblproperties ("transactional"="true") on the table during creation.
  • Bucket the Hive table. If desired, the table may also be partitioned in addition to the bucket definition.
  • Ensure sufficient temporary disk space is available to support compaction operations.
  • Grant the user running the client streaming process the necessary permissions to write to the table.

Use Case

As a sample use case, loan application mock data is used. In this scenario, the input files are split by loan type into 4 files, and the row values are delimited by tab (‘\t’). The contents of each input file are published to a separate Kafka topic. In our Storm topology,

  • ZooKeeper and Kafka topics are configured in “SpoutConfig”.
  • Storm consumes the data using “KafkaSpout”.
  • Hive options such as batch size, transactions per batch, and so on are defined using the core-storm API.
  • Hive output is prepared using “LoanApplicationBolt”.
  • Finally, the topology is built from the “KafkaSpout” named “LoanApplicationData”, the Hive output bolt named “LoanApplicationBolt”, and the “HiveBolt” named “LoanApplicationOutputBolt”, which loads the data into the Hive transactional table.

Loan Application Storm Topology

[Diagram: loan application Storm topology (KafkaSpout “LoanApplicationData” -> “LoanApplicationBolt” -> HiveBolt “LoanApplicationOutputBolt”)]

Hive Streaming with Storm and Kafka Steps

To achieve Hive streaming with Storm and Kafka, the following steps are performed:

  • Create Hive transactional table.
  • Create Kafka topics.
  • Publish data into Kafka topics.
  • Prepare and submit Storm topology.

Let us discuss the above steps in detail.

Creating Hive Transactional Table

The Hive table should satisfy the requirements mentioned in the pre-requisites; a sketch of such a table definition is given below.
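As an illustration only, the table could be created through Hive JDBC as shown below. The column names, bucket count, and connection details are assumptions, and the HiveServer2 instance is assumed to already have ACID enabled in hive-site.xml (hive.txn.manager set to DbTxnManager, hive.support.concurrency=true, and the compactor turned on).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTransactionalTable {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // HiveServer2 URL, user, and password are assumptions for a local sandbox.
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {
            // ORC storage, bucketing, and transactional=true satisfy the streaming pre-requisites.
            stmt.execute("CREATE TABLE IF NOT EXISTS loan_application_transactional_raw ("
                    + " application_id STRING, loan_type STRING, amount DOUBLE)"
                    + " CLUSTERED BY (application_id) INTO 4 BUCKETS"
                    + " STORED AS ORC"
                    + " TBLPROPERTIES ('transactional'='true')");
        }
    }
}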

Creating Kafka Topics

In this use case, we have created four topics, one for each loan type.
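The topics can be created with Kafka’s command-line tooling or programmatically. Below is a sketch using the Kafka AdminClient; the broker address, topic names (derived from the four loan types), partition count, and replication factor are assumptions, and the AdminClient requires a reasonably recent Kafka client library.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateLoanTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        try (AdminClient admin = AdminClient.create(props)) {
            // One topic per loan type; names, partition count, and replication factor are assumptions.
            admin.createTopics(Arrays.asList(
                    new NewTopic("home_loan", 1, (short) 1),
                    new NewTopic("auto_loan", 1, (short) 1),
                    new NewTopic("credit_card_loan", 1, (short) 1),
                    new NewTopic("personal_loan", 1, (short) 1)))
                 .all().get();
        }
    }
}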

Publishing Data into Kafka Topics

We have used the inbuilt “kafka-console-producer” script to publish the data into the Kafka topics. In this way, each row in an input file is published as a message to the corresponding Kafka topic. Before publishing the data, ensure that all the data files are available in the expected location; we have uploaded all the input files into “/data/datasource”.


Each input file is piped to the kafka-console-producer script with the broker list and the topic of the corresponding loan type.
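For illustration, the same step can also be done with the Kafka Java producer API. The sketch below reads one tab-delimited input file and publishes each row as a message; the broker address, topic name, and file name are assumptions.

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PublishLoanFile {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader reader = Files.newBufferedReader(Paths.get("/data/datasource/auto_loan.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Each tab-delimited row becomes one Kafka message on the topic.
                producer.send(new ProducerRecord<>("auto_loan", line));
            }
        }
    }
}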

Preparing and Submitting Storm Topology

  • SpoutConfig registers the ZooKeeper ensemble and the Kafka topic.
  • KafkaSpout reads the loan application input data from the given topic.
  • Hive options: because the Hive Streaming API divides a stream into a set of batches, the batch parameters (batch size, transactions per batch, and so on) are configured here.
  • The Storm topology builder wires the spout and the bolts together for the loan application data.
  • Finally, the topology is submitted, as sketched below.

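The sketch below shows the general shape of such a topology using the storm-kafka and storm-hive integrations (package names as in Storm 1.x; older releases use the backtype.storm packages). The ZooKeeper and metastore addresses, topic name, column names, batch settings, and the simplified parsing bolt are assumptions.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class LoanApplicationTopologySketch {

    // Minimal stand-in for the article's LoanApplicationBolt: splits a tab-delimited row into fields.
    public static class LoanApplicationBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String[] parts = input.getString(0).split("\t");
            collector.emit(new Values(parts[0], parts[1], parts[2]));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("application_id", "loan_type", "amount"));
        }
    }

    public static void main(String[] args) throws Exception {
        // SpoutConfig registers the ZooKeeper ensemble and the Kafka topic to consume.
        SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts("localhost:2181"), "auto_loan", "/kafka_storm", "LoanApplicationSpoutId");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // HiveOptions: metastore URI, database, table, tuple-to-column mapping, and batch tuning.
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
                .withColumnFields(new Fields("application_id", "loan_type", "amount"));
        HiveOptions hiveOptions = new HiveOptions(
                "thrift://localhost:9083", "default", "loan_application_transactional_raw", mapper)
                .withTxnsPerBatch(10)
                .withBatchSize(1000)
                .withIdleTimeout(10);

        // Wire spout -> parsing bolt -> HiveBolt and submit the topology.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LoanApplicationData", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("LoanApplicationBolt", new LoanApplicationBolt(), 1)
               .shuffleGrouping("LoanApplicationData");
        builder.setBolt("LoanApplicationOutputBolt", new HiveBolt(hiveOptions), 1)
               .shuffleGrouping("LoanApplicationBolt");

        StormSubmitter.submitTopology("AutoLoanApplicationTopology", new Config(), builder.createTopology());
    }
}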

We are going to execute the topology on the same instance where Storm is installed. The Maven Shade plugin is used to package the artifact as an uber jar that includes its dependencies and to shade (that is, rename) the packages of some of those dependencies. Upload the uber jar and the topology properties file to the working location on that instance.


To execute the topology, the storm jar command is used, passing the uber jar, the topology main class, and the topology properties file as arguments.

The following stats and visualization were obtained from the Auto Loan topology; similar views are available in the Storm UI for all the executed topologies.

Auto Loan Topology Stats:

[Screenshot: Auto Loan topology statistics from the Storm UI]

Auto Loan Topology Visualization:

[Screenshot: Auto Loan topology visualization from the Storm UI]

Loan Application Lineage in Atlas

The image below shows the lineage of the loan_application_transactional_raw table after executing the Home and Auto Loan topologies:

[Screenshot: Atlas lineage of loan_application_transactional_raw after the Home and Auto Loan topologies]

After executing all four topologies, the final Atlas lineage looks similar to the diagram below:

[Screenshot: Atlas lineage after executing all four topologies]

Notes:

  • Green table icon indicates Kafka topics.
  • Blue gear icon indicates Storm topology.
  • Red table icon indicates the Hive table.

Conclusion

After executing each of the four topologies (Home Loan, Auto Loan, Credit Card Loan, and Personal Loan), the number of records in the Hive table was checked with a simple count query; the count grows after each topology run, since rows committed through the streaming API are immediately visible to other Hive clients. A sketch of such a check is shown below.
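A minimal sketch of the verification query through Hive JDBC, with the same assumed connection details as earlier:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CountLoanApplications {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM loan_application_transactional_raw")) {
            if (rs.next()) {
                // Rows committed by the streaming topology are counted immediately.
                System.out.println("Records in table: " + rs.getLong(1));
            }
        }
    }
}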

How do transactions work internally in Hive?

Hive stores table data in base files that cannot be updated in place, because HDFS files are immutable. Instead, Hive creates a set of delta files for each transaction that alters a table or partition and stores them in a separate delta directory. Periodically, Hive compacts, or merges, the base and delta files. Hive performs all compaction in the background without affecting concurrent reads and writes by Hive clients.
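Compactions can also be triggered and inspected manually. The sketch below uses Hive JDBC with the standard ALTER TABLE ... COMPACT and SHOW COMPACTIONS statements; the connection details are assumptions, and the exact SHOW COMPACTIONS columns vary across Hive versions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class InspectCompactions {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement()) {
            // Queue a major compaction to merge the base and delta files of the table.
            stmt.execute("ALTER TABLE loan_application_transactional_raw COMPACT 'major'");

            // List pending and completed compactions.
            try (ResultSet rs = stmt.executeQuery("SHOW COMPACTIONS")) {
                ResultSetMetaData meta = rs.getMetaData();
                while (rs.next()) {
                    StringBuilder row = new StringBuilder();
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        row.append(rs.getString(i)).append('\t');
                    }
                    System.out.println(row);
                }
            }
        }
    }
}

In normal operation, the background compactor handles this automatically, as described above.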
