Spark Structured Streaming
Spark Streaming is Spark's way of processing real-time data. Streaming means continuous: as soon as data arrives, Spark Streaming processes it, dividing the continuously flowing input into discrete units for further processing. Spark Streaming is based on RDDs, so we write code against the RDD API. Spark Structured Streaming, on the other hand, is based on DataFrames and Datasets, so we can write code using the DF or DS APIs. We cannot use Spark SQL in Spark Streaming, but we can in Spark Structured Streaming, and so benefit from the Catalyst optimiser.
Spark Streaming is used to process real-time data. Say we want to book tickets: all the details should be updated in real time, i.e., available seats, reserved seats, flight status, etc. If we used a batch process for this data, the details would only be updated according to the batch job's schedule, so a ticket booked today could take days, or even months, to have its status updated. To handle these kinds of scenarios we need a streaming process that can handle them in real time and process the data within a few seconds.
Use cases of Spark Streaming:
1. E-commerce: to track order status in real time, place orders, etc.
2. Transport: to track the running status of a train or flight, record its status, etc.
3. Finance: all banking information should be reflected in real time, i.e., credits/debits, trading, etc.
4. Healthcare: to track a patient's status in real time.
Stateless vs. Stateful Transformation
There are two kinds of transformation.
Stateless: This kind of transformation does not maintain any state across RDDs; it works on a single RDD and gives the result. Examples: map(), reduceByKey(), filter(), and sum().
Stateful: These transformations maintain state across RDDs and can work on historical data as well, accumulating data from multiple RDDs. In a stateful transformation we have a choice in how to accumulate: either accumulate all the RDDs and work on the complete data, or define a time window and accumulate only a few RDDs.
Let's say we are creating a new RDD every 5 minutes.
So in one hour we will have 12 RDDs, and if our application runs 24*7, then for one day we will have 12*24 = 288 RDDs.
So here we have a choice: either accumulate all 288 RDDs, or accumulate only the last hour's RDDs, which will be 12.
Let's discuss a few more Spark Streaming concepts before jumping to examples.
Let's say I want to calculate a sum over the last one hour. Then I need to sum the last 12 RDDs. But sum() is a stateless transformation: once it has summed the first RDD, it discards that information the moment it moves on to the second RDD. In such scenarios we need a stateful transformation that can compute the sum while maintaining the state of previous RDDs. updateStateByKey is a stateful transformation that can be used in place of sum() here. We will see it practically.
Now let's discuss how to accumulate only a few RDDs based on time, i.e. the previous hour, two hours, etc. To do this, we need to use the concept of a window.
Let's consider a batch interval of 10 seconds: a new RDD will be created every 10 seconds.
Window size: 30 seconds: the window will wait until it holds 30 seconds of data, i.e. 3 RDDs, and then process them.
Duration of one RDD = 10 seconds.
Window size = 30 seconds (10 + 10 + 10).
Sliding interval of 20 seconds: every 20 seconds, the 2 oldest RDDs leave the window and 2 new RDDs are added.
Adding the two new RDDs every 20 seconds is called the summary function, and releasing the two oldest RDDs every 20 seconds is called the inverse function.
An example of a stateful transformation using updateStateByKey:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
val sc = new SparkContext("local[*]", "WordCountProgramStateful")
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("/user/rahul/checkpoint/")
val lines = ssc.socketTextStream("localhost", 9999)
// Add the counts arriving in this batch to the previously stored count for the key
def updateValues(newVal: Seq[Int], prevState: Option[Int]): Option[Int] = {
  val newCount = prevState.getOrElse(0) + newVal.sum
  Some(newCount)
}
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).updateStateByKey(updateValues)
words.print()
ssc.start()
ssc.awaitTermination()
In the above example, updateStateByKey accumulates state across all the RDDs and does the processing.
Let's try to modify the above example so that it also works with a window, and we can accumulate RDDs based on time.
val sc = new SparkContext("local[*]", "WordCountProgramStateful")
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("/user/rahul/checkpoint/")
val lines = ssc.socketTextStream("localhost", 9999)
// Window and slide durations must be multiples of the batch interval (5 seconds here)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1))
  .reduceByKeyAndWindow((x: Int, y: Int) => x + y, (x: Int, y: Int) => x - y, Seconds(10), Seconds(5))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
reduceByKeyAndWindow: This transformation takes four parameters (the sketch below labels each one):
Summary function: how new data entering the window is added in. What comes in?
Inverse function: how old data leaving the window is removed. What goes out?
Window duration: the length of the window, i.e. how much data is considered in each computation.
Sliding interval: how often the windowed computation is performed.
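Here is a minimal sketch that maps the four parameters to these terms. It assumes pairs is a DStream[(String, Int)], such as the word pairs built in the example above; the variable names are only for illustration.
// pairs is assumed to be a DStream[(String, Int)], e.g. lines.flatMap(_.split(" ")).map(word => (word, 1))
val windowedCounts = pairs.reduceByKeyAndWindow(
  (running: Int, incoming: Int) => running + incoming, // summary function: what comes in
  (running: Int, leaving: Int) => running - leaving,   // inverse function: what goes out
  Seconds(30),                                         // window duration
  Seconds(10)                                          // sliding interval
)
windowedCounts.print()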
Till now we have discussed Spark Streaming, which deals with RDDs. Now let's discuss Spark Structured Streaming, which deals with DataFrames and Datasets and lets us write code in Spark SQL.
How To Read Data in Spark Structured Streaming from a Kafka Source.
// kafkaBroker and kafkaTopic are placeholders for your own values
val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBroker) // e.g. "localhost:9092"
  .option("subscribe", kafkaTopic)                // topic(s) to read from
  .load()
How To Write Streaming Data in Spark Structured Streaming to a Sink (a CSV file sink here).
// In practice, cast the binary key/value columns to STRING before writing to csv
kafkaDF
  .writeStream
  .format("csv")
  .option("path", outputPath)                   // the file sink needs an output path (placeholder)
  .option("checkpointLocation", checkpointPath) // placeholder for your checkpoint directory
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode(OutputMode.Append())
  .start()
  .awaitTermination()
Different Types of Trigger Configuration (the sketch after this list shows how each one is set):
Unspecified: If we do not specify any trigger configuration, Spark starts the next batch as soon as the current batch finishes. If the current batch finishes and there is no new data yet, Spark waits for new data and then triggers the next batch.
Time Interval (e.g. 5 min): If one micro-batch starts at 10:15:00 PM and takes 15 seconds, finishing at 10:15:15, Spark waits another 4 minutes 45 seconds and triggers the next batch at 10:20:00. If instead the batch takes 6 minutes and finishes at 10:21:00, Spark does not start a new batch at 10:20:00; it waits for the running batch to finish, and as soon as it does, the next batch starts.
One Time: A kind of batch processing in which we trigger once and process all the available data. The difference from normal batch processing is that it still uses the functionality Spark streaming provides to maintain state, so it only processes new data.
Continuous (Experimental): Processing begins as soon as data enters the source, rather than in micro-batches.
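A minimal sketch of how these options are set, assuming streamingDF is any streaming DataFrame; the console sink is used purely for illustration.
import org.apache.spark.sql.streaming.Trigger
val query = streamingDF.writeStream
  .format("console")
  // Choose one of the following (or omit .trigger(...) entirely for the "unspecified" behaviour):
  .trigger(Trigger.ProcessingTime("5 minutes"))  // fixed-interval micro-batches
  // .trigger(Trigger.Once())                    // one-time processing of whatever data is available
  // .trigger(Trigger.Continuous("1 second"))    // experimental continuous processing
  .start()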
A Few Examples of Spark Structured Streaming:
1. Word Count Program (Source: Kafka)
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

val spark = SparkSession.builder.appName("WordCountUsingSpark").getOrCreate()
import spark.implicits._

// kafkaBroker, kafkaTopic and checkpoint_path are placeholders for your own values
val inputSchema: StructType = new StructType().add("ts", TimestampType).add("str", StringType)

val df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBroker)
  .option("subscribe", kafkaTopic)
  .load()

val data: Dataset[String] = df
  .select(col("value").cast("STRING"))
  .select(from_json(col("value"), inputSchema).as("jsonConverted"))
  .select("jsonConverted.str").as[String]
  .flatMap(_.split(" "))   // split each message into words; the resulting column is named "value"

val wordFrequencies: DataFrame = data
  .groupBy(col("value")).count()
  .toDF("word", "frequency")

val query: StreamingQuery = wordFrequencies
  .orderBy(desc("frequency"))
  .writeStream
  .format("console")
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
2. Word Count (Source: Socket)
// sockethost, portno and checkpoint_path are placeholders for your own values
val df: DataFrame = spark.readStream
  .format("socket")
  .option("host", sockethost)
  .option("port", portno)
  .load()

val words = df.selectExpr("explode(split(value, ' ')) as word")
val wordcount = words.groupBy("word").count()

wordcount.writeStream
  .format("console")
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()
In the above examples, we saw how to read and write data from Kafka and a socket. We can also use files as a data source for Spark Structured Streaming, so whenever a new file arrives at the source location, a new batch is triggered.
Let’s look at an example of a file as a data source:
val spark = SparkSession
  .builder()
  .appName(applicationName)
  .config("spark.streaming.stopGracefullyOnShutdown", "true")
  .config("spark.sql.streaming.schemaInference", "true")
  .getOrCreate()

val df = spark.readStream
  .format("json")
  .option("maxFilesPerTrigger", 1)
  .option("path", "/user/rahul/input/")
  .load()

df.createOrReplaceTempView("df")
val df_fin = spark.sql("select * from df")

df_fin.writeStream
  .format("json")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "checkpoint_path")
  .option("path", "OutputPath")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()
Let's try to understand the important configurations we used here.
1. .config("spark.streaming.stopGracefullyOnShutdown", "true"): This setting makes the streaming application shut down gracefully, letting the in-progress batch finish instead of being killed abruptly when the JVM receives a shutdown signal.
2. .config("spark.sql.streaming.schemaInference", "true"): This setting lets Spark infer the schema from the input JSON files.
3. .option("maxFilesPerTrigger", 1): This option picks at most one new file per trigger (micro-batch), which helps avoid OOM issues in the Spark application.
4. df.createOrReplaceTempView("df"): This creates a temporary view from the DataFrame, so we can query it with Spark SQL as if it were a table.
5. .outputMode(OutputMode.Append()): Only newly arrived rows are written to the sink.
6. .option("checkpointLocation", "checkpoint_path"): The checkpoint saves the state of the Spark application and aids fault tolerance.
7. .trigger(Trigger.ProcessingTime("5 seconds")): This option starts a new micro-batch every 5 seconds.
Let's Discuss Aggregation in Spark Structured Streaming.
There are 2 kinds of aggregation in Spark Structured Streaming.
Continuous Aggregation: aggregate as soon as data comes in, over the entire stream.
Time-based Aggregation (Window Aggregation): we specify a time window, i.e., the last one month, last one day, etc., and Spark can clean up unnecessary state information.
Window aggregation comes in two varieties:
Tumbling Time Window: Non-Overlapping
Sliding Time Window: Overlapping
Let's try to understand these aggregations practically.
(Tumbling Time Window: Non-Overlapping)
val spark = SparkSession
  .builder()
  .appName(applicationName)
  .config("spark.streaming.stopGracefullyOnShutdown", "true")
  .config("spark.sql.streaming.schemaInference", "true")
  .getOrCreate()

val df_schema = StructType(List(
  StructField("Date", TimestampType),
  StructField("Name", StringType),
  StructField("collection", StringType)
))

val df = spark.readStream
  .format("socket")
  .option("host", sockethost)
  .option("port", portno)
  .load()

val df_1 = df.select(from_json(col("value"), df_schema).alias("value"))
val df_2 = df_1.select("value.*")
val df_3 = df_2.groupBy(window(col("Date"), "15 minute"))
  .agg(sum("collection").alias("totalcollection"))
val output = df_3.select("window.start", "window.end", "totalcollection")

output.writeStream
  .format("console")
  // Append mode needs a watermark when aggregating, so update mode is used in this example
  .outputMode(OutputMode.Update())
  .option("checkpointLocation", "checkpoint_path")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()
Watermark:
In streaming applications there are scenarios where we need to deal with late-arriving records. Say a record should have arrived at 11:15 am but, due to some issue, arrives at 12:15; we need to handle these late records as well. Spark Structured Streaming supports watermarking for this: we specify a watermark duration, and records that arrive after the watermark boundary can be safely discarded.
Let's see an example of a watermark.
Watermark boundary = max(event time seen so far) - watermark duration
For example, if the maximum event time seen so far is 12:15 and the watermark duration is 30 minutes, the boundary is 11:45, and records with an event time older than 11:45 can be dropped.
val df_schema = StructType(List(
  StructField("Date_1", TimestampType),
  StructField("Name", StringType),
  StructField("collection", StringType)
))

val df = spark.readStream
  .format("socket")
  .option("host", sockethost)
  .option("port", portno)
  .load()

val df_1 = df.select(from_json(col("value"), df_schema).alias("value"))
val df_2 = df_1.select("value.*")
val df_3 = df_2.withWatermark("Date_1", "30 minute")
  .groupBy(window(col("Date_1"), "15 minute"))
  .agg(sum("collection").alias("totalcollection"))
val output = df_3.select("window.start", "window.end", "totalcollection")

output.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "checkpoint_path")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()
Now let's discuss the other window aggregation type, the Sliding Time Window (Overlapping).
The only difference from the Tumbling Time Window is that we pass one more value, the sliding interval, to the window function.
val df_schema = StructType(List(
  StructField("Date_1", TimestampType),
  StructField("Name", StringType),
  StructField("collection", StringType)
))

val df = spark.readStream
  .format("socket")
  .option("host", sockethost)
  .option("port", portno)
  .load()

val df_1 = df.select(from_json(col("value"), df_schema).alias("value"))
val df_2 = df_1.select("value.*")
val df_3 = df_2.withWatermark("Date_1", "30 minute")
  .groupBy(window(col("Date_1"), "15 minute", "5 minute"))  // 15-minute window sliding every 5 minutes
  .agg(sum("collection").alias("totalcollection"))
val output = df_3.select("window.start", "window.end", "totalcollection")

output.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "checkpoint_path")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()
This creates the overlapping windows described above.
Streaming Joins:
Join Types in Spark Structured Streaming:
- Join between a static and a streaming DataFrame.
- Join between two streaming DataFrames.
# Join Between a Static and a Streaming DataFrame

val df_schema = StructType(List(
  StructField("Date_1", TimestampType),
  StructField("id", StringType),
  StructField("collection", StringType)
))

val df = spark.readStream
  .format("socket")
  .option("host", sockethost)
  .option("port", portno)
  .load()

val df_1 = df.select(from_json(col("value"), df_schema).alias("value"))
val df_2 = df_1.select("value.*")

val staticDf = Seq((1507831462, 100)).toDF("Timestamp", "id")
val joinexpr = df_2.col("id") === staticDf.col("id")
val jointype = "inner"
val final_df = df_2.join(staticDf, joinexpr, jointype).drop(staticDf.col("id"))

val fin1 = final_df.writeStream
  .format("console")
  .outputMode(OutputMode.Update())
  .option("checkpointLocation", "checkpoint_path")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

fin1.awaitTermination()
Note: We can change the jointype to apply outer joins as well: "leftouter" is supported when the streaming DataFrame is on the left side of a stream-static join, and "rightouter" when the streaming DataFrame is on the right side.
# Join Between Two Streaming DataFrames

val df_schema1 = StructType(List(
  StructField("Date_1", TimestampType),
  StructField("id", StringType),
  StructField("collection", StringType)
))
val df_schema2 = StructType(List(
  StructField("Date_2", TimestampType),
  StructField("id", StringType),
  StructField("collection_2", StringType)
))

val df_s1 = spark.readStream
  .format("socket")
  .option("host", sockethost)
  .option("port", portno)
  .load()
val df_s2 = spark.readStream
  .format("socket")
  .option("host", sockethost)
  .option("port", portno)
  .load()

val df_1 = df_s1.select(from_json(col("value"), df_schema1).alias("value"))
val df_1_1 = df_1.select("value.*").withWatermark("Date_1", "30 minute")
val df_2 = df_s2.select(from_json(col("value"), df_schema2).alias("value"))
val df_2_1 = df_2.select("value.*").withWatermark("Date_2", "30 minute")

val joinexpr = df_1_1.col("id") === df_2_1.col("id")
val jointype = "inner"
val final_df = df_1_1.join(df_2_1, joinexpr, jointype).drop(df_1_1.col("id"))

val fin1 = final_df.writeStream
  .format("console")
  // Stream-stream joins support only the append output mode
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "checkpoint_path")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

fin1.awaitTermination()

# Example of Joining Streaming DataFrames with a Static DataFrame

val streamingDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "structured_topic")
  .load()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val staticDf = Seq((1507831462, 100)).toDF("Timestamp", "DeviceId")

// Inner join (the socket stream would first need a "Timestamp" column parsed out of its value column)
streamingDf.join(staticDf, "Timestamp")
lines.join(staticDf, "Timestamp")

// Left outer join
streamingDf.join(staticDf, "Timestamp", "left_outer")
lines.join(staticDf, "Timestamp", "left_outer")
Streaming Deduplication: we can deduplicate records in a data stream using a unique identifier in the events, exactly as we would deduplicate static data using a unique identifier column. The query stores the necessary amount of data from previous records so that it can filter out duplicates. As with aggregations, deduplication can be used with or without watermarking.
// streamingDf is assumed to expose id, DOB and eventTime columns after parsing the Kafka value
val streamingDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "structured_topic")
  .load()

// Without a watermark, deduplicating on the id column
streamingDf.dropDuplicates("id")

// With a watermark on eventTime, deduplicating on the id and DOB columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("id", "DOB")
A Few More Important Questions Related to Spark Structured Streaming:
How does a Spark Structured Streaming application handle fault tolerance?
Spark Structured Streaming uses checkpoints for fault tolerance and to recover lost data. I have written a complete blog on recovering lost data using Spark checkpoints here: Spark-CheckPoint.
What are the different output modes available in Spark Structured Streaming? (The sketch after this list shows how each one is set.)
Append mode: the default mode. Only new rows are written to the sink.
Complete mode: writes all the rows. It is supported only for aggregation queries (e.g. groupBy or groupByKey).
Update mode: writes to the sink only the rows that were updated. When there are no aggregations, it behaves exactly like append mode.
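A minimal sketch of how each mode is set on a writer, again assuming streamingDF is any streaming DataFrame and using the console sink only for illustration:
val query = streamingDF.writeStream
  .format("console")
  // Choose one:
  .outputMode(OutputMode.Append())     // default: only new rows
  // .outputMode(OutputMode.Complete()) // all rows; aggregation queries only
  // .outputMode(OutputMode.Update())   // only rows updated since the last trigger
  .start()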
What is the Catalyst optimiser?
It is an optimiser that automatically finds the most efficient plan to execute the data operations specified in the user's program. It optimises Spark SQL queries as far as possible, requiring no additional optimisation at the query level, which reduces execution time and improves DataFrame performance.
Its logical optimisation rules include constant folding, predicate pushdown, projection pruning, and null propagation.
Spark SQL then takes the logical plan and generates one or more physical plans, using physical operators that match the Spark execution engine, and selects one of them using a cost model.
Code generation: the final phase of query optimisation involves generating Java bytecode to run on each machine. Because Spark SQL often operates on in-memory data, where processing is CPU-bound, code generation speeds up execution. So Catalyst moves a query from an unoptimised logical plan to an optimised physical plan; a quick way to inspect the plans it produces is shown below.
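A minimal sketch for seeing what Catalyst produces, assuming an active SparkSession named spark; the query itself is just an illustrative example.
// Build a small query and print the parsed, analysed, optimised logical plans and the physical plan
val demoDf = spark.range(1000)
  .selectExpr("id", "id * 2 AS doubled")
  .filter("doubled > 10")
demoDf.explain(true)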
Note: Cross joins are not supported in Spark Structured Streaming.
Spark Structured Streaming is itself a very vast topic; in this article we covered its most important concepts. The key takeaways from the article are:
-What Spark Streaming and Spark Structured Streaming are.
-Summary functions and inverse functions.
-How to read data from different sources.
-How to apply transformations in Spark Streaming and Spark Structured Streaming.
-How to write data to different sinks.
-How to use watermarks.
-Spark streaming joins.
-Different types of aggregations.
-Stateless and stateful transformations.
-The different output modes.
-The coding side of Spark streaming.
-How to handle fault tolerance in Spark Structured Streaming.
-Different types of trigger configuration.
-Different configurations related to Spark Structured Streaming.
-Deduplicating records with and without a watermark.
-The watermark boundary value.
-The Catalyst optimiser.
So, this was all about Spark Structured Streaming to get you started on your Spark streaming journey. I hope you liked the article.
Important Links For Data Engineers:
1. Everything You Need To Know About Scala.
2. Top 20 Scala Programming Question For Interview.
3. Spark Dataframes For Beginner.
4. A Complete Guide On Spark Structure Streaming.
5. A Complete Guide On Apache Hive Basics.
6. A Complete Guide On Apache Hive Advance.
7. Hadoop Components Installation Guide In MacOs
8. Slowly Changing Dimension In Hive — DataWareHouse.
9. A Comprehensive Guide On Apache Sqoop
10. Linux Commands You’ll Actually Need for Your Data-Engineering Journey.
11. Apache Spark Checkpoints: Let’s Recover Lost Data From Offset Values.
Happy Learning!!