Custom Partitioning and Analysis using Kafka SQL Windowing

Overview

By default, Apache Kafka produces messages to multiple partitions in a round-robin fashion. The custom partitioning technique is used to produce a particular type of message to a defined partition and to have those messages consumed by a particular consumer, giving us control over where the produced messages land. Windowing allows event-time-driven analysis and grouping of data within time limits. The three different types of windowing are Tumbling, Session, and Hopping.

In this blog, we will discuss processing Citi Bike trip data in the following ways:

  • Partitioning trip data based on user type using the custom partitioning technique.
  • Analyzing trip details as a stream using Kafka SQL Windowing.

Prerequisites

Install the following:

  • Scala
  • Java
  • Kafka
  • Confluent
  • KSQL

Data Description

The Citi Bike trip dataset of March 2017 is used as the source data. It contains basic details such as trip duration, start time, stop time, station names, station IDs, station latitudes and longitudes, and the user type (Subscriber or Customer).

Sample Dataset

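For reference, the columns in the Citi Bike trip CSV are roughly as follows (the exact header names in the March 2017 file may differ slightly):

    tripduration, starttime, stoptime, start station id, start station name,
    start station latitude, start station longitude, end station id,
    end station name, end station latitude, end station longitude,
    bikeid, usertype, birth year, gender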

Use Case

  • Process Citi Bike trip data into two different brokers by partitioning the messages according to user type (Subscriber or Customer).
  • Use Kafka SQL Windowing concepts to analyze the following details:
    • Number of trips started within particular time limits, using the Tumbling Window.
    • Number of trips started within advancing, overlapping time intervals, using the Hopping Window.
    • Number of trips started within session intervals, using the Session Window.

Synopsis

  • Set up Kafka cluster
  • Produce and consume trip details using custom partitioning
  • Create trip data stream
  • Perform streaming analytics using Window Tumbling
  • Perform streaming analytics using Window Session
  • Perform streaming analytics using Window Hopping

Setting Up Kafka Cluster

To set up the cluster on a single server by changing the ports of the brokers in the cluster, perform the following steps:

  • Run ZooKeeper on default port 2181.
    The ZooKeeper data will be stored by default in /tmp/data.
  • Change the default path (/tmp/data) to another path with enough space for non-disrupted producing and consuming.
  • Edit the ZooKeeper configuration in the zookeeper.properties file available under the Confluent base path (etc/kafka/zookeeper.properties), as sketched below:

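A minimal sketch of the edited configuration; the replacement data directory is an assumed example path:

    # etc/kafka/zookeeper.properties
    # dataDir changed from the default /tmp/data to a path with more space
    dataDir=/home/user/zookeeper-data
    # ZooKeeper listens on the default client port
    clientPort=2181
    maxClientCnxns=0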

  • Start ZooKeeper using the command shown below. Once started, you can see the ZooKeeper startup log, and ZooKeeper listens on port 2181.

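With the Confluent distribution, ZooKeeper can be started from the Confluent base path roughly as follows (the script location may vary by version):

    ./bin/zookeeper-server-start etc/kafka/zookeeper.properties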

  • Start the 1st broker in the cluster by running the default Kafka broker on port 9092 with broker ID 0.
    The default log path is /tmp/kafka-logs.
  • Edit the default log path (/tmp/kafka-logs) for the 1st broker in the server.properties file available under the Confluent base path:
    vi etc/kafka/server.properties

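A minimal sketch of the relevant entries for the 1st broker; the log directory is an assumed example path:

    # etc/kafka/server.properties
    broker.id=0
    listeners=PLAINTEXT://localhost:9092
    # log path changed from the default /tmp/kafka-logs
    log.dirs=/home/user/kafka-logs
    zookeeper.connect=localhost:2181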

  • Start the broker using the command shown below. The 1st broker starts with broker ID 0 on port 9092.

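With the Confluent distribution, the broker can be started roughly as follows:

    ./bin/kafka-server-start etc/kafka/server.properties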

  • Configure the 2nd broker in the cluster by copying server.properties to server1.properties under etc/kafka/.
  • Edit server1.properties:
    vi etc/kafka/server1.properties

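A minimal sketch of the entries that differ for the 2nd broker; the log directory is again an assumed example path:

    # etc/kafka/server1.properties
    broker.id=1
    listeners=PLAINTEXT://localhost:9093
    log.dirs=/home/user/kafka-logs-1
    zookeeper.connect=localhost:2181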

  • Start the broker using the command shown below. The 2nd broker starts with broker ID 1 on port 9093.

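The 2nd broker is started with its own properties file:

    ./bin/kafka-server-start etc/kafka/server1.properties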

  • List the brokers available in the cluster using the command shown below.

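One way to list the registered brokers is through the ZooKeeper shell (a sketch):

    ./bin/zookeeper-shell localhost:2181 ls /brokers/ids
    # the output should end with the broker IDs: [0, 1]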

In the above case, the two brokers are started on the same node. If the brokers run on different nodes, parallel message processing can be faster, and memory issues with large numbers of produced messages can be avoided by sharing the messages across the memory of different nodes.

Producing and Consuming Trip Details Using Custom Partitioning

To produce and consume trip details using custom partitioning, perform the following steps:

  • Create topic trip-data with two partitions using the following command:
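A sketch of the topic creation command, assuming a replication factor of 1:

    ./bin/kafka-topics --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 2 --topic trip-data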

  • Describe the topic using the command shown below to view the leaders of the partitions created.

You can see that broker 0 is responsible for partition 0 and broker 1 for partition 1 for message transfer, as the describe output confirms.

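The topic can be described as follows:

    ./bin/kafka-topics --describe --zookeeper localhost:2181 --topic trip-data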

  • Use the custom partitioner technique to produce messages.
  • Create a CustomPartitioner class implementing Kafka's Partitioner interface, as sketched below:
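A minimal sketch of such a partitioner in Scala, assuming the user type (Subscriber or Customer) is sent as the message key; the package name is illustrative:

    package com.example.kafka

    import java.util
    import org.apache.kafka.clients.producer.Partitioner
    import org.apache.kafka.common.Cluster

    class CustomPartitioner extends Partitioner {

      // Route Subscriber trips to partition 0 and Customer trips to partition 1,
      // based on the user type carried in the message key.
      override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                             value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
        if (key != null && key.toString.equalsIgnoreCase("Subscriber")) 0 else 1
      }

      override def close(): Unit = {}

      override def configure(configs: util.Map[String, _]): Unit = {}
    }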

With this partitioner, Subscriber user type messages are produced into partition 0 and Customer user type messages into partition 1.

  • Define the CustomPartitioner class in producer properties as shown below:
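A sketch of the producer properties with the partitioner registered (the broker list matches the cluster configured above):

    val props = new java.util.Properties()
    props.put("bootstrap.servers", "localhost:9092,localhost:9093")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // plug in the custom partitioner so the user type decides the partition
    props.put("partitioner.class", "com.example.kafka.CustomPartitioner")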
  • Assign a specific partition of the topic to each consumer, as sketched below.
  • Pass the partition number as an input argument to the consumer when running multiple consumers, so that each consumer listens to a different partition.
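A sketch of assigning a single partition to a consumer, assuming consumer is a KafkaConsumer[String, String] built from similar properties:

    import java.util.Collections
    import org.apache.kafka.common.TopicPartition

    // args(0) carries the partition number passed when starting the consumer
    val partition = new TopicPartition("trip-data", args(0).toInt)
    consumer.assign(Collections.singletonList(partition))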
  • Start multiple consumers with different partitions.
  • Start Consumer1 and Consumer2 using the commands shown below:
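Assuming the consumer application is packaged into a jar and takes the partition number as its first argument (class and jar names are hypothetical):

    # Consumer1 listens to partition 0 (Subscriber messages)
    java -cp trip-analysis.jar com.example.kafka.TripDataConsumer 0
    # Consumer2 listens to partition 1 (Customer messages)
    java -cp trip-analysis.jar com.example.kafka.TripDataConsumer 1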
  • Produce the trip details by defining the custom partitioner using the below command:
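Assuming the producer application reads the trip CSV and sends each record with its user type as the key (names are hypothetical):

    java -cp trip-analysis.jar com.example.kafka.TripDataProducer 201703-citibike-tripdata.csv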
You can view consumer 1 consuming only Subscriber messages from partition 0 and consumer 2 consuming only Customer messages from partition 1.


  • Check the memory occupied by the brokers after all the messages have been consumed in both consumers.
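The log directory sizes can be compared with a command such as the following (the paths match the assumed log.dirs values above):

    du -sh /home/user/kafka-logs /home/user/kafka-logs-1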

How the messages were shared between the brokers can be seen from the size of each broker's log directory. Here, the Subscriber messages are stored by the broker at localhost:9092 (partition 0) and the Customer messages by the broker at localhost:9093 (partition 1). As there are fewer Customer messages, less space is occupied in the log directory of localhost:9093.

Creating Trip Data Stream

In KSQL, there is no option to consume messages from specific partitions; messages are consumed from all the partitions of the given topic when a stream or table is created.

To create trip data stream, perform the following steps:

  • Separate the Subscriber and Customer data using a WHERE condition for window processing.
  • Create trip_data_stream with the columns of the produced trip data (first statement below).
  • Extract the Unix timestamp from the trip start times for windowing.
  • Set the extracted start-time timestamp as the TIMESTAMP property of the stream, so that windowing uses the trip start times instead of the times at which the messages were produced.
  • Create the stream with the extracted Unix timestamp and only the Subscriber messages, for finding the trip details of subscribers (second statement below).
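The two statements below sketch these steps; the column list is abridged, and the topic name, value format, and timestamp format string are assumptions based on the steps above:

    -- stream over the raw trip-data topic (CSV messages)
    CREATE STREAM trip_data_stream (
        tripduration VARCHAR, starttime VARCHAR, stoptime VARCHAR,
        start_station_name VARCHAR, usertype VARCHAR)
      WITH (KAFKA_TOPIC='trip-data', VALUE_FORMAT='DELIMITED');

    -- Subscriber-only stream whose event time is the extracted trip start time
    CREATE STREAM subscriber_trip_data
      WITH (TIMESTAMP='trip_start_time') AS
      SELECT STRINGTOTIMESTAMP(starttime, 'yyyy-MM-dd HH:mm:ss') AS trip_start_time,
             tripduration, starttime, start_station_name, usertype
      FROM trip_data_stream
      WHERE usertype = 'Subscriber';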

Performing Streaming Analytics Using Window Tumbling

Window tumbling groups the data in a given interval into non-overlapping, fixed-size windows. It can be used for anomaly detection over a stream within a certain time interval. For example, consider tumbling with a time interval of 5 minutes.


To find the number of trips started by subscribers at 5-minute intervals, execute the following command:

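A sketch of the tumbling-window query, grouping on the constant usertype column so that all Subscriber trips fall into one count per window:

    SELECT usertype, COUNT(*)
    FROM subscriber_trip_data
    WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY usertype;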

From the above result, it is evident that 19 trips were started in the window ending at the 4th minute, 25 trips in the window ending at the 9th minute, and 26 trips in the window ending at the 14th minute. Thus, the started trips are counted in each given time interval.

Performing Streaming Analytics Using Window Session

In a window session, data is grouped into sessions of activity. For example, if a session interval of 1 minute is set and no data arrives within 1 minute, a new session is started for grouping the data. Consider a 1-minute session working as follows:


To group the subscribers' trip start details into sessions, set the session interval to 20 seconds and execute the below command:

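A sketch of the session-window query with a 20-second inactivity gap:

    SELECT usertype, COUNT(*)
    FROM subscriber_trip_data
    WINDOW SESSION (20 SECONDS)
    GROUP BY usertype;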

From the above result, it is evident that the data is grouped per session: when no data arrives within a 20-second interval, a new session is started.

For example, consider the time range between 00:01:09 and 00:01:57. Between 00:01:09 and 00:01:33, there is no gap of 20 seconds or more, so the trip count keeps incrementing. Between 00:01:33 and 00:01:57, there is an inactivity gap of more than 20 seconds, so a new session is started at the 57th second.

Performing Streaming Analytics Using Window Hopping

In window hopping, data is grouped in a given time interval into overlapping windows that advance by a given interval. For example, consider a 5-minute interval with an advance interval of 1 minute, working as follows:


To group the trip start details into windows of 5 minutes advancing by 1 minute, execute the following command for hopping window analysis:

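A sketch of the hopping-window query with a 5-minute size advancing by 1 minute:

    SELECT usertype, COUNT(*)
    FROM subscriber_trip_data
    WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
    GROUP BY usertype;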

From the above result, it is evident that each record is counted in 5 windows when the window size is 5 minutes and the advance interval is 1 minute. The number of entries per record depends on the window size and the advance interval chosen.

In the above example, consider the record at time 00:02:12 to check how hopping works with the 5-minute size and 1-minute advance. The 00:02:12 record has five entries, with trip counts 7, 7, 7, 6, and 1. Within the first 2 minutes, only two 1-minute advances are possible, so the first three entries all cover the interval 00:00:00 to 00:02:12, which has 7 trips started. The 4th entry advances by 1 minute: the interval 00:01:00 to 00:02:12 has 6 trips. The 5th entry advances by another minute, so the interval 00:02:00 to 00:02:12 contains only 1 trip.

Conclusion

In this blog, we discussed the custom partitioning technique by partitioning the trip details into two different partitions based on user type. We also discussed the Kafka windowing concepts of window tumbling, window session, and window hopping, and how they work on the trip start times, to understand the differences between the types of windowing.
