Table of Contents
- 1 Overview
- 2 Pre-requisites
- 3 Data Description
- 4 Use Case
- 5 Self-Service Data Ingest, Data Cleansing, and Data Validation
- 6 Conclusion
- 7 References
Overview
Kylo, a feature-rich data lake platform, is built on Apache Hadoop and Apache Spark. It provides a business-friendly data lake solution that enables self-service data ingestion, data wrangling, data profiling, data validation, data cleansing/standardization, and data discovery. Its intuitive user interface lets IT professionals work with the data lake without writing code.
Many tools ingest either batch data or streaming (real-time) data; Kylo supports both. It provides a plug-in architecture with a variety of extensions, and its Apache NiFi templates offer considerable flexibility for batch and streaming use cases. In this blog post, let us discuss ingesting data from Apache Kafka, cleansing and validating it in real time, and persisting it into an Apache Hive table.
Pre-requisites
- Install Kafka.
- Deploy Kylo. The deployment requires knowledge of the following components and technologies:
  - AngularJS for the Kylo UI
  - Apache Spark for data wrangling, data profiling, data validation, data cleansing, and schema detection
  - JBoss ModeShape and MySQL for the Kylo Metadata Server
  - Apache NiFi for pipeline orchestration
  - Apache ActiveMQ for interprocess communication
  - Elasticsearch for search-based data discovery
  - Hadoop technologies in general, preferably HDFS, YARN, and Hive
To learn the basics of Kylo and how to install it on an AWS EC2 instance, refer to our previous blog post, Kylo Setup for Data Lake Management.
Data Description
A user transaction dataset with 68K rows, generated by the Treselle team, is used as the source file. The input dataset has the columns time, uuid, user, business, address, amount, and disputed.
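To make the column list concrete, here is a minimal standard-library Python sketch for scanning such a CSV file for missing and malformed values before publishing it. It assumes the file has a header row with the column names above, that time uses a `YYYY-MM-DD HH:MM:SS` format, and that amount is a plain decimal number; both formats are hypothetical, as are the function names `find_problems` and `scan`.

```python
import csv
import io
from datetime import datetime

# Column names from the dataset description; the formats checked below are assumptions.
COLUMNS = ["time", "uuid", "user", "business", "address", "amount", "disputed"]
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # hypothetical timestamp format


def find_problems(row):
    """Return a list of issues (missing or malformed values) in one row."""
    problems = [f"missing {col}" for col in COLUMNS if not (row.get(col) or "").strip()]
    time_value = (row.get("time") or "").strip()
    if time_value:
        try:
            datetime.strptime(time_value, TIME_FORMAT)
        except ValueError:
            problems.append("invalid time")
    amount_value = (row.get("amount") or "").strip()
    if amount_value:
        try:
            float(amount_value)
        except ValueError:
            problems.append("invalid amount")
    return problems


def scan(csv_text):
    """Yield (line_number, problems) for each data row with at least one issue."""
    reader = csv.DictReader(io.StringIO(csv_text))
    # Line 1 is the header; this numbering assumes no quoted multi-line fields.
    for line_no, row in enumerate(reader, start=2):
        problems = find_problems(row)
        if problems:
            yield line_no, problems
```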
Examples of invalid and missing values in the dataset:
Use Case
- Publish the user transaction dataset into Kafka.
- Ingest data from Kafka using the Kylo data ingestion template, and standardize and validate it:
  - Customize the data ingest pipeline template
  - Define categories for feeds
  - Define feeds with source and destination
  - Cleanse and validate data
  - Schedule feeds
  - Monitor feeds
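The first use-case step, publishing the dataset into Kafka, can be sketched as follows. This minimal standard-library sketch only prepares the messages: each CSV row becomes a JSON value keyed by its uuid (the keying choice and the function name `to_kafka_messages` are assumptions, not part of Kylo). The commented lines show how the pairs could be sent with the kafka-python client, assuming a broker at `localhost:9092` and the `transactionRawTopic` topic used in the feed configuration.

```python
import csv
import io
import json


def to_kafka_messages(csv_text, key_field="uuid"):
    """Turn each CSV row into a (key, value) pair of UTF-8 bytes.

    The value is the row serialized as a JSON object. Keying by uuid (an
    assumption) sends all messages with the same key to the same partition.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    messages = []
    for row in reader:
        key = (row.get(key_field) or "").encode("utf-8")
        value = json.dumps(row).encode("utf-8")
        messages.append((key, value))
    return messages


# With kafka-python installed, the pairs could then be published like this:
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="localhost:9092")
#   for key, value in to_kafka_messages(text):
#       producer.send("transactionRawTopic", key=key, value=value)
#   producer.flush()
```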
Self-Service Data Ingest, Data Cleansing, and Data Validation
Kylo utilizes Spark to provide a pre-defined pipeline template that implements multiple best practices around data ingestion. By default, the template ingests from file systems and databases. It also helps business users configure ingestion from new sources such as JMS, Kafka, HDFS, HBase, FTP, SFTP, REST, HTTP, TCP, IMAP, AMQP, POP3, MQTT, WebSocket, Flume, Elasticsearch and Solr, Microsoft Azure Event Hub, Microsoft Exchange using Exchange Web Services (EWS), Couchbase, MongoDB, Amazon S3, SQS, DynamoDB, and Splunk.
Apache NiFi, a scheduler and orchestration engine, provides an integrated framework for designing new types of pipelines with 250+ processors (data connectors and transforms). Here, the pre-defined data ingest template is modified to add Kafka, S3, HDFS, and FTP sources, as shown in the screenshot below:
Processors whose names start with Get, Consume, and Fetch are used to ingest the data. The Get and Consume versions of the Kafka processors in NiFi are as follows:
GetKafka 1.3.0: Fetches messages from earlier versions of Apache Kafka (specifically 0.8.x). The complementary NiFi processor for sending messages is PutKafka.
ConsumeKafka_0_10 1.3.0: Consumes messages from newer versions of Apache Kafka; it is built specifically against the Kafka 0.10.x Consumer API.
If needed, a custom processor or other NiFi extension can be written, packaged as a NAR file, and deployed into NiFi.
Customizing Data Ingest Pipeline Template
After the data ingest template is updated and saved in NiFi, the same template can be customized in the Kylo UI. The customization steps are:
- Customizing feed destination table
- Adding input properties
- Adding additional properties
- Performing access control
- Registering the template
Defining Categories for Feeds
Every feed created in Kylo must belong to a category, and the feeds are executed by launching their process group in NiFi. Here, the “Transaction raw data” category is created to group the feeds.
Defining Feeds with Source and Destination
The Kylo UI is self-explanatory for creating and scheduling feeds. To define a feed, perform the following steps:
- Choose the data ingest template.
- Provide the feed name, category, and description.
- Choose the input data source to ingest data from.
- Customize the configuration parameters related to that source.
  For example, the Kafka topic “transactionRawTopic” and a batch size of “10000”.
- Define the output feed table using either of the following methods:
  - Manually define the table columns and their data types.
  - Upload a sample file and adjust each column's data type to match its data.
- Preview the data in the Feed Details section in the top-right corner.
- Define partitioning for the output table by choosing a Source Field and a Partition Formula.
  For example, choose “time” as the source field and “year” as the partition formula to partition the data by year.
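The feed settings above (a batch size of 10000 and the “year” partition formula on the “time” field) can be illustrated with the following minimal sketch. It is not how Kylo or Hive implement partitioning; it only shows the idea of splitting a stream into fixed-size batches and grouping each batch by the year of its time value. The `year=<value>` keys mimic Hive's `column=value` partition directory naming; the timestamp format and function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # hypothetical timestamp format


def batches(records, batch_size=10000):
    """Split an iterable of records into lists of at most batch_size items."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield batch


def partition_by_year(batch, source_field="time"):
    """Group records by the year of source_field, keyed like a Hive partition directory."""
    partitions = defaultdict(list)
    for record in batch:
        year = datetime.strptime(record[source_field], TIME_FORMAT).year
        partitions[f"year={year}"].append(record)
    return dict(partitions)
```

For example, with the default batch size, a stream of 68,000 records would be split into seven batches, the last holding 8,000 records, and each batch would then be grouped by year.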
Cleansing and Validating Data
The feed creation wizard lets end users configure cleansing and standardization functions that convert data into conventional or canonical formats (for example, simple data type conversions such as dates, or stripping special characters) or that protect data (for example, masking credit card numbers and other PII).
The wizard also lets users define field-level validation to protect data against quality issues, and it performs schema validation automatically. Kylo provides predefined standardization and validation rules for different data types, plus an extensible Java API for developing custom validation, cleansing, and standardization routines as needed.
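As a rough Python analogue of these rules (Kylo's real extension point is its Java API, and the functions below are hypothetical, not Kylo's built-in rules), the following sketch defines standardizers of the kinds mentioned above (date conversion, stripping special characters, masking card numbers) and two validators, then applies them per field. Each row is standardized first and validated second; rows that fail any rule are set aside together with the names of the failed rules, similar in spirit to keeping valid and invalid records apart. The input date format is an assumption.

```python
import re
from datetime import datetime
from typing import Callable, Dict, List, Tuple

Record = Dict[str, str]
Rule = Callable[[str], str]    # standardizer: value in, cleaned value out
Check = Callable[[str], bool]  # validator: True if the value is acceptable


def standardize_date(value: str) -> str:
    """Convert a hypothetical MM/DD/YYYY HH:MM input to YYYY-MM-DD HH:MM:SS."""
    parsed = datetime.strptime(value.strip(), "%m/%d/%Y %H:%M")
    return parsed.strftime("%Y-%m-%d %H:%M:%S")


def strip_special_characters(value: str) -> str:
    """Keep letters, digits, and spaces, then collapse runs of spaces."""
    return re.sub(r" +", " ", re.sub(r"[^A-Za-z0-9 ]", "", value)).strip()


def mask_credit_card(value: str, visible: int = 4) -> str:
    """Replace every digit except the last `visible` ones with '*'."""
    total = sum(ch.isdigit() for ch in value)
    seen, out = 0, []
    for ch in value:
        if ch.isdigit():
            seen += 1
            out.append(ch if seen > total - visible else "*")
        else:
            out.append(ch)  # keep separators such as '-'
    return "".join(out)


def not_empty(value: str) -> bool:
    return bool(value.strip())


def is_decimal(value: str) -> bool:
    try:
        float(value)
        return True
    except ValueError:
        return False


def apply_policies(
    records: List[Record], rules: Dict[str, List[Rule]], checks: Dict[str, List[Check]]
) -> Tuple[List[Record], List[Tuple[Record, List[str]]]]:
    """Standardize configured fields, then validate them; split rows into valid/invalid."""
    valid, invalid = [], []
    for record in records:
        cleaned, failures = dict(record), []
        for name, funcs in rules.items():
            for func in funcs:
                try:
                    cleaned[name] = func(cleaned.get(name, ""))
                except ValueError:  # e.g. a date that does not match the expected format
                    failures.append(f"{name}:{func.__name__}")
                    break
        failures += [
            f"{name}:{check.__name__}"
            for name, fns in checks.items()
            for check in fns
            if not check(cleaned.get(name, ""))
        ]
        if failures:
            invalid.append((cleaned, failures))
        else:
            valid.append(cleaned)
    return valid, invalid
```

For example, with rules `{"user": [str.strip, str.lower]}` and checks `{"user": [not_empty], "amount": [is_decimal]}`, a row with user `"  Alice "` and amount `"12.5"` ends up valid with user `"alice"`, while a row with a blank user and amount `"n/a"` is reported as invalid with `user:not_empty` and `amount:is_decimal`.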
To clean and validate data, perform the following:
- Apply pre-defined standardization rules to the time, user, address, and amount columns, as shown below:
- Apply standardization and validation to the various columns, as shown in the screenshot below:
- Define the data ingestion merge strategy for the output table.
  - Choose “Dedupe and merge” to drop duplicate records from the batch and insert the remaining records into the desired output table.
- Use the Target Format section to define data storage and compression options.
  - Supported storage formats: ORC, Parquet, Avro, TextFile, and RCFile
  - Compression options: Snappy and Zlib
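The “Dedupe and merge” strategy can be pictured with the minimal in-memory sketch below. It assumes a row counts as a duplicate when every column matches a row already in the target table or a row seen earlier in the same batch; Kylo performs the actual merge in Hive, and its exact duplicate semantics may differ. The function names are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Record = Dict[str, str]


def row_key(record: Record) -> Tuple[Tuple[str, str], ...]:
    """Identity of a row: all column/value pairs in a stable (sorted) order."""
    return tuple(sorted(record.items()))


def dedupe_and_merge(target: List[Record], batch: Iterable[Record]) -> int:
    """Append batch rows not already in target or earlier in the batch.

    Mutates target in place and returns the number of rows inserted.
    """
    seen = {row_key(r) for r in target}
    inserted = 0
    for record in batch:
        key = row_key(record)
        if key in seen:
            continue  # duplicate within the batch or already in the target
        seen.add(key)
        target.append(record)
        inserted += 1
    return inserted
```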
Scheduling Feeds
Feeds can be scheduled with either a cron-based or a timer-based mechanism. Select the “Enable Feed immediately” option to enable a feed right away instead of waiting for the cron or timer criteria to be met.
Once feeds are scheduled, the actual execution is performed in NiFi. Feed status can be edited and monitored, and feed details can be changed and the feeds rescheduled at any time.
Monitoring Feeds
An overview of feed job status is available under Jobs in the Operations section. Drill down into a job to see its details and to debug feed job execution failures.
The Job Activity section shows the status (completed, running, and so on) of a specific feed's recurring activity.
The Operational Job Statistics section shows statistics for a specific job, such as success rate, flow rate per second, flow duration, and step duration.
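Kylo computes these statistics itself; the sketch below only shows what they could mean under assumed definitions: success rate as the share of successful runs, flow rate as runs per second over the observed time window, and flow and step durations as averages. The `JobRun` record and `job_statistics` function are hypothetical.

```python
from dataclasses import dataclass, field
from statistics import mean
from typing import Dict, List


@dataclass
class JobRun:
    """One execution of a feed job (hypothetical shape for illustration)."""
    start: float       # epoch seconds
    end: float         # epoch seconds
    succeeded: bool
    steps: Dict[str, float] = field(default_factory=dict)  # step name -> seconds


def job_statistics(runs: List[JobRun]) -> Dict[str, object]:
    """Summarize runs with the assumed metric definitions described above."""
    if not runs:
        return {"success_rate": 0.0, "flow_rate_per_sec": 0.0,
                "avg_flow_duration": 0.0, "avg_step_duration": {}}
    window = max(r.end for r in runs) - min(r.start for r in runs)
    step_times: Dict[str, List[float]] = {}
    for run in runs:
        for name, secs in run.steps.items():
            step_times.setdefault(name, []).append(secs)
    return {
        "success_rate": sum(r.succeeded for r in runs) / len(runs),
        "flow_rate_per_sec": len(runs) / window if window > 0 else 0.0,
        "avg_flow_duration": mean(r.end - r.start for r in runs),
        "avg_step_duration": {name: mean(t) for name, t in step_times.items()},
    }
```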
Conclusion
In this blog post, we discussed ingesting, cleansing, and validating data in the Kylo data lake platform without any coding. The data ingested from Kafka, as it appears in the Hive table viewed through Ambari, is shown below:
In our next blog post, “Kylo – Automatic Data Profiling and Search-based Data Discovery”, let us discuss data profiling and search-based data discovery.
References
- Kylo Setup for Data Lake Management
- Kylo Features
- Kylo FAQ
- NiFi Custom Processor
- Custom Standardizer and Validator