Streaming Analytics using Kafka SQL

Overview

Kafka SQL (KSQL), the streaming SQL engine for Apache Kafka from Confluent, is used for real-time data integration, data monitoring, and data anomaly detection. In this use case, KSQL is used to read, write, and process Citi Bike trip data in real time, enrich the trip data with station details, and find the number of trips started and ended at a particular station in a day. It is also used to publish the trip data from the source to other destinations for further analysis.

In this blog, let us discuss enriching the Citi Bike trip data and finding the number of trips to and from a particular station on a particular day.

Prerequisites

Install the following:

  • Scala
  • Apache Kafka
  • KSQL
  • JDK

Data Description

The Citi Bike trip dataset for March 2017 is used as the source data. It contains basic details such as trip duration, ride start time, ride end time, station ID, station name, station latitude, and station longitude.

The Citi Bike station dataset is used to enrich the trip details for further analysis after the data is consumed. It contains details such as availableBikes, availableDocks, statusValue, and totalDocks.
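
The trip records described above can be modelled as a Scala case class and parsed from CSV rows. This is a minimal sketch only: the field names, the five-column layout, and the assumption that fields are unquoted are illustrative, and the real trip file has more columns (such as the station names and coordinates).

```scala
// Hypothetical trip record; the field names and column order are assumptions.
final case class Trip(
    tripDuration: Int,   // trip duration in seconds
    startTime: String,   // e.g. "2017-03-01 00:00:32" (assumed format)
    stopTime: String,
    startStationId: Int,
    endStationId: Int
)

object TripCsv {
  // Parses one unquoted CSV row laid out as
  // tripDuration,startTime,stopTime,startStationId,endStationId.
  // Returns None for the header row or any malformed line.
  def parseTrip(line: String): Option[Trip] =
    line.split(",").map(_.trim) match {
      case Array(duration, start, stop, from, to) =>
        try Some(Trip(duration.toInt, start, stop, from.toInt, to.toInt))
        catch { case _: NumberFormatException => None }
      case _ => None
    }
}
```

Because the header row fails to parse, reading a file with `getLines().flatMap(TripCsv.parseTrip)` skips it without special handling.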

Use Case

  • Enrich Citi Bike trip data in real time using joins and aggregations.
  • Find the number of trips to and from a particular station on each day.
  • View trip details enriched with station details, and the aggregate trip count of each station.

Synopsis

  • Produce station details
  • Join stream data and table data
  • Group data
  • Produce trip details
  • View output
    • View trip details with station details
    • View aggregate trip count of each station

Producing Station Details

To produce the station details using Scala, perform the following:

  • Create the trip-details and station-details topics in Kafka using the below commands:

  • Iterate over the station list to produce a JSON file using the below commands:

  • Produce the station data into the station-details topic via the below Scala command:

  • Iterate and produce the station details list in JSON format.
  • Check the produced and consumed station details using the below command:
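
The station-producing steps above can be sketched in plain Scala without a JSON library or a Kafka client. The sketch iterates the station list, serialises each station to JSON, and pairs it with the station ID as the message key. Keying by station ID is an assumption, motivated by the fact that a KSQL table takes its row key from the Kafka message key and the later join looks stations up by ID. The `Station` fields and the helper names are hypothetical.

```scala
// Hypothetical station record; field names follow the dataset description,
// but the exact schema is an assumption.
final case class Station(
    id: Int,
    stationName: String,
    availableBikes: Int,
    availableDocks: Int,
    totalDocks: Int,
    statusValue: String
)

object StationJson {
  // Quotes a string for JSON, escaping quotes, backslashes and newlines
  // (other control characters are not handled in this sketch).
  private def quote(s: String): String =
    s.iterator.map {
      case '"'  => "\\\""
      case '\\' => "\\\\"
      case '\n' => "\\n"
      case c    => c.toString
    }.mkString("\"", "", "\"")

  // Serialises one station as a single-line JSON object.
  def toJson(st: Station): String =
    Seq(
      "\"id\":" + st.id,
      "\"stationName\":" + quote(st.stationName),
      "\"availableBikes\":" + st.availableBikes,
      "\"availableDocks\":" + st.availableDocks,
      "\"totalDocks\":" + st.totalDocks,
      "\"statusValue\":" + quote(st.statusValue)
    ).mkString("{", ",", "}")

  // Iterates the station list and pairs each JSON value with its message key,
  // the station ID, ready to be sent to the station-details topic.
  def keyedRecords(stations: Seq[Station]): Seq[(String, String)] =
    stations.map(st => st.id.toString -> toJson(st))
}
```

With a real Kafka client, each (key, value) pair would become one record sent to the station-details topic.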

Joining Stream Data and Table Data

To join the stream and table data, perform the following:

  • In the KSQL console, create a table for the station details so that it can be joined with the trip details stream, using the below commands:

  • In the KSQL console, create a stream for the trip details to enrich the data with the start station details and to find the daily trip count of each station, using the below commands:

  • Join the stream with the station details table to get fields such as availableBikes, totalDocks, and availableDocks using the station ID as the key.
  • In the SELECT statement, extract the start time (a date-formatted string) as a long timestamp so that only the day can later be taken from it to count the trips started on that day. Use the below commands:

  • Similarly, add the end station details to the trip details in another topic, as was done for the start station.
  • Extract the end time field as a long timestamp using the below commands:

  • Join the streamed trip details with the station details table, because the KSQL version used here does not support joining two streams or two tables.
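
Conceptually, the stream-table join looks up each incoming trip in the current state of the station table by station ID and copies the station fields onto the trip. The sketch below models this in plain Scala, together with converting the start time string to an epoch-millisecond timestamp (the kind of conversion KSQL's `STRINGTOTIMESTAMP` function performs). The record types, the `yyyy-MM-dd HH:mm:ss` pattern, and the UTC zone are assumptions, and the sketch uses inner-join semantics: trips with no matching station are dropped, whereas a LEFT JOIN would keep them with empty station fields.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical records; field names are assumptions.
final case class TripEvent(startStationId: Int, startTime: String)
final case class StationRow(id: Int, availableBikes: Int, availableDocks: Int, totalDocks: Int)
final case class EnrichedTrip(
    startStationId: Int,
    startTime: String,
    startTimestamp: Long, // epoch milliseconds
    availableBikes: Int,
    availableDocks: Int,
    totalDocks: Int
)

object StreamTableJoin {
  private val TimeFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Parses a "yyyy-MM-dd HH:mm:ss" string into epoch millis, assuming UTC.
  def toEpochMillis(time: String): Long =
    LocalDateTime.parse(time, TimeFormat).toInstant(ZoneOffset.UTC).toEpochMilli

  // Builds the table state: a later update for the same station ID
  // overwrites the earlier one, as rows in a table do.
  def tableState(updates: Seq[StationRow]): Map[Int, StationRow] =
    updates.foldLeft(Map.empty[Int, StationRow])((acc, row) => acc + (row.id -> row))

  // Enriches each trip with the station row for its start station ID.
  // Trips whose station is unknown are dropped (inner-join semantics).
  def join(trips: Seq[TripEvent], stations: Map[Int, StationRow]): Seq[EnrichedTrip] =
    trips.flatMap { t =>
      stations.get(t.startStationId).map { s =>
        EnrichedTrip(t.startStationId, t.startTime, toEpochMillis(t.startTime),
          s.availableBikes, s.availableDocks, s.totalDocks)
      }
    }
}
```

The end station enrichment is the same lookup keyed on the end station ID and the end time.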

Grouping Data

To group data based on the station details and the date, perform the following:

  • Format the date as YYYY-MM-DD from the long timestamp so that the start trip details can be grouped by date, using the below commands:

  • Format the date as YYYY-MM-DD from the long timestamp so that the end trip details can be grouped by date, using the below commands:

  • Create tables that group the data by date and station to find the number of trips started and ended at each station on each day, using the below commands:

  • List the topics to check whether topics have been created for the persistent queries.
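
The grouping step can be modelled as formatting each epoch-millisecond timestamp as a day string and counting trips per (station ID, day) key. In this sketch, `counts` gives the final totals, like a GROUP BY with COUNT(*), while `countsAsChangelog` emits an updated count every time a trip arrives, which roughly mirrors how an aggregate table writes updates to its topic (KSQL may batch some of these updates). The UTC zone and all names are assumptions. Note that in Java date patterns the YYYY-MM-DD format is written `yyyy-MM-dd`, because `YYYY` means week-based year and `DD` means day of year.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object DailyTripCounts {
  // "yyyy-MM-dd": calendar year, month, day of month (UTC assumed).
  private val DayFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)

  // Formats an epoch-millisecond timestamp as a YYYY-MM-DD day string.
  def toDay(epochMillis: Long): String =
    DayFormat.format(Instant.ofEpochMilli(epochMillis))

  // Final count of trips per (stationId, day).
  // Each input element is (stationId, epochMillis).
  def counts(trips: Seq[(Int, Long)]): Map[(Int, String), Long] =
    trips.groupBy { case (station, ts) => (station, toDay(ts)) }
      .map { case (key, group) => key -> group.size.toLong }

  // Emits the running count for the affected key after every trip,
  // in arrival order.
  def countsAsChangelog(trips: Seq[(Int, Long)]): Seq[((Int, String), Long)] = {
    val running = scala.collection.mutable.Map.empty[(Int, String), Long]
    trips.map { case (station, ts) =>
      val key = (station, toDay(ts))
      val next = running.getOrElse(key, 0L) + 1
      running(key) = next
      key -> next
    }
  }
}
```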

Producing Trip Details

To produce the trip details into the trip-details topic using Scala, use the below commands:

The console output shows that a total of 727,664 messages are produced to the stream for data enrichment.
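
Producing the trip file follows the same pattern as the station producer: read the CSV line by line, skip the header, send one message per row, and count what is sent. In the sketch below, `send` is a stand-in for a real producer call, the default file path is hypothetical, rows are assumed to be unquoted, and keying each message by the start station ID (assumed to be the fourth column) is an assumption.

```scala
import scala.io.Source

object TripProducerSketch {
  // Sends every data row of a trip CSV through `send` and returns how many
  // messages were sent. `send(key, value)` stands in for a real producer call;
  // the key is the fourth column (assumed to be the start station ID).
  def produceAll(lines: Iterator[String], send: (String, String) => Unit): Long = {
    var sent = 0L
    lines.drop(1)                 // skip the CSV header row
      .filter(_.trim.nonEmpty)    // ignore blank lines
      .foreach { line =>
        val cols = line.split(",")
        val key = if (cols.length > 3) cols(3).trim else ""
        send(key, line)
        sent += 1
      }
    sent
  }

  // Example: read a local file (hypothetical path) and print each message.
  def main(args: Array[String]): Unit = {
    val source = Source.fromFile(args.headOption.getOrElse("trips.csv"))
    try {
      val n = produceAll(source.getLines(), (k, v) => println(s"$k -> $v"))
      println(s"Total messages produced: $n")
    } finally source.close()
  }
}
```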

Viewing Output

Viewing Trip Details with Station Details

To view the trip details with the station details, perform the following:

  • Consume the messages from the CITIBIKE_TRIP_START_STATION_DETAILS topic to view the extra fields added to the trip details from the station details table and the long timestamp fields extracted from the start and end times, using the below commands:

  • Consume the messages from the CITIBIKE_TRIP_END_STATION_DETAILS topic using the below commands:

The console output shows that the station detail fields are added to each trip as the trip details are produced.

Viewing Aggregate Trip Count of Each Station

To view the aggregate trip count of each station based on the date, perform the following:

  • Consume the messages in the console to check the trip counts computed on the stream, using the below commands:

The console output shows that, as messages are produced, an updated trip count for each station and day is added to the topic. Therefore, the consumer can filter this data to keep only the latest trip count for further analysis.

  • Obtain the ended trip count of each station using the below commands:
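
Because every update to a station's daily count is appended to the topic, a consumer that only needs current values can keep the last count seen for each (station, day) key. A minimal sketch, assuming the consumed messages have already been decoded into (stationId, day, count) tuples, which is a hypothetical shape:

```scala
object LatestCounts {
  // Folds count updates, in topic order, into the latest count
  // per (stationId, day); a later update replaces an earlier one.
  def latest(updates: Seq[(Int, String, Long)]): Map[(Int, String), Long] =
    updates.foldLeft(Map.empty[(Int, String), Long]) {
      case (acc, (station, day, count)) => acc + ((station, day) -> count)
    }
}
```

Relying on arrival order is safe only within a partition. It holds here when all updates for a key land in the same partition, which the default Kafka partitioner ensures for messages with the same key.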

Conclusion

In this blog, we discussed adding extra fields from the station details table to the trip data, extracting the date in YYYY-MM-DD format, and grouping the details by station ID and day to get the started and ended trip counts of each station.
