January 23, 2025
Top Spark Programming Questions for Interviews | by Rahul Patidar

Introduction:

Hello everyone! Spark programming questions come up in almost every big data interview. In this post, I'll walk through a set of important Spark programming questions to help you prepare.

  1. Write a Spark program in Scala that takes a DataFrame with bill IDs and order names (as lists of items) and returns the count of each distinct item across all orders.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Order Count").getOrCreate()

import spark.implicits._

val data = Seq(
  (101, "[pizza,samosa,idli]"),
  (102, "[kachori,sambhar,idli]"),
  (103, "[dosa,vada,pizza]"),
  (104, "[samosa,idli,chai]"),
  (105, "[pizza,chai,dosa]")
)

val df = data.toDF("billid", "Ordername")

// Strip the surrounding brackets, split the string into an array of items,
// explode one item per row, and count how often each item appears.
val dfCleaned = df.withColumn("Ordername", regexp_replace(col("Ordername"), "\\[|\\]", ""))
val dfSplit = dfCleaned.withColumn("Ordername", split(col("Ordername"), ","))
val dfExploded = dfSplit.withColumn("Item", explode(col("Ordername"))).select("Item")
val dfCount = dfExploded.groupBy("Item").count()

dfCount.show()
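
If the order items are already stored as an array column rather than a bracketed string, the regexp_replace and split steps can be skipped entirely. A minimal sketch under that assumption, reusing the same Spark session and imports as above (the arrayDf name is just for illustration):

// Hypothetical input where Ordername is already an Array[String] column.
val arrayDf = Seq(
  (101, Seq("pizza", "samosa", "idli")),
  (102, Seq("kachori", "sambhar", "idli"))
).toDF("billid", "Ordername")

// Explode one item per row and count each distinct item directly.
arrayDf
  .select(explode($"Ordername").as("Item"))
  .groupBy("Item")
  .count()
  .show()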

2. Write a Spark program in Scala for a DataFrame with columns id, name, and salary, where the data may contain special characters, blank strings, and "NA" values that should all be treated as nulls. The cleaning logic should work generically across all columns, since the column names may not be known in advance.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Handle Special Characters and Nulls").getOrCreate()

import spark.implicits._

val data = Seq(
  (1, "#", ""),
  (2, "dd", "$"),
  (3, "NA", "5000"),
  (4, "John", "NA"),
  (5, "", "6000")
)

val df = data.toDF("id", "name", "salary")

// Replace special characters, empty strings, and "NA" with null in every column.
// foldLeft applies the same rule to all columns without hard-coding their names;
// columns that contain none of these values (such as id) pass through unchanged.
def cleanColumns(df: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
  val specialChars = List("#", "$", "%", "&", "@")

  df.columns.foldLeft(df) { (currentDf, colName) =>
    currentDf.withColumn(colName,
      when(
        col(colName).isin(specialChars: _*) ||
          col(colName).isNull ||
          col(colName).equalTo("NA") ||
          col(colName).equalTo(""),
        lit(null)
      ).otherwise(col(colName))
    )
  }
}

val cleanedDf = cleanColumns(df)
cleanedDf.show()
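
After this cleanup the salary column is still a string. If numeric operations are needed downstream, it can be cast afterwards; a small follow-up sketch, assuming the cleanedDf from the example above:

// Cast the cleaned salary column to an integer; any remaining non-numeric values become null.
val typedDf = cleanedDf.withColumn("salary", col("salary").cast("int"))
typedDf.printSchema()
typedDf.show()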

3. Input:

+--------------------+------+
|timestamp           |userid|
+--------------------+------+
|2018-01-01T11:00:00Z|u1    |
|2018-01-01T12:00:00Z|u1    |
|2018-01-01T11:00:00Z|u2    |
|2018-01-02T11:00:00Z|u2    |
|2018-01-01T12:15:00Z|u1    |
+--------------------+------+
Problem Statement:
Given time-series clickstream data of user activity, enrich the data with a session id and visit number.
A session ends after 30 minutes of inactivity, and a single session can last at most 2 hours.

Expected Output:
The output should have 3 columns: timestamp, userid, and session_id.
The session_id values should follow the pattern u1_s1, u1_s2, u2_s1, etc.
(Solve using PySpark.)

from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, col, lag, when, lit, sum as Fsum, concat, from_unixtime
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("SessionizeClicks").getOrCreate()

data = [
    ("2018-01-01T11:00:00Z", "u1"),
    ("2018-01-01T12:00:00Z", "u1"),
    ("2018-01-01T11:00:00Z", "u2"),
    ("2018-01-02T11:00:00Z", "u2"),
    ("2018-01-01T12:15:00Z", "u1")
]
df = spark.createDataFrame(data, ["timestamp", "userid"])

# Convert the ISO timestamp string to epoch seconds so time gaps can be computed.
df = df.withColumn("timestamp", unix_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))

# For each user, compare every event with the previous one (ordered by time).
window_spec = Window.partitionBy("userid").orderBy("timestamp")
df = df.withColumn("prev_timestamp", lag("timestamp").over(window_spec))
df = df.withColumn("time_diff", col("timestamp") - col("prev_timestamp"))

# A gap of more than 30 minutes (1800 seconds), or the first event per user, starts a new session.
# (The 2-hour maximum session length needs an additional rule; it is not handled here.)
df = df.withColumn("session_flag", when((col("time_diff") > 1800) | (col("time_diff").isNull()), 1).otherwise(0))

# A running sum of the session flags per user gives the session number.
window_spec2 = Window.partitionBy("userid").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("session_id", Fsum(col("session_flag")).over(window_spec2))
df = df.withColumn("session_id", concat(col("userid"), lit("_s"), col("session_id")))

# Convert the epoch seconds back to the original string format.
df = df.withColumn("timestamp", from_unixtime(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))

result = df.select("timestamp", "userid", "session_id")
result.show(truncate=False)

Spark Scala based solution:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("SessionizeClicks").getOrCreate()
import spark.implicits._

val data = Seq(
  ("2018-01-01T11:00:00Z", "u1"),
  ("2018-01-01T12:00:00Z", "u1"),
  ("2018-01-01T11:00:00Z", "u2"),
  ("2018-01-02T11:00:00Z", "u2"),
  ("2018-01-01T12:15:00Z", "u1")
).toDF("timestamp", "userid")

// Convert the ISO timestamp to epoch seconds, flag gaps of more than 30 minutes
// (or the first event per user) as session starts, then take a running sum of the
// flags to number the sessions.
val df = data.withColumn("timestamp", unix_timestamp($"timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
val windowSpec = Window.partitionBy("userid").orderBy("timestamp")
val dfWithPrev = df.withColumn("prev_timestamp", lag("timestamp", 1).over(windowSpec))
val dfWithTimeDiff = dfWithPrev.withColumn("time_diff", $"timestamp" - $"prev_timestamp")
val dfWithSessionFlag = dfWithTimeDiff.withColumn("session_flag", when($"time_diff" > 1800 || $"time_diff".isNull, 1).otherwise(0))
val windowSpec2 = Window.partitionBy("userid").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
val dfWithSessionId = dfWithSessionFlag.withColumn("session_id", sum($"session_flag").over(windowSpec2))
val dfWithSessionIdString = dfWithSessionId.withColumn("session_id", concat($"userid", lit("_s"), $"session_id"))
val dfWithOriginalTimestamp = dfWithSessionIdString.withColumn("timestamp", from_unixtime($"timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
val result = dfWithOriginalTimestamp.select("timestamp", "userid", "session_id")
result.show(truncate = false)
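
Neither solution above enforces the 2-hour maximum from the problem statement. A hedged sketch of one possible interpretation, building on the dfWithSessionId DataFrame from the Scala solution (numeric session_id, timestamp still in epoch seconds): split each 30-minute-inactivity session into 2-hour buckets measured from its first event, then renumber.

// One way to also respect the 2-hour cap; this is an approximation, not the only interpretation.
val windowPerSession = Window.partitionBy("userid", "session_id")

val withBucket = dfWithSessionId
  .withColumn("session_start", min($"timestamp").over(windowPerSession))
  .withColumn("two_hour_bucket", floor(($"timestamp" - $"session_start") / 7200))

// Treat each (30-minute session, 2-hour bucket) pair as its own session and renumber per user.
val renumberWindow = Window.partitionBy("userid").orderBy($"session_id", $"two_hour_bucket")
val capped = withBucket
  .withColumn("capped_session", dense_rank().over(renumberWindow))
  .withColumn("session_id", concat($"userid", lit("_s"), $"capped_session"))
  .withColumn("timestamp", from_unixtime($"timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'"))

capped.select("timestamp", "userid", "session_id").show(truncate = false)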

4. Explain UDF in Spark with an example.

User-Defined Functions (UDFs) in Apache Spark let you apply custom transformation logic to DataFrame columns. They are useful for operations that are not covered by the built-in Spark SQL functions. Here's an explanation along with an example.

To create a UDF in Spark, follow these steps:

  1. Define a regular Scala function.
  2. Register the function as a UDF.
  3. Apply the UDF to DataFrame columns.

Let’s create a simple example where we have a DataFrame containing names, and we want to create a UDF to capitalize the first letter of each name.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._

// Plain Scala function: upper-case the first letter and lower-case the rest.
def capitalizeFirstLetter(name: String): String = {
  if (name == null || name.isEmpty) {
    name
  } else {
    name.substring(0, 1).toUpperCase + name.substring(1).toLowerCase
  }
}

// Wrap the Scala function as a Spark UDF so it can be applied to columns.
val capitalizeUDF = udf(capitalizeFirstLetter _)

val df = Seq(
  "rahul",
  "umesh",
  "vijay",
  "pawan"
).toDF("name")

val dfWithCapitalizedNames = df.withColumn("capitalized_name", capitalizeUDF($"name"))
dfWithCapitalizedNames.show(truncate = false)
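
For this particular transformation a UDF is not strictly required: Spark's built-in initcap function capitalizes the first letter of each word, and built-in functions are generally preferable to UDFs because Catalyst can optimize them. A quick equivalent for the single-word names in this example:

import org.apache.spark.sql.functions.initcap

// Built-in alternative to the UDF above.
df.withColumn("capitalized_name", initcap($"name")).show(truncate = false)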

Using a UDF in Spark SQL:

import org.apache.spark.sql.SparkSession

// Plain Scala function: upper-case the first letter and lower-case the rest.
def capitalizeFirstLetter(name: String): String = {
  if (name == null || name.isEmpty) {
    name
  } else {
    name.substring(0, 1).toUpperCase + name.substring(1).toLowerCase
  }
}

val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._

// Register the function so it can be called by name inside Spark SQL queries.
spark.udf.register("capitalizeFirstLetter", capitalizeFirstLetter(_: String): String)

val df = Seq(
  "rahul",
  "umesh",
  "vijay",
  "pawan"
).toDF("name")

df.createOrReplaceTempView("names")

val result = spark.sql("SELECT name, capitalizeFirstLetter(name) AS capitalized_name FROM names")

result.show(truncate = false)

5. Given a dataset of students' scores in different subjects, transform the data so that each student's scores for all subjects appear in a single row. The input consists of tuples of student name, subject, and score; the desired output is a DataFrame with one row per student and one column per subject.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Student Scores").getOrCreate()
import spark.implicits._

val data = Seq(
  ("Alice", "Math", 85),
  ("Alice", "Science", 90),
  ("Bob", "Math", 78),
  ("Bob", "Science", 82),
  ("Charlie", "Math", 92),
  ("Charlie", "Science", 87)
)

val df = data.toDF("Name", "Subject", "Score")

// Pivot the Subject column so each subject becomes its own column, one row per student.
val pivotDF = df.groupBy("Name").pivot("Subject").agg(first("Score"))

pivotDF.show(truncate = false)
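
If the set of subjects is known up front, it can be passed to pivot explicitly, which saves Spark a separate pass over the data to discover the distinct pivot values:

// Listing the expected subjects avoids the extra distinct-values scan.
val pivotDFExplicit = df.groupBy("Name").pivot("Subject", Seq("Math", "Science")).agg(first("Score"))
pivotDFExplicit.show(truncate = false)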
