Table of Contents
- 1 Overview
- 2 Pre-requisites
- 3 Use Case
- 4 Hive Streaming with Storm and Kafka Steps
- 5 Loan Application Lineage in Atlas
- 6 Conclusion
- 7 References
Overview
With the release of Hive 0.13.1 and HCatalog, a new Streaming API was introduced to support continuous data ingestion into Hive tables. This API is intended to let streaming clients such as Flume or Storm write data into Hive, whose storage has traditionally been batch oriented.
In our use case, we are going to use Kafka with Storm to load streaming data into a bucketed Hive table. Multiple Kafka topics feed data to Storm, which ingests it into a transactional Hive table. Data committed in a transaction is immediately visible to queries from other Hive clients. Apache Atlas tracks the lineage across the Kafka topics, the Storm topology (spouts and bolts), and the Hive transactional table, which helps us understand how data flows into the Hive table.
Pre-requisites
To use streaming, the following are required:
- Enable ACID support for streaming.
- Store Hive table in ORC format.
- Set tblproperties ("transactional"="true") on the table during creation.
- Bucket the Hive table. If desired, the table may also support partitioning along with the bucket definition.
- Ensure sufficient temporary disk space is available to support compaction operations.
- Provide necessary permissions to users of the client streaming process to write to the table.
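Beyond the table-level requirements, the Hive instance itself must be configured for transactions. A minimal sketch of that configuration is shown below; the values (for example, the number of compactor threads) are illustrative, and these properties are usually set in hive-site.xml rather than per session:

```sql
-- Enable the transaction manager and background compaction
-- (illustrative values; normally configured in hive-site.xml).
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;
```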
Use Case
As a sample use case, loan application mock data is used. In this scenario, the input files are split into 4 different loan types, with row values delimited by tab (‘\t’). All the input files are published to different Kafka topics. In our Storm topology,
- ZooKeeper and Kafka topics are configured in “SpoutConfig”.
- Storm consumes the data using “KafkaSpout”.
- Hive options such as batch size, transaction per batch, and so on are defined using core-storm API.
- Hive output is prepared using “LoanApplicationBolt”.
- Finally, the topology is built by wiring the “KafkaSpout” (named “LoanApplicationData”) to the Hive output bolt (named “LoanApplicationBolt”), and the data is loaded into the Hive transactional table by a “HiveBolt” (named “LoanApplicationOutputBolt”).
Loan Application Storm Topology
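Since each Kafka message is one tab-delimited row, the field splitting that a bolt like “LoanApplicationBolt” performs can be sketched as follows. This is a minimal, self-contained illustration; the actual bolt in the repository may handle the fields differently:

```java
import java.util.Arrays;
import java.util.List;

public class LoanRecordParser {

    // Split one tab-delimited loan application row into its column values.
    // The -1 limit keeps trailing empty fields (e.g. an empty "denied_date").
    public static List<String> parse(String row) {
        return Arrays.asList(row.split("\t", -1));
    }

    public static void main(String[] args) {
        List<String> fields = parse("LA-1001\t36\tJohn\tDoe");
        System.out.println(fields);        // [LA-1001, 36, John, Doe]
        System.out.println(fields.size()); // 4
    }
}
```

The resulting values can then be emitted as a Storm tuple whose fields line up with the columns of the Hive table.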
Hive Streaming with Storm and Kafka Steps
To achieve Hive streaming with Storm and Kafka, the following steps are performed:
- Create Hive transactional table.
- Create Kafka topics.
- Publish data into Kafka topics.
- Prepare and submit Storm topology.
Let us discuss the above steps in detail.
Creating Hive Transactional Table
The Hive table should satisfy the requirements listed in the pre-requisites section above.
CREATE TABLE loan_application_transactional_raw(application_id string, term_months string, first_name string, last_name string,
address string, state string, phone string, type string, origination_date string, loan_decision_type string, decided string,
loan_approved string, denial_reason string, loan_account_status string, approved string, payment_method string, requested_amount string,
funded string, funded_date string, rate string, ecoagroup string, officer_id string, denied_date string)
CLUSTERED BY (loan_decision_type) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
Creating Kafka Topics
In this use case, we have created 4 topics as follows:
./kafka-topics.sh --create --zookeeper ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic HomeLoanApplication
./kafka-topics.sh --create --zookeeper ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic AutoLoanApplication
./kafka-topics.sh --create --zookeeper ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic CreditCardLoanApplication
./kafka-topics.sh --create --zookeeper ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic PersonalLoanApplication
Publishing Data into Kafka Topics
We have used the inbuilt “kafka-console-producer” script to publish the data into the Kafka topics. In this way, each row of an input file is published as a message to the corresponding Kafka topic. Before publishing, ensure that all the data files are available in the expected location; we have uploaded all the input files into “/data/datasource”.
The below commands are used to publish the data into Kafka topics:
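The producer invocations follow the standard “kafka-console-producer” pattern, one per topic. In the sketch below, the broker address and the input file names under “/data/datasource” are placeholders, since the exact names are project specific:

```shell
./kafka-console-producer.sh --broker-list KAFKA_BROKER:9092 \
    --topic HomeLoanApplication < /data/datasource/home_loan_applications.txt
./kafka-console-producer.sh --broker-list KAFKA_BROKER:9092 \
    --topic AutoLoanApplication < /data/datasource/auto_loan_applications.txt
./kafka-console-producer.sh --broker-list KAFKA_BROKER:9092 \
    --topic CreditCardLoanApplication < /data/datasource/credit_card_loan_applications.txt
./kafka-console-producer.sh --broker-list KAFKA_BROKER:9092 \
    --topic PersonalLoanApplication < /data/datasource/personal_loan_applications.txt
```

Each line of a file becomes one Kafka message, matching the one-row-per-message scheme described above.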
Preparing and Submitting Storm Topology
- “SpoutConfig” registers the ZooKeeper connection and the Kafka topic.
- “KafkaSpout” reads the loan application input data from the given topic.
- Hive options: as the Hive Streaming API divides a stream into a set of batches, the batch parameters (batch size, transactions per batch, idle timeout, and so on) are configured through the Hive options.
- A Storm topology builder is used to wire the spout and bolts into the loan application topology.
- Finally, the topology is submitted.
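Putting these steps together, the topology wiring can be sketched as follows. This is an illustrative outline based on the storm-kafka and storm-hive APIs, not runnable on its own (it needs storm-core, storm-kafka, and storm-hive on the classpath plus a running cluster); the hosts, parallelism hints, and batch values are assumptions, and the actual implementation, including “LoanApplicationBolt”, lives in the GitHub repository:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class KafkaTopologySketch {
    public static void main(String[] args) throws Exception {
        // Register ZooKeeper and the Kafka topic in SpoutConfig.
        ZkHosts zkHosts = new ZkHosts("ZOOKEEPER_HOST:2181");
        SpoutConfig spoutConfig = new SpoutConfig(
                zkHosts, "HomeLoanApplication", "/HomeLoanApplication", "loan-app-spout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Map tuple fields to the columns of the transactional table.
        DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
                .withColumnFields(new Fields("application_id", "term_months" /* , ... */));

        // Batch parameters for the Hive Streaming API (illustrative values).
        HiveOptions hiveOptions = new HiveOptions("thrift://METASTORE_HOST:9083",
                "default", "loan_application_transactional_raw", mapper)
                .withTxnsPerBatch(10)
                .withBatchSize(1000)
                .withIdleTimeout(10);

        // Wire spout -> parsing bolt -> Hive bolt.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LoanApplicationData", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("LoanApplicationBolt", new LoanApplicationBolt(), 1)
               .shuffleGrouping("LoanApplicationData");
        builder.setBolt("LoanApplicationOutputBolt", new HiveBolt(hiveOptions), 1)
               .shuffleGrouping("LoanApplicationBolt");

        // Submit the assembled topology to the cluster.
        StormSubmitter.submitTopology("HomeLoanApplicationTopology",
                new Config(), builder.createTopology());
    }
}
```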
We are going to execute the topology on the same instance where Storm is installed. The Maven Shade plugin is used to package the artifact as an uber JAR including its dependencies, and to shade, i.e. rename, the packages of some of those dependencies. Upload the uber JAR and the topology properties files to the cluster (we used “/data/storm_test”).
To execute the topology, the following commands are used:
storm jar kafka-storm-hive-bolt-0.0.1-SNAPSHOT.jar com.storm.topology.KafkaTopology /data/storm_test/topologyHome.properties HomeLoanApplicationTopology
storm jar kafka-storm-hive-bolt-0.0.1-SNAPSHOT.jar com.storm.topology.KafkaTopology /data/storm_test/topologyAuto.properties AutoLoanApplicationTopology
storm jar kafka-storm-hive-bolt-0.0.1-SNAPSHOT.jar com.storm.topology.KafkaTopology /data/storm_test/topologyCredit.properties CreditCardLoanApplicationTopology
storm jar kafka-storm-hive-bolt-0.0.1-SNAPSHOT.jar com.storm.topology.KafkaTopology /data/storm_test/topologyPersonal.properties PersonalLoanApplicationTopology
The following stats and visualization are from the Auto loan topology; similar views are available in the Storm UI for all the executed topologies.
Auto Loan Topology Stats:
Auto Loan Topology Visualization:
Loan Application Lineage in Atlas
The below image shows the loan_application_transactional_raw table lineage after executing the Home and Auto loan topologies:
After executing all the 4 topologies, the final Atlas lineage would look similar to the below diagram:
- Green table icon indicates Kafka topics.
- Blue gear icon indicates Storm topology.
- Red table icon indicates the Hive table.
After executing each topology (Home Loan, Auto Loan, Credit Card Loan, and Personal Loan in turn), the number of records in the Hive table was checked to confirm that the data had been ingested.
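Such a record-count check is a plain query against the transactional table, since rows from committed transaction batches are immediately visible. A query sketch, using the column names from the DDL above:

```sql
SELECT type, COUNT(*) AS num_applications
FROM loan_application_transactional_raw
GROUP BY type;
```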
Conclusion
How do transactions work internally in Hive?
Hive stores table data in base files on HDFS, which does not support in-place updates. Instead, Hive writes a set of delta files for each transaction that alters a table or partition and stores them in a separate delta directory. Periodically, Hive compacts, i.e. merges, the base and delta files. Compaction runs in the background without affecting concurrent reads and writes by other Hive clients.
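The compaction state can be inspected, and a compaction can be requested manually, with standard Hive commands (shown here against our table as an illustration):

```sql
SHOW COMPACTIONS;
ALTER TABLE loan_application_transactional_raw COMPACT 'major';
```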
- Hive Streaming with Storm and Kafka code examples are available on GitHub: https://github.com/treselle-systems/hive_streaming_with_storm_kafka
References
- ACID and Transactions in Hive: https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions
- Streaming Data Ingest: https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest