Introduction
Starting with the fundamentals: What is a data stream, also referred to as an event stream or streaming data? At its heart, a data stream is a conceptual framework representing a dataset that is perpetually open-ended and expanding. Its unbounded nature comes from the constant influx of new data over time. This approach is widely recognized and employed by leading technology firms such as Google and Amazon.
At the core of this model is a continuous sequence of events, which can capture virtually any business activity we want to analyze: credit card transactions, stock trades, logistics updates, internet traffic, factory sensor readings, email interactions, or the moves in a game. Because nearly any process can be described as a sequence of events, the scope for analysis is enormous.
Learning Objectives
- Understand the Concept of Stream Processing.
- Distinguish Between Processing Paradigms.
- Become Familiar with Stream Processing Concepts.
- Explore Stream-Processing Design Patterns.
- Learn about Stream Processing Applications and Framework Selection.
Attributes of the Event Stream Model
- Event streams are ordered: There is an inherent notion of which events occur before or after other events. This is one of the differences between an event stream and a database table. Records in a table are always considered unordered, and the “order by” clause of SQL is not part of the relational model; it was added to assist in reporting.
- Immutable data records: Events, once they occur, can never be modified. Instead, an additional event is written to the stream, recording a cancellation of a previous transaction.
- Event streams are replayable: This property is valuable because it enables businesses to replay historical streams of events, essential for correcting errors, exploring new analysis methods, or conducting audits. This capability, exemplified by Kafka, is key to stream processing’s success in modern businesses, transforming it from a mere experimental tool to a critical business function.
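The three attributes above can be illustrated with a small in-memory sketch. This is not how Kafka stores data; the `Event` and `EventLog` names and the "charge"/"cancel" event kinds are hypothetical, chosen only to show ordering by offset, immutability, and replay.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)  # frozen: an event cannot be modified after creation
class Event:
    offset: int    # position in the stream, which defines the order
    kind: str      # hypothetical event type: "charge" or "cancel"
    amount: float


class EventLog:
    """Append-only, in-memory stand-in for an event stream."""

    def __init__(self) -> None:
        self._events: List[Event] = []

    def append(self, kind: str, amount: float) -> Event:
        event = Event(offset=len(self._events), kind=kind, amount=amount)
        self._events.append(event)
        return event

    def replay(self, from_offset: int = 0) -> List[Event]:
        # Reading never consumes events, so any reader can start over.
        return self._events[from_offset:]


def balance(events: List[Event]) -> float:
    total = 0.0
    for e in events:
        total += e.amount if e.kind == "charge" else -e.amount
    return total


log = EventLog()
log.append("charge", 40.0)
log.append("charge", 25.0)
log.append("cancel", 25.0)  # a correction is a new event, not an edit

print(balance(log.replay()))      # 40.0
print(balance(log.replay()[:2]))  # 65.0, the state before the cancellation
```

Because the cancellation is recorded as its own event, replaying a prefix of the log reproduces the state at any earlier point, which is what makes audits and reprocessing possible.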
Now that we know what event streams are, it’s time to make sure we understand stream processing. Stream processing refers to the ongoing processing of one or more event streams.
Distinguish Between Processing Paradigms
Request-Response
This is a communication paradigm where a client sends a request to a server and waits for a response. This interaction pattern is synchronous, meaning the client initiates the transaction and waits until it receives a reply before proceeding.
Batch Processing
Batch processing involves executing a series of jobs or tasks on a collection of data at once. The data is collected over a period, and the processing is typically scheduled to run during off-peak hours. This paradigm is asynchronous, as the processing doesn’t happen in real-time but at a scheduled time or when a certain amount of data accumulates.
Stream Processing
Stream processing is designed to handle continuous flows of data in real-time. Data is processed sequentially and incrementally, enabling immediate analysis and action on streaming data.
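The difference between batch and stream processing can be sketched with a simple average. The function names here are illustrative only: the batch version needs the whole collection before it can answer, while the streaming version updates its result incrementally with every event.

```python
from typing import Iterable, Iterator, List


def batch_average(prices: List[float]) -> float:
    # Batch: wait until the whole collection is available, then compute once.
    return sum(prices) / len(prices)


def streaming_average(prices: Iterable[float]) -> Iterator[float]:
    # Stream: update the result incrementally as each event arrives.
    count, total = 0, 0.0
    for price in prices:
        count += 1
        total += price
        yield total / count


prices = [10.0, 20.0, 30.0]
print(batch_average(prices))            # 20.0
print(list(streaming_average(prices)))  # [10.0, 15.0, 20.0]
```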
Time Windows
In stream processing, most operations involve windowed computations. For instance, in calculating moving averages, key considerations include:
- Window Size: This determines the duration over which to average events, such as a 5-minute, 15-minute, or daily window. Larger windows yield smoother averages but react more slowly to changes. For example, a spike in price will be reflected more promptly in a smaller window.
- Advance Interval: This specifies how frequently the window updates, which can be every minute, second, or upon each new event. An advance interval matching the window size defines a “tumbling window,” where each window is discrete and non-overlapping. Conversely, a “sliding window” updates with every new record, allowing for overlap in the data considered across windows.
- Window Updatability: It’s crucial to determine how long a window can incorporate delayed events. Setting a timeframe allows for late-arriving data to be added to their appropriate window, necessitating recalculations to keep the results current. For instance, allowing up to four hours for late events can ensure the accuracy of the computed averages despite delays.
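The effect of window size and advance interval can be sketched as follows. This is a simplified illustration, assuming integer timestamps in minutes and windows that start at multiples of the advance interval; the helper names are hypothetical. It assigns each event to its window by event timestamp and does not model a cutoff for late events.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def window_starts(ts: int, size: int, advance: int) -> List[int]:
    """Start times of every window [start, start + size) that contains ts."""
    start = ts - (ts % advance)  # latest window starting at or before ts
    starts = []
    while start >= 0 and start > ts - size:
        starts.append(start)
        start -= advance
    return sorted(starts)


def windowed_average(events: List[Tuple[int, float]], size: int,
                     advance: int) -> Dict[int, float]:
    buckets = defaultdict(list)
    for ts, price in events:
        for start in window_starts(ts, size, advance):
            buckets[start].append(price)
    return {s: sum(v) / len(v) for s, v in sorted(buckets.items())}


events = [(0, 10.0), (3, 20.0), (6, 30.0), (12, 50.0)]  # (minute, price)

# Advance == size: tumbling windows, each event lands in exactly one window.
print(windowed_average(events, size=5, advance=5))
# {0: 15.0, 5: 30.0, 10: 50.0}

# Advance < size: windows overlap, so one event can count in several.
print(windowed_average(events, size=10, advance=5))
# {0: 20.0, 5: 40.0, 10: 50.0}
```

The larger, overlapping windows produce smoother values, while the 5-minute tumbling windows react faster to the jump at minute 12.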
Stream-processing Design Patterns
Every stream-processing system is different—from the basic combination of a consumer, processing logic, and producer to involved clusters like Spark Streaming with its machine learning libraries, and much in between.
Single-event processing
The most fundamental stream processing model deals with each event on its own, following the map/filter pattern. This method primarily focuses on discarding non-essential events or individually transforming each event. In this scenario, a stream-processing application ingests events, alters them according to specified criteria, and then emits them to a different stream.
Typical applications include sorting log messages into streams of varying priorities or changing the format of events (for example, from JSON to Avro). This model’s advantage is its simplicity in managing events without interdependencies, which simplifies failure recovery and load distribution. Since there’s no intricate state to reconstruct, processing can be seamlessly transferred to another instance of the application.
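A minimal sketch of the map/filter pattern, using plain Python generators rather than a Kafka client. The log records and function names are made up for illustration, and the format change shown is dictionary to JSON text (an Avro conversion would need an additional library).

```python
import json
from typing import Dict, Iterable, Iterator


def high_priority_only(events: Iterable[Dict]) -> Iterator[Dict]:
    # Filter: drop events that are not needed downstream.
    return (e for e in events if e["level"] in {"ERROR", "FATAL"})


def to_json_line(events: Iterable[Dict]) -> Iterator[str]:
    # Map: transform each event on its own, with no shared state.
    return (json.dumps(e, sort_keys=True) for e in events)


logs = [
    {"level": "INFO", "msg": "started"},
    {"level": "ERROR", "msg": "disk full"},
    {"level": "DEBUG", "msg": "tick"},
]
for line in to_json_line(high_priority_only(logs)):
    print(line)  # {"level": "ERROR", "msg": "disk full"}
```

Since neither step keeps state between events, any instance can pick up where another left off.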
Processing with Local State
Applications dedicated to stream processing typically engage in data aggregation tasks, including computing daily lows and highs for stock prices as well as moving averages. To perform these functions, the application must preserve a state that allows it to monitor and periodically update these cumulative figures.
Aggregations can be managed using a separate state for each category (such as each stock symbol), rather than a single state that spans the whole market. This approach uses a Kafka partitioner to route events sharing a common key (e.g., a stock symbol) to the same partition. As a result, each instance of the application deals with events from certain partitions and holds the state only for its own group of keys, which keeps the aggregation both efficient and correct.
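The idea of key-based routing plus per-instance state can be sketched without a broker. In this illustration `crc32` is only a stand-in for a real partitioner hash, and the `MinMaxState` class, symbols, and prices are hypothetical.

```python
import zlib
from typing import Dict

NUM_PARTITIONS = 3


def partition_for(key: str) -> int:
    # Deterministic hash so that one key always maps to the same partition.
    # crc32 is a stand-in here, not necessarily what your client uses.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


class MinMaxState:
    """Local state held by one application instance for its own keys."""

    def __init__(self) -> None:
        self.low: Dict[str, float] = {}
        self.high: Dict[str, float] = {}

    def update(self, symbol: str, price: float) -> None:
        self.low[symbol] = min(price, self.low.get(symbol, price))
        self.high[symbol] = max(price, self.high.get(symbol, price))


# One instance per partition, to keep the sketch simple.
instances = [MinMaxState() for _ in range(NUM_PARTITIONS)]
trades = [("AAA", 10.0), ("BBB", 7.0), ("AAA", 12.5), ("AAA", 9.0)]
for symbol, price in trades:
    instances[partition_for(symbol)].update(symbol, price)

owner = instances[partition_for("AAA")]
print(owner.low["AAA"], owner.high["AAA"])  # 9.0 12.5
```

Every event for a given symbol reaches the same instance, so no instance ever needs another instance's state to compute that symbol's low and high.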
Multiphase Processing/Repartitioning
When an aggregation needs a view of all the data, such as finding the daily top 10 stocks, a two-phase approach is needed. In the first phase, individual application instances compute the daily gains or losses for their stocks using local state and then write these results to a new topic with a single partition. In the second phase, a single instance reads these summaries and determines the leading stocks.
The newly established topic, dedicated to daily summaries, experiences significantly less activity, making it feasible for a single instance to manage, thus facilitating the execution of more intricate aggregation processes as required.
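The two phases can be sketched with plain lists standing in for topics. The symbols, prices, and function names below are invented for illustration.

```python
from typing import Dict, List, Tuple


def daily_gains(open_close: Dict[str, Tuple[float, float]]) -> List[Tuple[str, float]]:
    # Phase 1: each instance computes the gain for the symbols it owns.
    return [(sym, close - open_) for sym, (open_, close) in open_close.items()]


def top_n(summaries: List[Tuple[str, float]], n: int) -> List[Tuple[str, float]]:
    # Phase 2: one instance reads the single-partition summary topic and ranks.
    return sorted(summaries, key=lambda s: s[1], reverse=True)[:n]


instance_a = {"AAA": (10.0, 12.0), "BBB": (20.0, 19.0)}  # symbol: (open, close)
instance_b = {"CCC": (5.0, 9.0), "DDD": (8.0, 8.5)}

# The list concatenation stands in for both instances writing to one topic.
summary_topic = daily_gains(instance_a) + daily_gains(instance_b)
print(top_n(summary_topic, 2))  # [('CCC', 4.0), ('AAA', 2.0)]
```

The second phase only sees one record per symbol per day, which is why a single instance can handle it.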
Processing with External Lookup: Stream-Table Join
Sometimes stream processing requires integration with data external to the stream, such as validating transactions against a set of rules stored in a database, or enriching clickstream information with data about the users who clicked.
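A minimal sketch of the enrichment case, assuming the external table is available as a local dictionary (for example, a cache of a user-profile database). The field names and the `enrich` helper are hypothetical.

```python
from typing import Dict, Iterable, Iterator

# Hypothetical lookup table: user ID -> profile data.
user_table: Dict[str, Dict[str, str]] = {
    "u1": {"country": "DE"},
    "u2": {"country": "IN"},
}


def enrich(clicks: Iterable[Dict[str, str]],
           table: Dict[str, Dict[str, str]]) -> Iterator[Dict[str, str]]:
    for click in clicks:
        # Only the current state of the table matters for the lookup.
        profile = table.get(click["user_id"], {})
        yield {**click, "country": profile.get("country", "unknown")}


clicks = [
    {"user_id": "u1", "page": "/home"},
    {"user_id": "u9", "page": "/cart"},  # no profile for this user
]
for enriched in enrich(clicks, user_table):
    print(enriched)
```

Looking up a local copy avoids a remote database call per event, at the cost of keeping that copy up to date.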
Streaming Join
Merging two live event streams requires combining their complete histories to align events sharing a common key within designated time frames. This is distinct from stream-table joins, where the focus is solely on the most current state. For example, to ascertain the popularity of search results, streams of search queries and corresponding clicks can be joined, correlating them by search term shortly after each query. This method, termed a windowed join, facilitates the real-time analysis of connections between simultaneous events in active streams.
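The search/click example can be sketched as a windowed join over two small lists. This is an illustration only: timestamps are in seconds, the data is invented, and a real implementation would keep windowed state per key instead of holding entire streams in memory.

```python
from collections import defaultdict
from typing import List, Tuple


def windowed_join(searches: List[Tuple[int, str]],
                  clicks: List[Tuple[int, str, str]],
                  window: int) -> List[Tuple[str, int, int, str]]:
    """Pair each click with searches for the same term made at most
    `window` seconds before the click."""
    by_term = defaultdict(list)
    for ts, term in searches:
        by_term[term].append(ts)

    matches = []
    for click_ts, term, url in clicks:
        for search_ts in by_term[term]:
            if 0 <= click_ts - search_ts <= window:
                matches.append((term, search_ts, click_ts, url))
    return matches


searches = [(100, "kafka"), (105, "flink"), (400, "kafka")]
clicks = [(103, "kafka", "kafka.apache.org"), (300, "flink", "flink.apache.org")]

print(windowed_join(searches, clicks, window=10))
# [('kafka', 100, 103, 'kafka.apache.org')]
```

The click on the "flink" result is not matched because it happened 195 seconds after the query, outside the 10-second window.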
Exploring Kafka Streams: Stock Market Statistics
Kafka Installation
Download Kafka from https://kafka.apache.org/downloads and extract the archive.
Move the extracted files to your desired location (for example, the C: drive).
Configure ZooKeeper and Start Kafka Server
Open a Command Prompt (CMD) window, change to the bin\windows folder of the Kafka directory, and start ZooKeeper:
zookeeper-server-start.bat ..\..\config\zookeeper.properties
To start the Kafka server, open a new CMD window in the same bin\windows folder and run:
kafka-server-start.bat ..\..\config\server.properties
Set Up Kafka Topics and Run the Producer
Open another CMD window in the bin\windows folder of the Kafka directory and create a topic for the streaming example:
kafka-topics.bat --create --topic conn-events --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
After creating the topic, you can start a console producer in the same window:
kafka-console-producer.bat --broker-list localhost:9092 --topic conn-events
Install Python Libraries
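Install a Python client library for Kafka, such as confluent-kafka (the confluent-kafka-python client, used in the examples below) or kafka-python, depending on your preference. The producer example also uses the requests library.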
Producer Application
Write a Python producer application to fetch real-time stock data from a data provider (e.g., an API) and publish it to the Kafka topics. You can use libraries like requests to fetch data.
from confluent_kafka import Producer
import time
import requests
import json

# Set headers for HTTP requests, including user agent and authorization token
headers = {
    'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                   '(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'),
    'Content-Type': 'application/json',
    'Authorization': 'Bearer '  # Replace with your actual token
}

# Configuration for Kafka producer
conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka broker address (update as needed)
    'client.id': 'stock-price-producer'     # Identifier for this producer
}

# Creating a Kafka producer instance with the specified configuration
producer = Producer(conf)

# The Kafka topic to which the price data will be sent
topic = "conn-events"  # Update with your actual topic

# The ticker symbol to track (example given is Bitcoin USD)
ticker_symbol = "BTC-USD"


def fetch_and_send_stock_price():
    while True:  # Loop indefinitely to continuously fetch and send prices
        try:
            # URL for fetching price data (example uses Bitcoin)
            url = "https://query2.finance.yahoo.com/v8/finance/chart/btc-usd"
            response = requests.get(url, headers=headers)  # Send the HTTP request
            data = json.loads(response.text)  # Parse the JSON response
            # Extract the current market price from the response data
            price = data["chart"]["result"][0]["meta"]["regularMarketPrice"]
            # Send the fetched price to the specified Kafka topic
            producer.produce(topic, key=ticker_symbol, value=str(price))
            producer.flush()  # Ensure all messages are sent to Kafka
            # Log the action of sending the price to Kafka
            print(f"Sent {ticker_symbol} price to Kafka: {price}")
        except Exception as e:
            # Log any errors encountered during fetching or sending the price
            print(f"Error fetching/sending stock price: {e}")
        # Wait 30 seconds before the next fetch
        time.sleep(30)


# Begin the process of fetching and sending price data
fetch_and_send_stock_price()
The prices should now be visible on the consumer’s console as well.
Consumer Application
Write a Python consumer application that reads the prices from the topic and stores them in a CSV file.
from confluent_kafka import Consumer, KafkaError
import csv
import os
from datetime import datetime

# Define the CSV file name
csv_file = "data.csv"

# Kafka consumer configuration
conf = {
    'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka broker address
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

# Create a Kafka consumer instance
consumer = Consumer(conf)

# Subscribe to a Kafka topic
topic = "conn-events"  # Replace with your topic name
consumer.subscribe([topic])

# Consume messages
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('Reached end of partition')
        else:
            print(f'Error: {msg.error()}')
    else:
        price = msg.value().decode("utf-8")
        print(f'received {price}')
        data_price = [(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), price)]
        if not os.path.exists(csv_file):
            # If it doesn't exist, create a new file and write header and data
            with open(csv_file, mode="w", newline="") as file:
                writer = csv.writer(file)
                writer.writerow(["time", "price"])  # Header
                writer.writerows(data_price)
        else:
            # If it exists, open it in append mode and add data
            with open(csv_file, mode="a", newline="") as file:
                writer = csv.writer(file)
                writer.writerows(data_price)
The consumer script creates the file data.csv if it does not exist and appends a timestamped row for every price it receives.
Kafka Streams: Architecture Overview
The example in the previous section used Kafka’s plain producer and consumer clients from Python to move prices through a topic. To understand how the Kafka Streams library itself works and scales, we need to peek under the covers and look at some of the design principles behind its API.
Building a Topology
Every stream application implements and executes at least one topology. A topology (called a DAG, or directed acyclic graph, in other stream-processing frameworks) is a set of operations and transitions that every event moves through from input to output. Even a simple app has a nontrivial topology. The topology is made up of processors, which are the nodes in the topology graph.
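To make the idea concrete, here is a minimal sketch of a linear topology in plain Python. It does not use the Kafka Streams API (a Java library); `build_topology` and the processor functions are hypothetical names, and each processor is simply a function that takes a stream and returns a stream.

```python
from typing import Callable, Iterable, Iterator, List

Processor = Callable[[Iterable], Iterator]


def split_words(lines: Iterable[str]) -> Iterator[str]:
    # Processor node: one input line becomes several output records.
    for line in lines:
        yield from line.lower().split()


def drop_short(words: Iterable[str]) -> Iterator[str]:
    # Processor node: filter out very short words.
    return (w for w in words if len(w) > 2)


def build_topology(processors: List[Processor]) -> Callable[[Iterable], Iterator]:
    """Chain processors so every record flows through them in order."""
    def run(records: Iterable) -> Iterator:
        stream = records
        for processor in processors:
            stream = processor(stream)
        return stream
    return run


topology = build_topology([split_words, drop_short])
print(list(topology(["Kafka is a log", "A log of events"])))
# ['kafka', 'log', 'log', 'events']
```

A chain is the simplest possible graph; a real topology can also branch and merge, with several sources and sinks.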
Scaling the Topology
Kafka Streams scales by supporting multi-threaded execution within a single application instance and by balancing load across multiple instances. The application can therefore run on a single machine using multiple threads, or be distributed across several machines whose threads share the workload.
To scale, Kafka Streams divides the processing workload into discrete tasks, each associated with a set of topic partitions. Each task consumes and processes events from its assigned partitions in order and then produces the results. Because tasks operate independently of one another, they are the basic unit of parallelism in Kafka Streams.
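The relationship between partitions, tasks, and threads can be sketched as a simple assignment function. Round-robin is an assumption made for this illustration and is not a description of the assignment logic Kafka Streams actually uses; the worker names are invented.

```python
from typing import Dict, List


def assign_tasks(num_partitions: int, workers: List[str]) -> Dict[str, List[int]]:
    """Create one task per partition and spread the tasks across workers."""
    assignment: Dict[str, List[int]] = {w: [] for w in workers}
    for partition in range(num_partitions):
        worker = workers[partition % len(workers)]
        assignment[worker].append(partition)
    return assignment


# One instance with a single thread handles every task.
print(assign_tasks(6, ["instance-1/thread-1"]))
# {'instance-1/thread-1': [0, 1, 2, 3, 4, 5]}

# Adding threads and instances spreads the same tasks out.
workers = ["instance-1/thread-1", "instance-1/thread-2", "instance-2/thread-1"]
print(assign_tasks(6, workers))
# {'instance-1/thread-1': [0, 3], 'instance-1/thread-2': [1, 4], 'instance-2/thread-1': [2, 5]}
```

The number of partitions caps the useful parallelism: with six partitions, a seventh worker would receive no task.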
Surviving Failures
The architecture that allows Kafka Streams applications to scale also supports robust failure handling. Because Kafka is highly available, the data an application has persisted to Kafka remains accessible, so after a failure the application can resume from the last committed offset. If a local state store is lost, it can be rebuilt from the changelog that Kafka Streams keeps in Kafka, preserving the continuity of processing.
Kafka Streams also reassigns tasks from failed threads or instances to those that are still running, much as work is redistributed within a consumer group when a consumer fails. This keeps tasks executing and limits the impact of a failure on the application’s overall performance and reliability.
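Rebuilding a lost state store from a changelog can be sketched as replaying key/value updates in order. This is a simplified illustration with invented data; it assumes that a `None` value marks a deleted key, in the manner of a tombstone record.

```python
from typing import Dict, List, Optional, Tuple


def rebuild_state(changelog: List[Tuple[str, Optional[float]]]) -> Dict[str, float]:
    """Replay every update in order; the last write per key wins."""
    state: Dict[str, float] = {}
    for key, value in changelog:
        if value is None:
            state.pop(key, None)  # tombstone: the key was deleted
        else:
            state[key] = value
    return state


changelog = [("AAA", 10.0), ("BBB", 7.0), ("AAA", 12.5), ("BBB", None)]
print(rebuild_state(changelog))  # {'AAA': 12.5}
```

A replacement instance that takes over a failed task can run this kind of replay first and then continue from the last committed offset.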
Stream Processing Use Cases
Stream processing handles events as they arrive. It suits scenarios that do not need a response within milliseconds but cannot wait for the next batch run. Key use cases include:
- Customer Service: Enhances customer experience by updating hotel reservations, confirmations, and details in real-time, ensuring prompt customer service.
- Internet of Things (IoT): Applies real-time data analysis to devices for preventive maintenance, detecting when hardware maintenance is needed across industries.
- Fraud Detection: Identifies irregular patterns in real time across large-scale event streams to prevent fraud in areas such as credit cards and stock trading.
- Cybersecurity: Uses stream processing to detect unusual network activities like internal malware communications (beaconing), responding to threats promptly.
How to Choose a Stream-Processing Framework
When selecting a stream-processing framework, consider the application type you intend to develop, as each type demands specific features:
- Data Ingestion: Suitable for transferring and slightly modifying data to fit the target system’s requirements.
- Low Millisecond Actions: Ideal for applications needing rapid responses, such as certain fraud detection scenarios.
- Asynchronous Microservices: Best for services executing simple actions within a broader business process, potentially involving local state caching to enhance performance.
- Near Real-Time Data Analytics: Optimal for applications that require complex data aggregation and joining to produce actionable business insights quickly.
Conclusion
This article began with an exploration of stream processing, defining it and contrasting it with other processing paradigms. We walked through fundamental stream-processing concepts and design patterns, followed by a hands-on example that publishes prices to Kafka and consumes them with Python. We then looked at the Kafka Streams architecture to see how it builds topologies, scales, and recovers from failures. Finally, we presented several use cases for stream processing and offered tips for choosing a stream-processing framework.
Frequently Asked Questions
Q1. Why is a data stream described as unbounded?
A. A data stream is unbounded because it represents an infinite and ever-growing dataset with new records continuously arriving over time.
Q2. What is stream processing?
A. Stream processing is the non-blocking, ongoing processing of event streams. It fills the gap between high-latency, high-throughput batch processing and the low-latency, consistent response times of request-response models.
Q3. What are the key attributes of event streams?
A. Event streams are unbounded, ordered, made up of immutable data records, and replayable.
Q4. Which notions of time does stream processing distinguish?
A. Stream processing distinguishes between event time (when an event occurs), log append time (when an event is stored in the Kafka broker), and processing time (when the event is processed by an application). Event time is usually the most significant for data processing.
Q5. What are common applications of stream processing?
A. Stream processing is applied in customer service for real-time updates, in IoT for preventive maintenance, in fraud detection for finance, and in cybersecurity for identifying unusual network activity.