What Is Kafka?
Apache Kafka is a distributed event-streaming platform that handles streaming data in real time. It provides highly reliable, scalable infrastructure for processing and analyzing large volumes of data as they flow through the system. With Kafka, you can build applications that handle data ingestion, analytics, and other real-time data processing tasks effectively.
What Is KSQL?
KSQL is a streaming SQL engine for Apache Kafka that lets you process and analyze streaming data in real time using SQL-like queries. It simplifies the development of real-time applications by letting you write SQL to filter, transform, and join data streams, making it easier to work with and derive insights from streaming data.
Why Do We Need Integration Between Kafka and KSQL?
- Real-time data processing: Enables the processing and analysis of streaming data in real-time.
- SQL-like interface: Allows working with streaming data using familiar SQL queries.
- Real-time analytics: Provides insights and analytics on continuously flowing data.
- Simplified development: Streamlines the development of real-time applications with a higher-level, declarative approach.
- Scalability and fault-tolerance: Supports handling high-volume streaming data with scalability and fault-tolerant processing.
Key Components in KSQL:
- Stream: Represents continuously flowing real-time data for processing and analysis.
- Table: Organized collection of data records that result from aggregating or joining streams or tables. Maintains the latest values for each key.
- Queryable Table: A special table type allowing low-latency access to the current state for interactive querying.
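The stream/table duality described above can be sketched in plain Python (illustrative records, not ksqlDB internals): a stream retains every event in order, while a table keeps only the latest value for each key.

```python
# Sketch of KSQL's stream/table duality.
# A stream is the append-only log of events; a table is the latest value per key.
events = [
    ("101", {"paymentMethod": "UPI"}),
    ("102", {"paymentMethod": "debit-card"}),
    ("101", {"paymentMethod": "credit-card"}),  # later update for key 101
]

stream = list(events)          # every event is retained
table = {}
for key, value in events:      # table semantics: upsert on key
    table[key] = value

print(len(stream))             # 3 -- all events remain in the stream
print(table["101"])            # latest value wins: {'paymentMethod': 'credit-card'}
```

This is why a KSQL table needs a key: without one there is nothing to upsert on.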
Prerequisites:
- Start ZooKeeper
- Start the Kafka server
- Start the KSQL server
nohup ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties > zookeeper.log 2>&1 &
nohup ./bin/kafka-server-start ./etc/kafka/server.properties > kafka.log 2>&1 &
nohup ./bin/ksql-server-start ./etc/ksql/ksql-server.properties > ksql.log 2>&1 &
How to Integrate Kafka with KSQL:
Scenario 1: Data with No Key
1. Prepare Data:
cat transactionNokey.json
{"transactionId" : "101", "paymentMethod": "UPI", "country": "IN","transactionDate": "2023-07-13"}
{"transactionId" : "102", "paymentMethod": "debit-card", "country": "US","transactionDate": "2023-07-14"}
{"transactionId" : "103", "paymentMethod": "credit-card", "country": "SG","transactionDate": "2023-07-15"}
{"transactionId" : "104", "paymentMethod": "pay-later", "country": "IN","transactionDate": "2023-07-12"}
{"transactionId" : "105", "paymentMethod": "NetBanking", "country": "AUS","transactionDate": "2023-07-11"}
{"transactionId" : "106", "paymentMethod": "QR", "country": "CA","transactionDate": "2023-07-10"}
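A file like the one above can also be generated programmatically. A minimal sketch (showing just the first two sample records) that writes one JSON object per line, the shape kafka-console-producer reads when fed a file:

```python
import json

# Write sample transactions as JSON Lines: one JSON object per line,
# which kafka-console-producer sends as one message per line.
transactions = [
    {"transactionId": "101", "paymentMethod": "UPI", "country": "IN", "transactionDate": "2023-07-13"},
    {"transactionId": "102", "paymentMethod": "debit-card", "country": "US", "transactionDate": "2023-07-14"},
]

with open("transactionNokey.json", "w") as f:
    for record in transactions:
        f.write(json.dumps(record) + "\n")
```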
2. Create a Kafka topic and produce data into it from the transactionNokey.json file.
./bin/kafka-topics --create --topic transactionNokey --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
./bin/kafka-console-producer --bootstrap-server localhost:9092 --topic transactionNokey < transactionNokey.json
./bin/kafka-console-consumer --topic transactionNokey --bootstrap-server localhost:9092 --from-beginning
3. Start the KSQL CLI.
./bin/ksql
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> print 'transactionNokey' from beginning;
rowtime: 2023/07/13 18:57:29.676 Z, key: , value: {"transactionId" : "101", "paymentMethod": "UPI", "country": "IN","transactionDate": "2023-07-13"}, partition: 0
rowtime: 2023/07/13 18:57:29.685 Z, key: , value: {"transactionId" : "102", "paymentMethod": "debit-card", "country": "US","transactionDate": "2023-07-14"}, partition: 0
rowtime: 2023/07/13 18:57:29.685 Z, key: , value: {"transactionId" : "103", "paymentMethod": "credit-card", "country": "SG","transactionDate": "2023-07-15"}, partition: 0
rowtime: 2023/07/13 18:57:29.685 Z, key: , value: {"transactionId" : "104", "paymentMethod": "pay-later", "country": "IN","transactionDate": "2023-07-12"}, partition: 0
rowtime: 2023/07/13 18:57:29.685 Z, key: , value: {"transactionId" : "105", "paymentMethod": "NetBanking", "country": "AUS","transactionDate": "2023-07-11"}, partition: 0
rowtime: 2023/07/13 18:57:29.685 Z, key: , value: {"transactionId" : "106", "paymentMethod": "QR", "country": "CA","transactionDate": "2023-07-10"}, partition: 0
We can see here that the key is null, so the table will not show any data: a KSQL table requires a non-null primary key, and records with a null key are ignored.
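Why the table stays empty can be sketched in a few lines of Python (an illustration of the upsert rule, not ksqlDB internals): a table materializes state by primary key, so null-keyed records have nothing to upsert on.

```python
# Sketch: records whose Kafka message key is null cannot be upserted
# into a table, so the materialized state stays empty.
records = [
    (None, {"transactionId": "101", "paymentMethod": "UPI"}),
    (None, {"transactionId": "102", "paymentMethod": "debit-card"}),
]

table = {}
for key, value in records:
    if key is None:        # no primary key -> record is skipped
        continue
    table[key] = value

print(table)  # {} -- the table has no rows
```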
4. Create a stream, a table, and a queryable table.
ksql> CREATE STREAM transactionNokeyStream (transactionId VARCHAR KEY ,paymentMethod VARCHAR,country VARCHAR,transactionDate VARCHAR) WITH (KAFKA_TOPIC='transactionNokey',VALUE_FORMAT='JSON');
ksql> CREATE TABLE transactionNokeyTable (transactionId VARCHAR PRIMARY KEY ,paymentMethod VARCHAR,country VARCHAR,transactionDate VARCHAR) WITH (KAFKA_TOPIC='transactionNokey',VALUE_FORMAT='JSON');
ksql> CREATE TABLE transactionNokeytableQueryable AS SELECT * FROM transactionNokeyTable;
ksql> SELECT * FROM transactionNokeyStream;
ksql> SELECT * FROM transactionNokeytableQueryable;
Scenario 2: Data with Key
1. Prepare Data:
cat transactionWithkey.json
101:{"transactionId" : "101", "paymentMethod": "UPI", "country": "IN","transactionDate": "2023-07-13"}
102:{"transactionId" : "102", "paymentMethod": "debit-card", "country": "US","transactionDate": "2023-07-14"}
103:{"transactionId" : "103", "paymentMethod": "credit-card", "country": "SG","transactionDate": "2023-07-15"}
104:{"transactionId" : "104", "paymentMethod": "pay-later", "country": "IN","transactionDate": "2023-07-12"}
105:{"transactionId" : "105", "paymentMethod": "NetBanking", "country": "AUS","transactionDate": "2023-07-11"}
106:{"transactionId" : "106", "paymentMethod": "QR", "country": "CA","transactionDate": "2023-07-10"}
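The keyed lines above follow the `key:value` shape that kafka-console-producer parses when run with `parse.key=true` and `key.separator=:`. A hedged sketch of building such a file (first two sample records only):

```python
import json

# Build "key:value" lines for kafka-console-producer with
# --property parse.key=true --property key.separator=:
# (the producer splits each line on the FIRST separator, so colons
# inside the JSON value are safe).
transactions = [
    {"transactionId": "101", "paymentMethod": "UPI", "country": "IN", "transactionDate": "2023-07-13"},
    {"transactionId": "102", "paymentMethod": "debit-card", "country": "US", "transactionDate": "2023-07-14"},
]

lines = [f'{t["transactionId"]}:{json.dumps(t)}' for t in transactions]

with open("transactionWithkey.json", "w") as f:
    f.write("\n".join(lines) + "\n")
```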
2. Create a Kafka topic and produce data into it from the transactionWithkey.json file.
./bin/kafka-topics --create --topic transactionsWithkey --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
./bin/kafka-console-producer --bootstrap-server localhost:9092 --topic transactionsWithkey --property "parse.key=true" --property "key.separator=:" < transactionWithkey.json
./bin/kafka-console-consumer --topic transactionsWithkey --bootstrap-server localhost:9092 --from-beginning
3. Start the KSQL CLI.
./bin/ksql
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> print 'transactionsWithkey' from beginning;
rowtime: 2023/07/13 19:12:58.008 Z, key: 101, value: {"transactionId" : "101", "paymentMethod": "UPI", "country": "IN","transactionDate": "2023-07-13"}, partition: 0
rowtime: 2023/07/13 19:12:58.016 Z, key: 102, value: {"transactionId" : "102", "paymentMethod": "debit-card", "country": "US","transactionDate": "2023-07-14"}, partition: 0
rowtime: 2023/07/13 19:12:58.017 Z, key: 103, value: {"transactionId" : "103", "paymentMethod": "credit-card", "country": "SG","transactionDate": "2023-07-15"}, partition: 0
rowtime: 2023/07/13 19:12:58.017 Z, key: 104, value: {"transactionId" : "104", "paymentMethod": "pay-later", "country": "IN","transactionDate": "2023-07-12"}, partition: 0
rowtime: 2023/07/13 19:12:58.017 Z, key: 105, value: {"transactionId" : "105", "paymentMethod": "NetBanking", "country": "AUS","transactionDate": "2023-07-11"}, partition: 0
rowtime: 2023/07/13 19:12:58.017 Z, key: 106, value: {"transactionId" : "106", "paymentMethod": "QR", "country": "CA","transactionDate": "2023-07-10"}, partition: 0
We can see here that the key is not null, so the table will show data: each record has a valid primary key, as shown below.
4. Create a stream, a table, and a queryable table.
ksql> CREATE STREAM transactionWithkeyStream (transactionId VARCHAR KEY ,paymentMethod VARCHAR,country VARCHAR,transactionDate VARCHAR) WITH (KAFKA_TOPIC='transactionsWithkey',VALUE_FORMAT='JSON');
ksql> CREATE TABLE transactionWithkeyTable (transactionId VARCHAR PRIMARY KEY ,paymentMethod VARCHAR,country VARCHAR,transactionDate VARCHAR) WITH (KAFKA_TOPIC='transactionsWithkey',VALUE_FORMAT='JSON');
ksql> CREATE TABLE transactionWithkeytableQueryable AS SELECT * FROM transactionWithkeyTable;
ksql> SELECT * FROM transactionWithkeyStream;
ksql> SELECT * FROM transactionWithkeytableQueryable;
Note: If you want to see only the latest data in the stream in real time, set the offset to latest and add EMIT CHANGES to the query:
ksql> SET 'auto.offset.reset'='latest';
Successfully changed local property 'auto.offset.reset' from 'latest' to 'latest'.
ksql> SELECT * FROM transactionWithkeyStream EMIT CHANGES;
A major benefit of KSQL is that it allows you to work with both streaming data (streams) and structured data (tables). With KSQL, you can create tables that store data from Kafka topics, similar to traditional databases. This enables you to query and perform joins on the streaming data as if it were stored in a regular database, providing flexibility and familiarity in working with real-time data.
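The stream/table pairing also enables joins: each event arriving on a stream can be enriched with the current row for its key in a table. A minimal Python sketch of that lookup-join idea (hypothetical lookup data, not ksqlDB internals):

```python
# Sketch of a stream-table join: for each stream event, look up the
# current row in the table by key and emit the enriched record.
country_names = {            # "table": latest value per key
    "IN": "India",
    "US": "United States",
}

transactions = [             # "stream": append-only events
    {"transactionId": "101", "country": "IN"},
    {"transactionId": "102", "country": "US"},
]

enriched = [
    {**t, "countryName": country_names.get(t["country"], "unknown")}
    for t in transactions
]

print(enriched[0]["countryName"])  # India
```

In KSQL the equivalent would be a `JOIN` between a stream and a table on the key column, expressed entirely in SQL.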