Data Flow Pipeline using StreamSets

Overview

StreamSets Data Collector is an open-source, lightweight, and powerful engine for streaming data in real time. It provides continuous, enterprise-grade big data ingest infrastructure for routing and processing data in your data streams, and it accelerates time to analysis by bringing transparency and processing to data in motion.

In this blog, let us discuss generating a data flow pipeline using StreamSets.

Pre-requisites

  • Install Java 1.8
  • Install streamsets-datacollector-2.5.1.1

Use Case

Generating a data flow pipeline using StreamSets via JDBC connections.

What we need to do:

  1. Install StreamSets Data Collector
  2. Create JDBC Origin
  3. Create JDBC Lookup
  4. Create Dataflow Pipeline
  5. View Pipeline and Stage Statistics

Installing StreamSets Data Collector

The core software is developed in Java, while the web interface is developed in JavaScript/AngularJS, D3.js, HTML, and CSS; Java is therefore a prerequisite.

To install StreamSets, perform the following:

Installing Java

To install Java, use your platform’s package manager.

Use the whereis java command to check the Java location.

Ensure that the JAVA_HOME variable is set to the Java installation directory.
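A minimal sketch of these commands for a yum-based system is shown below (the package name and the JDK path are assumptions; use the equivalent apt-get package on Debian/Ubuntu):

    # Install Java 1.8 (package name assumed; on Ubuntu: apt-get install openjdk-8-jdk)
    sudo yum install -y java-1.8.0-openjdk-devel

    # Check the Java location
    whereis java

    # Point JAVA_HOME at the JDK directory reported above (example path)
    export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
    export PATH=$JAVA_HOME/bin:$PATH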

Installing StreamSets (from Tarball)

To install StreamSets, perform the following steps (a consolidated sketch of the commands is given after the list):

  • Create a directory as follows:
  • Extract the tar file using the below command:
  • Create a system user and group named sdc using the below commands:
  • Create the /etc/init.d directory (as root) using the below command:
  • Copy /home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype to the /etc/init.d directory and change the ownership of the file to sdc using the below commands:
  • Edit the /etc/init.d/sdc file and set the $SDC_DIST and $SDC_HOME environment variables to the location where the tarball was extracted:
  • Make the sdc file executable using the below command:
  • Create the Data Collector configuration directory at /etc/sdc (as root) using the below command:
  • Copy all files from the etc directory of the extracted tarball into the Data Collector configuration directory that you just created:
  • Change the ownership of the /etc/sdc directory and all files in the directory to sdc:sdc using the below command:
  • Restrict the form-realm.properties file in the /etc/sdc directory to owner-only permissions:
  • Create the Data Collector log directory at /var/log/sdc and change the ownership to sdc:sdc using the below commands:
  • Create the Data Collector data directory at /var/lib/sdc and change the ownership to sdc:sdc (as root) using the below commands:
  • Create Data Collector resources directory at /var/lib/sdc-resources and change the ownership to sdc:sdc using the below commands:
  • Start Data Collector as a service using the below command:
Note: If you get an error such as “sdc has died”, check the open-file limit configured for the current user and raise the session limit if needed (see the sketch after this list).
  • Access the Data Collector console by entering the Data Collector URL in the address bar of a browser.
Note: The default username and password are both “admin”.
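The individual commands are not reproduced above; the following is a consolidated sketch of the steps in this list, assuming the tarball was extracted to /home/streamsets/streamsets-datacollector-2.5.1.1 and the commands are run as root. The tarball filename and the recommended open-file limit are assumptions drawn from the StreamSets documentation; adjust paths for your environment.

    # Extract the tarball (filename assumed) into /home/streamsets
    mkdir -p /home/streamsets
    tar xzf streamsets-datacollector-2.5.1.1.tgz -C /home/streamsets

    # Create the sdc system user and group
    groupadd -r sdc
    useradd -r -g sdc sdc

    # Install the init script and point it at the extracted tarball
    mkdir -p /etc/init.d
    cp /home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype /etc/init.d/sdc
    chown sdc:sdc /etc/init.d/sdc
    # Edit /etc/init.d/sdc and set, for example:
    #   SDC_DIST="/home/streamsets/streamsets-datacollector-2.5.1.1"
    #   SDC_HOME="/home/streamsets/streamsets-datacollector-2.5.1.1"
    chmod 755 /etc/init.d/sdc

    # Configuration, log, data, and resources directories
    mkdir /etc/sdc
    cp -R /home/streamsets/streamsets-datacollector-2.5.1.1/etc/* /etc/sdc
    chown -R sdc:sdc /etc/sdc
    chmod go-rwx /etc/sdc/form-realm.properties
    mkdir /var/log/sdc && chown sdc:sdc /var/log/sdc
    mkdir /var/lib/sdc && chown sdc:sdc /var/lib/sdc
    mkdir /var/lib/sdc-resources && chown sdc:sdc /var/lib/sdc-resources

    # Start Data Collector as a service
    service sdc start

    # If sdc dies on startup, check the open-file limit for the current user
    ulimit -n
    # and raise it (StreamSets recommends 32768) in /etc/security/limits.conf, e.g.:
    #   sdc  soft  nofile  32768
    #   sdc  hard  nofile  32768

Once the service is running, the Data Collector console is reachable at http://<host>:18630 by default.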

Creating JDBC Origin

To create the JDBC origin, perform the following steps:

  • Click Create New Pipeline to create a pipeline.
  • Add a title for the pipeline as shown in the below diagram:

[Figure 1]

Note: In this analysis, the “JDBC Query Consumer” origin is used.

  • Download the JDBC origin from the Package Manager as shown in the below diagram:

[Figure 2]

Note: You can also import the package manually using the below command:
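The manual command is not reproduced in the original; for a tarball installation, the JDBC stage library can typically be installed with the Data Collector command-line interface (the library ID below is assumed to be the standard JDBC stage library name for this version):

    # Run from the Data Collector installation directory ($SDC_DIST), then restart sdc
    bin/streamsets stagelibs -install=streamsets-datacollector-jdbc-lib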

  • Add configurations to JDBC Query Consumer origin.
  • Uncheck “Incremental Mode” in the configuration; by default, the Query Consumer runs in incremental mode and expects the query to contain WHERE and ORDER BY clauses on an offset column, as shown in the below diagram:

[Figure 3]

  • Add WHERE and ORDER BY clauses using the offset value.

[Figure 4]
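The query itself appears only in the figure; as an illustration, an incremental-mode query using the offset value could look like the following, where the applicant table and the applicantId offset column are assumptions:

    SELECT * FROM applicant
    WHERE applicantId > ${OFFSET}
    ORDER BY applicantId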

  • Click Validate to check the connection.

Note: If you are unable to connect to the JDBC Query Consumer, move “mysql-connector-java-5.1.27-bin.jar” to the below path:
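The target path is not given in the original; as an assumption, one common location for a tarball installation is the lib directory of the JDBC stage library (an external-library directory configured in sdc-env.sh also works):

    # Path is an assumption for a tarball install; restart Data Collector afterwards
    cp mysql-connector-java-5.1.27-bin.jar \
       $SDC_DIST/streamsets-libs/streamsets-datacollector-jdbc-lib/lib/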

Creating JDBC Lookup

To create a JDBC lookup, lookup columns are required from both the source table and the lookup table.
For example, use the ‘applicantId’ field in the “applicant” (source) table to look up the ‘applicantId’ column in the “application” (lookup) table using the below query:
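The original query is not reproduced; a minimal sketch matching this description, using the JDBC Lookup record expression syntax, is:

    SELECT * FROM application
    WHERE applicantId = '${record:value('/applicantId')}'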

The query uses the value of the “applicantId” column from the applicant (source) table. In this example, three tables are used in the lookups.

[Figure 5]

The result of the above JDBC lookup is given as input to the next lookup, on the “loan_raw” table, using the below query:
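Again, the original query is not shown; a sketch of the second lookup, where the applicationId join column is an assumption, is:

    SELECT * FROM loan_raw
    WHERE applicationId = '${record:value('/applicationId')}'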

Creating Dataflow Pipeline

Different processors are used to create the dataflow pipeline.

Field Remover

It discards unnecessary fields in the pipeline.

[Figure 8]

Expression Evaluator

It performs calculations and writes the results to new or existing fields. It is also used to add or modify record header attributes and field attributes.

[Figure 9]
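As an illustration only (the field names are hypothetical), an Expression Evaluator entry maps an output field to an expression, for example deriving a monthly value from an annual one:

    Output Field:     /monthlyIncome
    Field Expression: ${record:value('/annualIncome') / 12}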

Stream Selector

It passes data to streams based on conditions; records that match none of the user-defined conditions are passed to a default stream. A condition can be defined for each stream of data.

[Figure 7]
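For example (the field name and value are hypothetical), one condition can route approved records to a separate stream while all other records fall through to the default stream:

    Condition 1:    ${record:value('/status') == 'APPROVED'}
    Default stream: records that match no condition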

A Local FS destination is used to store the resultant data.

[Figure 10]

[Figure 11]

The full data flow pipeline is as follows:

[Figure 12]

Viewing Pipeline and Stage Statistics

A pipeline can be monitored while running it. Real-time summary and error statistics can be viewed for the pipeline and for the stages in the pipeline. By default, the Data Collector console displays pipeline monitoring information while running the pipeline. Any stage can be selected to view its statistics. Similarly, error information for the pipeline and its stages can be viewed.

Previewing Dataflow Pipeline

In the Data Collector pipeline, clicking Preview shows the input and output data at each stage.

[Figure 13]

Viewing Pipeline States

The pipeline state is the current condition of the pipeline, such as “Running” or “Stopped”. The pipeline state is displayed in the All Pipelines list and in the Data Collector log.

[Figure: Viewing pipeline states]

Viewing Pipeline Statistics

Record count, record and batch throughput, batch processing statistics, and heap memory usage are displayed for the pipeline as shown in the below diagram:

[Figure 14]

For a pipeline started with runtime parameters, the values of the parameters currently in use are displayed, as shown in the below diagram:

[Figure 15]

Viewing Stage Statistics

Record and batch throughput, batch processing statistics, and heap memory usage are displayed for a stage as shown in the below diagram:

[Figure 16]

[Figure 17]

Conclusion

In this blog, we discussed configuring the JDBC Query Consumer, performing JDBC lookups across more than one table, creating a dataflow pipeline, and monitoring stage and pipeline statistics.
