June 3, 2025
Spark DataFrames for Beginners: Introduction to Spark DataFrames | by Rahul Patidar

Introduction to Spark DataFrames:

In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
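
For example, here is a minimal sketch (assuming the spark-shell, where the spark session and spark.implicits._ are already available) that turns a local collection into a DataFrame and inspects its named, typed columns:

// Two named columns, each with an inferred type.
val demo = Seq((1, "shirt", 800), (2, "Jeans", 300)).toDF("Product_ID", "Product_Name", "Cost")
demo.printSchema() // shows the column names and types
demo.show()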

How To Create Spark DataFrames:

There are three ways to create Spark DataFrames.

1. Create a collection and parse it as a DataFrame.

2. Convert an RDD to a DataFrame using the toDF() method.

3. Import a file into a SparkSession as a DataFrame directly.

Create a DataFrame from a Collection:

import spark.implicits._ // required for toDF() on a local collection (pre-imported in spark-shell)

val Products = Seq(
  (1, "shirt", 2, 800),
  (2, "Jeans", 2, 300),
  (3, "Watch", 4, 200),
  (4, "Toys", 8, 150),
  (5, "cooldrinks", 10, 540)
).toDF("Product_ID", "Product_Name", "Tax", "Cost")
Products.show()

// Alternatively, keep the raw collection and pass it to createDataFrame();
// the columns then default to _1, _2, _3, _4 until renamed with toDF().
val ProductsSeq = Seq(
  (1, "shirt", 2, 800),
  (2, "Jeans", 2, 300),
  (3, "Watch", 4, 200),
  (4, "Toys", 8, 150),
  (5, "cooldrinks", 10, 540)
)
val Products_Df = spark.createDataFrame(ProductsSeq)
Products_Df.show()

Create Spark DataFrame from an RDD:

val rdd = spark.sparkContext.parallelize(ProductsSeq)
rdd.toDF("Product_ID", "Product_Name", "Tax", "Cost").show()

// Name the columns from a Seq using varargs expansion.
val columns = Seq("Product_ID", "Product_Name", "Tax", "Cost")
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns: _*)
dfFromRDD2.show()

// Or build an RDD[Row] and attach an explicit schema.
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.Row

val schema = StructType(Array(
  StructField("Product_ID", IntegerType, true),
  StructField("Product_Name", StringType, true),
  StructField("Tax", IntegerType, true),
  StructField("Cost", IntegerType, true)
))

val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2, attributes._3, attributes._4))
val dfFromRDD3 = spark.createDataFrame(rowRDD, schema)
dfFromRDD3.show()

Using createDataFrame() from SparkSession

val data = Seq(
  (1, "shirt", 2, 800),
  (2, "Jeans", 2, 300),
  (3, "Watch", 4, 200),
  (4, "Toys", 8, 150),
  (5, "cooldrinks", 10, 540)
)
val columns = Seq("productid", "productname", "tax", "cost")
val dfFromData2 = spark.createDataFrame(data).toDF(columns: _*)
dfFromData2.show()

Using createDataFrame() with the Row type

import scala.collection.JavaConversions._
import org.apache.spark.sql.types.{StringType, StructField, StructType, IntegerType}
import org.apache.spark.sql.Row
val rowData= Seq(Row(1,"shirt",2,800),
Row(2,"Jeans", 2,300),
Row(3,"Watch", 4,400),
Row(4,"Toys", 8,200),
Row(5,"cooldrinks", 5,540)
)
val schema = StructType( Array(
StructField("productid", IntegerType,true),
StructField("productname", StringType,true),
StructField("tax", IntegerType,true),
StructField("cost", IntegerType,true)
))
val dfFromData = spark.createDataFrame(rowData, schema)
dfFromData.show()

Create Spark DataFrame from CSV (when the file has no header row):

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val dataschema = new StructType()
  .add(StructField("id", IntegerType, true))
  .add(StructField("ProductName", StringType, true))
  .add(StructField("Tax", IntegerType, true))
  .add(StructField("Cost", IntegerType, true))

val df = spark.read.format("csv")
  .option("delimiter", ",")
  .schema(dataschema)
  .load("File1.csv")
df.show()

Create Spark DataFrame from CSV (when the file has a header row):

val df = spark.read.format("csv")
  .option("delimiter", ",")
  .option("header", "true")
  .load("File2.csv")
df.show()
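
With a header present, Spark can also infer column types instead of reading every column as a string. A minimal sketch using the built-in inferSchema option (File2.csv is the same hypothetical file as above):

val dfTyped = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true") // extra pass over the data to guess column types
  .load("File2.csv")
dfTyped.printSchema()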

Creating a DataFrame from Hive:

import org.apache.spark.sql.SparkSession

val warehouseLocation = "/user/hive/warehouse" // example path; point this at your Hive warehouse directory
val spark = SparkSession.builder()
  .appName("spark")
  .master("local")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
spark.sql("select * from data").show()

Creating Spark DataFrame from RDBMS Database.

val df_from_mysql = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:port/db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .load()

Creating Spark DataFrame From DB2 table.

val dffromdb2 = spark.read.format("jdbc")
  .option("url", "jdbc:db2://localhost:50000/dbname")
  .option("driver", "com.ibm.db2.jcc.DB2Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .load()

Creating DataFrame from HBase table:

val dffromhbase = sparkSession.read
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
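
Here catalog is a JSON string, defined before the read above, that tells the connector how HBase column families map to DataFrame columns. A minimal sketch in the Hortonworks shc connector's catalog format; the namespace, table, and column names below are hypothetical:

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

val catalog = s"""{
    |"table":{"namespace":"default", "name":"products"},
    |"rowkey":"key",
    |"columns":{
    |"Product_ID":{"cf":"rowkey", "col":"key", "type":"string"},
    |"Product_Name":{"cf":"info", "col":"name", "type":"string"},
    |"Cost":{"cf":"info", "col":"cost", "type":"int"}
    |}
    |}""".stripMargin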

Spark Structured Streaming: Read from and Write to a Kafka Topic:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StructType, StringType, IntegerType}

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.1.100:9092")
  .option("subscribe", "TopicName")
  .option("startingOffsets", "earliest")
  .load()

// Kafka values arrive as bytes; cast to string before parsing.
val df2 = df.selectExpr("CAST(value AS STRING)")

val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("City", StringType)
  .add("Country", StringType)
  .add("Salary", IntegerType)

val DF3 = df2.select(from_json(col("value"), schema).as("data"))
  .select("data.*")

DF3.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .outputMode("append")
  .option("kafka.bootstrap.servers", "192.168.1.100:9092")
  .option("topic", "testDest")
  .option("checkpointLocation", "/tmp/kafka-checkpoint") // required by the Kafka sink; example path
  .start()
  .awaitTermination()

Note: I have covered Spark Streaming in a separate article; you can learn more about Spark DataFrames with a streaming source here: Spark Structured Streaming DataFrames.

Adding and Renaming a Column in a Spark DataFrame:

df.withColumn("Country",lit("India")).show()df.withColumnRenamed("cost","price").show()

Drop a column:

df.drop("cost").show()

Case Statement In Dataframe:

val df2 = df.withColumn("category", when(col("productname") === "shirt","Cloth")
.when(col("productname") === "Jeans","Cloth")
.when(col("productname") === "Toys","Toys")
.when(col("productname") === "cooldrinks","Drink")
.otherwise("Unknown")).show()

Filter Condition In Dataframe:

val df2 = df.withColumn("category", when(col("productname") === "shirt","Cloth")
.when(col("productname") === "Jeans","Cloth")
.when(col("productname") === "Toys","Toys")
.when(col("productname") === "cooldrinks","Drink")
.otherwise("Unknown")).filter(df("productname") === "shirt").show()

Joins In Dataframe:

val df = spark.read.format("csv").option("delimiter", ",").option("header","true").load("product.csv")df.show()val df2 = spark.read.format("csv").option("delimiter", ",").option("header","true").load("category.csv")df2.showval df3=df.join(df2,df("productid") ===  df2("id"),"inner")df3.show()

Complete Example Using Spark DataFrames:

Questions:

1. Create two DataFrames (Products and Customers).

2. Join these two DataFrames on Product and Product_ID.

3. Filter customers whose City is Chennai.

4. Create a new column TotalPrice (tax * cost * qty) and cast it to Integer.

5. Create a new column "TypeOfLiving" and assign Lower, Middle, or Upper class based on TotalPrice (expenses).

6. Select the top 2 customers based on expenses.

val Products = Seq(
(1, "hairoil",2,800),
(2, "shampoo",2,300),
(3, "soap",4,200),
(4, "biscuit",8,150),
(5, "cooldrinks",10,540)
).toDF("Product_ID", "Product_Name","Tax","Cost")
val Customers = Seq(("bharat", "tuticorn",2,2),
("Raj", "Chennai",3,3),
("ajay", "Bangalore",4,2),
("aswin", "Chennai",5,6),
("Gautham", "mumbai",1,7),
("bharat", "tuticorn",4,3),
("Raj", "Chennai",5,1),
("ajay", "Bangalore",2,1),
("aswin", "Chennai",3,6),
("Gautham", "mumbai",3,1)).toDF("Customer_Name","City","Product","Qty")
Products.show()
Customers.show()
// Q2-Q4: join, keep Chennai customers, and compute TotalPrice = tax * qty * cost.
val custInChennai = Customers.join(Products, $"Product" === $"Product_ID", "outer")
  .select("Customer_Name", "City", "Product_ID", "Qty", "Product_Name", "Tax", "Cost")
  .filter($"City" === "Chennai")
  .withColumn("TotalPrice", $"Tax".cast("Int") * $"Qty".cast("Int") * $"Cost".cast("Int"))
custInChennai.show()

// Q5: total expenses per customer.
val customerClassification = Customers.join(Products, $"Product" === $"Product_ID", "outer")
  .select("Customer_Name", "City", "Product_ID", "Qty", "Product_Name", "Tax", "Cost")
  .withColumn("TotalPrice", $"Tax".cast("Int") * $"Qty".cast("Int") * $"Cost".cast("Int"))
  .groupBy("Customer_Name")
  .agg(Map("TotalPrice" -> "sum"))
  .withColumnRenamed("sum(TotalPrice)", "TotalPrice")
customerClassification.show()

// Classify by TotalPrice; the thresholds are checked in ascending order so every branch is reachable.
val customerClassified = customerClassification.withColumn("TypeOfLiving",
  when($"TotalPrice".cast("Int") <= 1000, "Lower")
    .when($"TotalPrice".cast("Int") <= 2000, "Middle")
    .otherwise("UpperClass"))
customerClassified.show()

// Q6: top 2 customers by expenses.
customerClassification.sort($"TotalPrice".desc).show(2)

Conclusion:

In this article, we discussed the following points:

1. Different Ways to Create DataFrames.

2. DataFrames from Different Types of Sources.

3. DataFrame Joins.

4. Different Operations on Spark DataFrames.

5. End-to-End Example on Spark DataFrames.

In the next article, we will discuss more on Spark DataFrames. Hope you liked the article.

LinkedIn

Important Links For Data Engineers:

1. Everything You Need To Know About Scala.

2. Top 20 Scala Programming Questions For Interviews.

3. Spark Dataframes For Beginner.

4. A Complete Guide On Spark Structure Streaming.

5. A Complete Guide On Apache Hive Basics.

6. A Complete Guide On Apache Hive Advance.

7. Hadoop Components Installation Guide In macOS.

8. Slowly Changing Dimension In Hive — DataWareHouse.

9. A Comprehensive Guide On Apache Sqoop

10. Linux Commands You’ll Actually Need for Your Data-Engineering Journey.

11. Apache Spark Checkpoints: Let’s Recover Lost Data From Offset Values.

Happy Learning!!

Note: To discover the best big-data blogs, visit Feedspot. This is the best Big Data blog list curated from thousands of blogs on the web and ranked by traffic, social media followers, domain authority, and freshness.
