
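What is a DataFrame?
A DataFrame is one of Spark's higher-level APIs: a distributed collection of rows organized into named, typed columns.
Apache Spark -
1. Spark Core APIs (RDDs)
2. Higher-level APIs (DataFrames and Spark SQL)
3. GraphX
4. MLlib
DataFrames are not persistent: a DataFrame exists only inside the Spark application that created it. To keep the data, write it to storage or save it as a table.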
1. Create a DataFrame from a Python list
# Data List
customer_data = [
("Ganesh",30,"Data Engineering"),
("Akshay",27,"Data Engineering"),
("Snehal",35,"Scrum Master"),
("Rahul",43,"Cricketer"),
("Rohit",32,"Cricketer"),
("Priyesh",31,"IT Manager")
]
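# With an explicit schema (struct_schema is defined below)
df = spark.createDataFrame(customer_data,struct_schema)
# Without a schema: Spark infers the types and toDF only names the columns
df = spark.createDataFrame(customer_data).toDF("cust_name","cust_age","cust_prof")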
There are 2 ways to define the schema -
1. Schema DDL string
2. StructType
Schema DDL - "col1 col1_datatype,col2 col2_datatype", for example "cust_name STRING,cust_age INT,cust_prof STRING"
StructType -
struct_schema = StructType([
StructField("col1",col1_datatype()),
StructField("col2",col2_datatype())
])
# StructType
from pyspark.sql.types import *
struct_schema = StructType([
StructField("cust_name",StringType()),
StructField("cust_age",IntegerType()),
StructField("cust_prof",StringType())
])
/FileStore/emp_data.csv
1. Infer the Schema
emp_df = spark.read\
.format("csv")\
.option("inferSchema",True)\
.option("header",True)\
.load("/FileStore/emp_data.csv")
2. Infer the Schema from a sample of the data (samplingRatio 0.1 uses about 10% of the rows for inference)
emp_df1 = spark.read\
.format("csv")\
.option("inferSchema",True)\
.option("samplingRatio",0.1)\
.option("header",True)\
.load("/FileStore/emp_data.csv")
3. Enforce the schema - StructType
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("emp_salary",LongType())
])
emp_df3 = spark.read\
.format("csv")\
.schema(emp_schema)\
.option("header",True)\
.load("/FileStore/emp_data.csv")
Ways to create a DataFrame -
1. From a Python list
df = spark.createDataFrame(data,schema)
2. From a file at a storage location
df = spark.read\
.format("file_format")\
.schema(dataframe_schema)\
.option("header",True)\
.load("file_path")
3. From a table, using spark.read.table
df3 = spark.read.table("employee")
4. From a table, using spark.table
df4 = spark.table("employee")
5. From a table, using a SQL query
df5 = spark.sql("SELECT * FROM employee")
6. Using the range function (one LongType column named id; the end value is exclusive)
df6 = spark.range(6)        # 0 to 5
df7 = spark.range(12,35)    # 12 to 34
df8 = spark.range(2,56,4)   # 2, 6, 10, ..., 54
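Actions and Transformations in PySpark -
Transformations are lazy: they only add a step to the execution plan and return a new DataFrame.
Actions are not lazy: they trigger execution of the whole plan and return a result to the driver (or write output).
Transformations in PySpark
filter
withColumn
withColumnRenamed
select
selectExpr
Actions in PySpark
display (Databricks notebooks)
show
count
take
write (for example df.write.parquet(path))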
Higher-level APIs are interconvertible -
DataFrame to Spark table (temporary view) -
createOrReplaceTempView - Creates a temporary view that can be
accessed only within the current Spark session. If a view with the same
name already exists, it is replaced without an error.
createTempView - Creates a temporary view that can be
accessed only within the current Spark session. If a view with the same
name already exists, it throws an error.
createOrReplaceGlobalTempView - Creates a global temporary view that can be
accessed from all Spark sessions of the same Spark application. It is
registered in the global_temp database, so it is queried as
global_temp.view_name. If a view with the same name already exists, it is
replaced without an error.
createGlobalTempView - Creates a global temporary view that can be
accessed from all Spark sessions of the same Spark application (as
global_temp.view_name). If a view with the same name already exists, it
throws an error.
How to convert a Spark table to a DataFrame -
emp_df = spark.read.table("employee1")
emp_df = spark.table("employee1")
emp_df2 = spark.sql("SELECT * FROM employee1")
/FileStore/emp_data-1.csv
Read modes (set with .option("mode", ...)) -
1. Permissive - If it encounters a malformed value (for example, a data type
mismatch), it sets that value to NULL and keeps the row. It is the default read mode.
2. Failfast - If it encounters a malformed record, it throws an error.
3. DropMalformed - If it encounters a malformed record, it drops that row.
emp_permissive_df = spark.read\
.format("csv")\
.schema(emp_schema)\
.option("header",True)\
.option("mode","permissive")\
.load("/FileStore/emp_data-1.csv")
# No mode option: Permissive is used by default
emp_permissive1_df = spark.read\
.format("csv")\
.schema(emp_schema)\
.option("header",True)\
.load("/FileStore/emp_data-1.csv")
emp_failfast_df = spark.read\
.format("csv")\
.schema(emp_schema)\
.option("header",True)\
.option("mode","failfast")\
.load("/FileStore/emp_data-1.csv")
emp_dropmalformed_df = spark.read\
.format("csv")\
.schema(emp_schema)\
.option("header",True)\
.option("mode","dropmalformed")\
.load("/FileStore/emp_data-1.csv")
from pyspark.sql.types import *
from pyspark.sql import functions as F
bikes_schema = StructType([
StructField("model",StringType()),
StructField("mpg",DoubleType()),
StructField("cyl",DoubleType()),
StructField("disp",DoubleType()),
StructField("hp",DoubleType()),
StructField("drat",DoubleType()),
StructField("wt",DoubleType()),
StructField("qsec",DoubleType()),
StructField("vs",DoubleType()),
StructField("am",DoubleType()),
StructField("gear",DoubleType()),
StructField("carb",DoubleType())
])
bikes_raw_df = spark.read.format("json").schema(bikes_schema).load("/Volumes/demo/default/landing/singleline_json.json")
bikes_raw_df.display()
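# Schema fields not found in the JSON records (e.g. horse_power, if the file has hp) are returned as NULL
bikes_schema1 = StructType([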
StructField("model",StringType()),
StructField("mpg",DoubleType()),
StructField("cyl",DoubleType()),
StructField("disp",DoubleType()),
StructField("horse_power",DoubleType()),
StructField("drat",DoubleType()),
StructField("wt",DoubleType()),
StructField("qsec",DoubleType()),
StructField("vs",DoubleType()),
StructField("am",DoubleType()),
StructField("gear",DoubleType()),
StructField("carb",DoubleType())
])
bikes_raw_df1 = spark.read.format("json").schema(bikes_schema1).load("/Volumes/demo/default/landing/singleline_json.json")
bikes_raw_df1.display()
bikes_raw_df2 = spark.read.json(path="/Volumes/demo/default/landing/singleline_json.json",schema=bikes_schema)
bikes_raw_df2.display()
Multiline JSON - one record can span several lines, so the multiline option must be set.
multiline_json_df = spark.read.format("json").option("multiline",True).schema(bikes_schema).load("/Volumes/demo/default/landing/multiline_json.json")
multiline_json_df.display()
multiline_json_df1 = spark.read.json(path="/Volumes/demo/default/landing/multiline_json.json",schema=bikes_schema,multiLine=True)
multiline_json_df1.display()
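Parquet is a column-based (columnar) file format.
If a file has 200 columns and you want to read only 30 of them, Spark reads just those 30 columns from disk (column pruning), which is much less I/O than reading every column of a row-based format such as CSV.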
parquet_df = spark.read.format("parquet").load("/Volumes/demo/default/landing/titanic.parquet")
parquet_df1 = spark.read.parquet("/Volumes/demo/default/landing/titanic.parquet")
/FileStore/emp_data.csv
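1. withColumn transformation is used to update an existing column (when the given column name already exists, that column is replaced).
2. withColumn transformation is used to create a new column (when the given column name does not exist, the column is appended).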
emp_id emp_name emp_salary
1 emp1 1000
2 emp2 2000
3 emp3 3000
4 emp4 4000
1. Increase the salary of all employees by 6%. Keep the new salary in
the same column as present in the data.
from pyspark.sql import functions as F
emp_df1 = emp_df.withColumn("emp_salary",F.expr("emp_salary*1.06"))
emp_df2 = emp_df.withColumn("emp_salary",F.expr("emp_salary + (emp_salary*0.06)"))
2. Increase the salary of all employees by 6%. Keep the new salary in
the separate column (new_salary).
emp_df3 = emp_df.withColumn("new_salary",F.expr("emp_salary*1.06"))
emp_df4 = emp_df.withColumn("new_salary",F.expr("emp_salary + emp_salary*0.06"))
1. Convert emp_id from string to integer
from pyspark.sql import functions as F
emp_df1 = emp_df.withColumn("emp_id",F.col("emp_id").cast("integer"))
emp_df2 = emp_df.withColumn("emp_id",emp_df["emp_id"].cast("integer"))
2. Convert created_timestamp column from string to timestamp
emp_df3 = emp_df.withColumn("created_timestamp",F.col("created_timestamp").cast("timestamp"))
emp_df4 = emp_df.withColumn("created_timestamp",emp_df["created_timestamp"].cast("timestamp"))
emp_df5 = emp_df.withColumn("created_timestamp",F.to_timestamp("created_timestamp"))
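3. Create the created_date column (DateType values are shown as yyyy-MM-dd). The format argument of to_date describes how to parse a string input; it does not change how the date is displayed.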
emp_df7 = emp_df.withColumn("created_timestamp",F.to_timestamp("created_timestamp"))\
.withColumn("created_date",F.to_date("created_timestamp","yyyy-MM-dd"))
DataFrames and Spark SQL are both higher-level APIs, so the same transformations can also be written as SQL queries on a temporary view.
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_id,emp_name,emp_salary*1.06 AS emp_salary FROM employee").display()
spark.sql("SELECT emp_id,emp_name,emp_salary,emp_salary*1.06 AS new_salary FROM employee").display()
emp_datatypes_df.createOrReplaceTempView("employee_datatypes")
spark.sql("SELECT cast(emp_id AS integer),emp_name,emp_salary,created_timestamp FROM employee_datatypes").printSchema()
spark.sql("SELECT cast(emp_id AS integer),emp_name,emp_salary,cast(created_timestamp AS timestamp) FROM employee_datatypes").printSchema()
spark.sql("SELECT cast(emp_id AS INTEGER),emp_name,emp_salary,to_timestamp(created_timestamp) AS created_timestamp FROM employee_datatypes").printSchema()
spark.sql("SELECT cast(emp_id AS INTEGER),emp_name,emp_salary,to_timestamp(created_timestamp) AS created_timestamp,to_date(created_timestamp,'yyyy-MM-dd') AS created_date FROM employee_datatypes").printSchema()
spark.sql("SELECT cast(emp_id AS INTEGER),emp_name,emp_salary,to_timestamp(created_timestamp) AS created_timestamp,to_date(created_timestamp,'yyyy-MM-dd') AS created_date FROM employee_datatypes").display()
spark.sql("""WITH T1 AS (SELECT cast(emp_id AS INTEGER),emp_name,emp_salary,to_timestamp(created_timestamp) AS created_timestamp FROM employee_datatypes)
SELECT emp_id,emp_name,emp_salary,created_timestamp,to_date(created_timestamp,'yyyy-MM-dd') AS created_date FROM T1""").display()
/FileStore/emp_data.csv
1. The select transformation in PySpark takes column names or column expressions as input.
from pyspark.sql.types import *
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("emp_salary",LongType())
])
emp_df = spark.read\
.format("csv")\
.schema(emp_schema)\
.option("header",True)\
.load("/FileStore/emp_data.csv")
from pyspark.sql import functions as F
emp_df1 = emp_df.select(F.col("emp_id"),F.col("emp_salary"))
emp_df2 = emp_df.withColumn("emp_salary",F.expr("emp_salary*1.06"))
emp_df3 = emp_df.select(F.col("emp_id"),F.col("emp_name"),F.expr("emp_salary*1.06 AS emp_salary"))
emp_df4 = emp_df.select(F.col("emp_id"),F.col("emp_name"),F.expr("emp_salary*1.06").alias("emp_salary"))
emp_df5 = emp_df.withColumn("new_salary",F.expr("emp_salary*1.06"))
emp_df6 = emp_df.select(F.col("emp_id"),F.col("emp_name"),F.col("emp_salary"),F.expr("emp_salary*1.06").alias("new_salary"))
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_id,emp_name,emp_salary*1.06 AS emp_salary FROM employee").display()
spark.sql("SELECT emp_id,emp_name,emp_salary,emp_salary*1.06 AS new_salary FROM employee").display()
/FileStore/emp_data.csv
1. selectExpr transformation takes column names and SQL expressions as input.
selectExpr accepts SQL-style expressions written as strings.
withColumn and select accept PySpark column expressions such as F.col,
which is the more Pythonic approach.
select transformation-
1. Select transformation is used to select columns.
df.select(F.col("col1"),F.col("col2"))
2. Select transformation is used to update existing column by applying
expressions.
df.select(F.col("col1"),F.expr("col2*3").alias("col2"))
3. Select transformation is used to create new column by applying
expressions.
df.select(F.col("col1"),F.expr("col2*3").alias("col3"))
4. Select transformation is efficient because it works directly on
column objects and expressions, so no extra parsing is needed.
withColumn transformation -
1. Used to update existing column/columns by applying
expressions.
df.withColumn("col1",F.expr("col1*2"))
2. Used to create new column/columns by applying
expressions.
df.withColumn("col3",F.expr("col1*2"))
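3. When we need to apply multiple column transformations, select can be
faster than chained withColumn calls: each withColumn call adds another
projection to the query plan, and long chains (for example, withColumn in
a loop) create large plans that are slow to analyze.
df.withColumn("col1",F.expr("col1*2"))\
.withColumn("col2",F.expr("col2*2"))
# Same result in a single projection (assuming df has only col1 and col2)
df.select(F.expr("col1*2").alias("col1"),F.expr("col2*2").alias("col2"))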
selectExpr transformation -
1. It is also used to update existing column/columns by using
SQL expressions.
df.selectExpr("col1*2 AS col1","col2*2 AS col2")
2. It is also used to create new column/columns by using
SQL expressions.
df.selectExpr("col1","col2","col1*2 AS col3")
3. The SQL expression strings need to be parsed and converted into
column objects.
/FileStore/emp_data.csv
df.withColumnRenamed("old_column_name","new_column_name")
emp_df2 = emp_df.withColumnRenamed("emp_id","employee_id")\
.withColumnRenamed("emp_name","employee_name")\
.withColumnRenamed("emp_salary","employee_salary")
emp_df3 = emp_df.select(F.col("emp_id").alias("employee_id"),F.col("emp_name").alias("employee_name"),F.col("emp_salary").alias("employee_salary"))
new_column_names = ["employee_id","employee_name","employee_salary"]
emp_df5 = emp_df.toDF(*new_column_names)
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_id AS employee_id,emp_name AS employee_name,emp_salary AS employee_salary FROM employee").display()
emp_df6 = emp_df.selectExpr("emp_id AS employee_id","emp_name AS employee_name","emp_salary AS employee_salary")
emp_df7 = emp_df.withColumn("employee_id",F.col("emp_id"))\
.withColumn("employee_name",F.col("emp_name"))\
.withColumn("employee_salary",F.col("emp_salary"))\
.drop("emp_id","emp_name","emp_salary")
emp_df8 = emp_df
column_renamed_dict = {"emp_id":"employee_id","emp_name":"employee_name","emp_salary":"employee_salary"}
for old_col,new_col in column_renamed_dict.items():
    emp_df8 = emp_df8.withColumnRenamed(old_col,new_col)
1. Filter the employees whose salary is greater than 24000 - >
2. Filter the employees whose name ends with 1 - LIKE, endswith
3. Filter the employees whose emp_id is between 34 to 67 - between
4. Filter the employees whose name starts with 1 - LIKE, startswith
emp_df1 = emp_df.filter("emp_salary > 24000")
emp_df2 = emp_df.filter(F.col("emp_salary")>F.lit(24000))
emp_df3 = emp_df.where(F.col("emp_salary")>F.lit(24000))
emp_df4 = emp_df.filter("emp_name LIKE '%1'")
emp_df5 = emp_df.filter('emp_name LIKE "%1"')
emp_df6 = emp_df.filter(F.col("emp_name").like("%1"))
emp_df7 = emp_df.where(F.col("emp_name").like("%1"))
emp_df8 = emp_df.filter("emp_id BETWEEN 34 AND 67")
emp_df9 = emp_df.filter(F.col("emp_id").between(34,67))
emp_df10 = emp_df.where(F.col("emp_id").between(34,67))
emp_df11 = emp_df.where("emp_id BETWEEN 34 AND 67")
5. Filter the employees whose name is in a given list - isin
emp_df1 = emp_df.filter(F.col("emp_name").isin(["emp1","emp21","emp32","emp43"]))
employees_to_award = ["emp1","emp21","emp32","emp43"]
emp_df2 = emp_df.filter(F.col("emp_name").isin(employees_to_award))
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT * FROM employee WHERE emp_salary>24000").display()
spark.sql("SELECT * FROM employee WHERE emp_name LIKE '%1'").display()
spark.sql("SELECT * FROM employee WHERE emp_id BETWEEN 34 AND 67").display()
spark.sql("SELECT * FROM employee WHERE emp_name IN ('emp1','emp21','emp32','emp43')").display()
6. Combine conditions with & (AND) and | (OR)
emp_df9 = emp_df.filter(F.col("emp_id").between(34,67) & F.col("emp_name").isin(employees_to_award))
Default sorting order is Ascending Order.
from pyspark.sql.types import *
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("emp_salary",LongType())
])
emp_df = spark.read\
.format("csv")\
.schema(emp_schema)\
.option("header",True)\
.load("/FileStore/emp_data.csv")
from pyspark.sql import functions as F
emp_df1 = emp_df.orderBy(F.col("emp_id"))
emp_df2 = emp_df.orderBy(F.col("emp_id"),ascending=True)
emp_df3 = emp_df.orderBy(F.col("emp_id").asc())
emp_df4 = emp_df.orderBy(F.asc(F.col("emp_id")))
emp_df5 = emp_df.orderBy(F.col("emp_id"),ascending=False)
emp_df6 = emp_df.orderBy(F.col("emp_id").desc())
emp_df7 = emp_df.orderBy(F.desc(F.col("emp_id")))
emp_df8 = emp_df.sort(F.col("emp_salary"))
emp_df9 = emp_df.sort(F.col("emp_salary"),ascending=True)
emp_df10 = emp_df.sort(F.col("emp_salary").asc())
emp_df11 = emp_df.sort(F.asc(F.col("emp_salary")))
emp_df12 = emp_df.sort(F.col("emp_salary"),ascending=False)
emp_df13 = emp_df.sort(F.col("emp_salary").desc())
emp_df14 = emp_df.sort(F.desc(F.col("emp_salary")))
emp_id emp_salary
1 100
2 100
3 200
4 200
5 300
6 300
Sort on emp_salary in descending order and emp_id in ascending order
emp_id emp_salary
5 300
6 300
3 200
4 200
1 100
2 100
Sort on emp_salary in descending order and emp_id in descending order
emp_id emp_salary
6 300
5 300
4 200
3 200
2 100
1 100
emp_id emp_name emp_salary
1 Ganesh 1000
2 Rakesh 1000
3 Ganesh 1000
4 Akshay 2000
5 Akshay 2000
6 Dhiraj 2000
Sort based on emp_salary DESC, emp_name DESC, emp_id DESC
emp_id emp_name emp_salary
6 Dhiraj 2000
5 Akshay 2000
4 Akshay 2000
2 Rakesh 1000
3 Ganesh 1000
1 Ganesh 1000
df = emp_df.orderBy(F.col("emp_salary"),F.col("emp_name"),F.col("emp_id"),ascending = [False,False,False])
A 270 MB file split into 128 MB partitions gives 3 data partitions (128 MB + 128 MB + 14 MB).
P1: 5, 3, 2, 1, 4
P2: 10, 9, 8, 7, 6
P3: 11, 12, 13, 14, 15
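sortWithinPartitions (SORT BY in Spark SQL) - performs only local sorting:
each partition is sorted on its own and no data moves between partitions.
Note that in the DataFrame API sort() and orderBy() are aliases, and both
perform a global sort.
Partitions after local sorting (descending) -
P1: 5, 4, 3, 2, 1
P2: 10, 9, 8, 7, 6
P3: 15, 14, 13, 12, 11
Combined result (the partitions are not ordered relative to each other) -
10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 15, 14, 13, 12, 11
Local sorting alone gives a fully sorted result only when the data has a single partition.
orderBy / sort - performs a global sort. Spark samples the sort key to choose
range boundaries, shuffles the rows so that each partition holds one contiguous
range of keys (range partitioning), and then sorts each partition locally.
Because the partitions are themselves in key order, reading them one after
another gives the fully sorted result; the driver does not re-sort the rows.
Partitions after range partitioning (descending) -
P1: keys 11 to 15
P2: keys 6 to 10
P3: keys 1 to 5
Partitions after the local sort -
P1: 15, 14, 13, 12, 11
P2: 10, 9, 8, 7, 6
P3: 5, 4, 3, 2, 1
Final result -
15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1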
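The difference between local and global sorting can be modelled in plain Python using the three example partitions (P1: 5, 3, 2, 1, 4 and so on). This is a simplified simulation, not Spark's implementation: it computes exact range boundaries where Spark estimates them by sampling, and the function names are hypothetical.

```python
from bisect import bisect_right

# The three example partitions from the notes
partitions = [[5, 3, 2, 1, 4], [10, 9, 8, 7, 6], [11, 12, 13, 14, 15]]


def sort_within_partitions(parts, descending=True):
    """Local sort only: each partition is sorted on its own, no data moves."""
    return [sorted(p, reverse=descending) for p in parts]


def global_sort(parts, descending=True):
    """Range-partition the keys, then sort each new partition locally."""
    keys = sorted(k for p in parts for k in p)
    n = len(parts)
    # Exact boundaries splitting the keys into n ranges (Spark samples instead)
    bounds = [keys[len(keys) * i // n] for i in range(1, n)]
    ranges = [[] for _ in range(n)]
    for p in parts:
        for k in p:
            ranges[bisect_right(bounds, k)].append(k)  # "shuffle" the key to its range
    if descending:
        ranges.reverse()  # highest range first
    return [sorted(r, reverse=descending) for r in ranges]


def concat(parts):
    """Read the partitions one after another, as a collect would."""
    return [k for p in parts for k in p]


print(concat(sort_within_partitions(partitions)))  # 5..1, 10..6, 15..11
print(concat(global_sort(partitions)))             # 15..1
```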
Three types of Aggregations -
1. Simple Aggregation
2. Grouping Aggregation
3. Windowing Aggregation
Simple Aggregations in PySpark - SUM
1. How much an organisation is spending on employee salaries.
from pyspark.sql.functions import expr
emp_df1 = emp_df.select(expr("sum(emp_salary) AS total_salary"))
emp_df2 = emp_df.select(F.expr("sum(emp_salary) AS total_salary"))
emp_df3 = emp_df.select(F.sum(F.col("emp_salary")).alias("total_salary"))
emp_df4 = emp_df.selectExpr("sum(emp_salary) AS total_salary")
emp_df5 = emp_df.agg(F.sum(F.col("emp_salary")).alias("total_salary"))
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT SUM(emp_salary) AS total_salary FROM employee").display()
1. Find count of total employees in the organization.
2. Find the Average Salary of the employees in the organization.
3. Find the maximum salary of the employee in the organization.
4. Find the minimum salary of the employee in the organization.
emp_df1 = emp_df.agg(F.count(F.col("emp_id")).alias("total_employees"))
total_employees = emp_df.count()  # action: returns a Python int, not a DataFrame
emp_df3 = emp_df.agg(F.avg(F.col("emp_salary")).alias("average_salary"))
emp_df4 = emp_df.agg(F.max(F.col("emp_salary")).alias("maximum_salary"))
emp_df5 = emp_df.agg(F.min(F.col("emp_salary")).alias("minimum_salary"))
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT COUNT(emp_id) AS total_employees FROM employee").display()
spark.sql("SELECT AVG(emp_salary) AS average_salary FROM employee").display()
spark.sql("SELECT MAX(emp_salary) AS maximum_salary FROM employee").display()
spark.sql("SELECT MIN(emp_salary) AS minimum_salary FROM employee").display()
emp_df6 = emp_df.agg(F.sum(F.col("emp_salary")).alias("total_salary"),F.count(F.col("emp_id")).alias("total_employees"),F.avg(F.col("emp_salary")).alias("average_salary"),F.max(F.col("emp_salary")).alias("maximum_salary"),F.min(F.col("emp_salary")).alias("minimum_salary"))
1. Total employees in the organisation department wise.
emp_df1 = emp_df\
.agg(F.count(F.col("emp_id")).alias("total_employees"))
emp_df2 = emp_df\
.groupBy("emp_department")\
.agg(F.count(F.col("emp_id")).alias("total_employees"))
emp_df3 = emp_df\
.groupBy("emp_department")\
.agg(F.count(F.col("emp_id")).alias("total_employees"))\
.orderBy(F.col("emp_department"),ascending=True)
emp_df4 = emp_df\
.groupBy("emp_department")\
.agg(F.count(F.col("emp_id")).alias("total_employees"))\
.orderBy(F.asc(F.col("emp_department")))
emp_df5 = emp_df\
.groupBy("emp_department")\
.agg(F.count(F.col("emp_id")).alias("total_employees"))\
.orderBy(F.col("emp_department").asc())
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_department,COUNT(emp_id) AS total_employees FROM employee GROUP BY emp_department").display()
spark.sql("SELECT emp_department,COUNT(emp_id) AS total_employees FROM employee GROUP BY emp_department ORDER BY emp_department").display()
spark.sql("SELECT emp_department,COUNT(emp_id) AS total_employees FROM employee GROUP BY emp_department ORDER BY emp_department ASC").display()
spark.sql("SELECT emp_department,COUNT(emp_id) AS total_employees FROM employee GROUP BY emp_department ORDER BY emp_department DESC").display()
emp_df6 = emp_df\
.groupBy("emp_department")\
.agg(F.count(F.col("emp_id")).alias("total_employees"))\
.orderBy(F.col("total_employees"),ascending=False)
emp_df7 = emp_df\
.groupBy("emp_department")\
.agg(F.count(F.col("emp_id")).alias("total_employees"))\
.orderBy(F.desc(F.col("total_employees")))
emp_df8 = emp_df\
.groupBy("emp_department")\
.agg(F.count(F.col("emp_id")).alias("total_employees"))\
.orderBy(F.col("total_employees").desc())
1. Average salary of employees per department.
2. Total amount every department is spending on employee salary.
3. Maximum employee salary of each department.
4. Minimum employee salary of each department.
emp_df1 = emp_df\
.agg(F.avg(F.col("emp_salary")).alias("average_salary"))
emp_df2 = emp_df\
.groupBy("emp_department")\
.agg(F.avg(F.col("emp_salary")).alias("average_salary"))
emp_df3 = emp_df\
.groupBy("emp_department")\
.agg(F.avg(F.col("emp_salary")).alias("average_salary"))\
.orderBy(F.col("emp_department"),ascending=True)
emp_df4 = emp_df\
.groupBy(F.col("emp_department"))\
.agg(F.avg(F.col("emp_salary")).alias("average_salary"))\
.orderBy(F.col("average_salary"),ascending=False)
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_department,AVG(emp_salary) AS average_salary FROM employee GROUP BY emp_department ORDER BY emp_department ASC").display()
spark.sql("SELECT emp_department,AVG(emp_salary) AS average_salary FROM employee GROUP BY emp_department ORDER BY average_salary DESC").display()
# Total amount every department is spending on employee salary
emp_df5 = emp_df\
.agg(F.sum(F.col("emp_salary")).alias("total_spending"))
emp_df6 = emp_df\
.groupBy("emp_department")\
.agg(F.sum(F.col("emp_salary")).alias("total_spending"))
emp_df7 = emp_df\
.groupBy("emp_department")\
.agg(F.sum(F.col("emp_salary")).alias("total_spending"))\
.orderBy(F.col("emp_department"),ascending=True)
emp_df8 = emp_df\
.groupBy("emp_department")\
.agg(F.sum(F.col("emp_salary")).alias("total_spending"))\
.orderBy(F.col("total_spending"),ascending=False)
spark.sql("SELECT SUM(emp_salary) AS total_spending FROM employee").display()
spark.sql("SELECT emp_department,SUM(emp_salary) AS total_spending FROM employee GROUP BY emp_department").display()
spark.sql("SELECT emp_department,SUM(emp_salary) AS total_spending FROM employee GROUP BY emp_department ORDER BY emp_department ASC").display()
spark.sql("SELECT emp_department,SUM(emp_salary) AS total_spending FROM employee GROUP BY emp_department ORDER BY total_spending DESC").display()
3. Maximum employee salary of each department.
4. Minimum employee salary of each department.
emp_df1 = emp_df\
.agg(F.max(F.col("emp_salary")).alias("maximum_salary"))
emp_df2 = emp_df\
.groupBy("emp_department")\
.agg(F.max(F.col("emp_salary")).alias("maximum_salary"))
emp_df3 = emp_df\
.groupBy("emp_department")\
.agg(F.max(F.col("emp_salary")).alias("maximum_salary"))\
.orderBy(F.asc(F.col("emp_department")))
emp_df4 = emp_df\
.groupBy("emp_department")\
.agg(F.max(F.col("emp_salary")).alias("maximum_salary"))\
.orderBy(F.desc(F.col("maximum_salary")))
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT MAX(emp_salary) AS maximum_salary FROM employee").display()
spark.sql("SELECT emp_department,MAX(emp_salary) AS maximum_salary FROM employee GROUP BY emp_department").display()
emp_df5 = emp_df\
.groupBy(F.col("emp_department"))\
.agg(F.min(F.col("emp_salary")).alias("minimum_salary"))\
.orderBy(F.col("emp_department"))
spark.sql("SELECT emp_department,MIN(emp_salary) AS minimum_salary FROM employee GROUP BY emp_department").display()
emp_id emp_name emp_salary emp_department
1 Ganesh 10000 1
2 Ganesh1 12000 1
3 Ramesh 11000 2
4 Pritesh 13000 2
5 Priyesh 14000 3
Grouping on emp_department gives 3 groups -
Dep 1
Dep 2
Dep 3
emp_id emp_name emp_salary emp_department emp_city
1 Ganesh 10000 1 Pune
2 Ganesh1 12000 1 Pune
3 Ramesh 11000 2 Mumbai
4 Pritesh 13000 2 Chennai
5 Priyesh 14000 3 Pune
Grouping on emp_city and emp_department gives 4 groups -
Pune and dep 1
Pune and dep 3
Mumbai and dep 2
Chennai and dep 2
1. How much every department is spending on employee salary in each city.
2. Average employee salary based on department in each city.
3. Maximum employee salary based on department in each city.
4. Minimum employee salary based on department in each city.
employee_data = [
("Ivy Jones", "Engineering", "Nevada", 53000, 23, 16500),
("Eve Smith", "Admin", "Nevada", 78000, 43, 17000),
("Jack Johnson", "IT", "California", 101000, 22, 5000),
("Ivy Williams", "Engineering", "California", 96000, 46, 15500),
("David Williams", "Admin", "New York", 115000, 53, 6500),
("David Smith", "IT", "Texas", 71000, 33, 18000),
("Grace Williams", "Finance", "California", 100000, 47, 15000),
("Grace Smith", "Marketing", "Florida", 76000, 26, 17500),
("Hank Jones", "Finance", "Washington", 72000, 39, 5000),
("Jack Jones", "Admin", "New York", 101000, 51, 11000),
("David Johnson", "HR", "Georgia", 60000, 56, 1500),
("Grace Brown", "IT", "Nevada", 92000, 27, 19000),
("Fiona Johnson", "Finance", "Florida", 117000, 30, 19000),
("Fiona Jones", "HR", "New York", 40000, 24, 6500),
("Eve Williams", "Sales", "Georgia", 95000, 31, 10000),
("Jack Brown", "Engineering", "Ohio", 109000, 55, 19000),
("David Williams", "IT", "Texas", 60000, 47, 8000),
("Ivy Brown", "Admin", "Washington", 83000, 29, 6500),
("Bob Johnson", "HR", "Nevada", 88000, 25, 17000),
("Alice Brown", "HR", "New York", 79000, 23, 2000),
("Ivy Williams", "Sales", "California", 62000, 49, 12000),
("Fiona Brown", "Admin", "Illinois", 81000, 46, 11000),
("Ivy Johnson", "HR", "Illinois", 83000, 36, 11500),
("Ivy Williams", "Sales", "Florida", 51000, 51, 7500),
("Bob Jones", "IT", "Ohio", 79000, 45, 4000),
("Fiona Smith", "Engineering", "Ohio", 110000, 39, 1000),
("Grace Williams", "IT", "California", 90000, 58, 4500),
("David Johnson", "IT", "California", 105000, 24, 11000),
("Grace Johnson", "Finance", "Florida", 79000, 24, 1500),
("Grace Johnson", "Admin", "California", 92000, 27, 2000),
("Grace Smith", "HR", "Nevada", 72000, 27, 17000),
("Bob Brown", "Finance", "New York", 70000, 51, 11500),
("Jack Brown", "Engineering", "Washington", 75000, 26, 13000),
("Fiona Johnson", "HR", "California", 53000, 43, 14500),
("Charlie Brown", "Finance", "New York", 56000, 26, 4500),
("Fiona Brown", "HR", "Georgia", 110000, 47, 14000),
("Charlie Williams", "Engineering", "California", 50000, 25, 6500),
("Charlie Smith", "Admin", "California", 48000, 46, 7500),
("Eve Smith", "Marketing", "Florida", 98000, 48, 3500),
("Hank Smith", "IT", "Nevada", 72000, 34, 1500),
("Charlie Smith", "Finance", "Georgia", 107000, 31, 19500),
("Jack Williams", "Admin", "New York", 54000, 29, 11000),
("David Smith", "Admin", "Texas", 41000, 31, 11000),
("Grace Johnson", "Sales", "Georgia", 90000, 57, 3500),
("Fiona Johnson", "Finance", "Washington", 59000, 57, 10500),
("Eve Jones", "Finance", "Texas", 102000, 27, 3000),
("Alice Johnson", "IT", "California", 106000, 44, 3500),
("David Williams", "HR", "Texas", 73000, 22, 18500),
("Charlie Johnson", "Admin", "Nevada", 96000, 32, 10000),
("David Jones", "Engineering", "California", 48000, 33, 16500),
("Grace Johnson", "Marketing", "Georgia", 84000, 46, 3500),
("Jack Smith", "Sales", "New York", 90000, 47, 17500),
("Grace Brown", "Finance", "California", 46000, 38, 11500),
("Eve Smith", "Engineering", "Florida", 105000, 59, 12000),
("David Jones", "Sales", "Ohio", 110000, 40, 17000),
("Hank Brown", "Sales", "Ohio", 112000, 40, 4000),
("Alice Jones", "Sales", "New York", 117000, 46, 7000),
("Charlie Smith", "Admin", "Illinois", 63000, 40, 8000),
("Charlie Johnson", "Admin", "Washington", 58000, 44, 4500),
("Charlie Jones", "Finance", "Florida", 96000, 44, 5000),
("Bob Williams", "Finance", "Ohio", 61000, 48, 7000),
("Alice Jones", "HR", "Florida", 98000, 25, 14500),
("Hank Brown", "Finance", "New York", 103000, 51, 10500),
("Alice Brown", "Marketing", "Texas", 119000, 56, 16000),
("Grace Johnson", "IT", "Illinois", 53000, 36, 8500),
("Alice Jones", "Marketing", "Illinois", 85000, 39, 11500),
("Fiona Williams", "Marketing", "California", 98000, 59, 18500),
("Jack Johnson", "Engineering", "Georgia", 47000, 41, 7000),
("Fiona Williams", "IT", "New York", 96000, 54, 11500),
("Grace Johnson", "HR", "California", 79000, 59, 11500),
("David Brown", "Marketing", "Florida", 86000, 53, 4000),
("David Brown", "Marketing", "Washington", 40000, 36, 3000),
("Alice Smith", "Finance", "California", 56000, 25, 8000),
("Charlie Smith", "IT", "Ohio", 91000, 43, 3500),
("Hank Jones", "Engineering", "Nevada", 94000, 28, 5500),
("Bob Jones", "Admin", "Texas", 91000, 42, 7000),
("Fiona Johnson", "Sales", "Washington", 69000, 57, 19000),
("Hank Williams", "Engineering", "New York", 107000, 28, 16500),
("Eve Brown", "HR", "Nevada", 71000, 26, 5000),
("Alice Jones", "HR", "Texas", 54000, 57, 12000),
("Grace Smith", "Marketing", "New York", 71000, 53, 10500),
("Eve Brown", "Admin", "Georgia", 41000, 27, 18500),
("Jack Williams", "IT", "Texas", 78000, 28, 16500),
("Bob Brown", "Admin", "Washington", 103000, 55, 10500),
("Grace Smith", "HR", "Nevada", 80000, 32, 5000),
("Bob Smith", "Sales", "Georgia", 43000, 26, 15500),
("Fiona Brown", "Admin", "Texas", 62000, 30, 6000),
("Grace Brown", "Finance", "New York", 88000, 42, 16000),
("Fiona Johnson", "HR", "Washington", 57000, 44, 10500),
("Hank Jones", "HR", "Illinois", 71000, 34, 6000),
("Jack Jones", "Finance", "Illinois", 106000, 57, 3500)
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_name",StringType()),
StructField("emp_dep",StringType()),
StructField("emp_city",StringType()),
StructField("emp_salary",LongType()),
StructField("emp_age",IntegerType()),
StructField("emp_bonus",LongType())
])
emp_df = spark.createDataFrame(employee_data,emp_schema)
emp_df.display()
# 1 - How much every department is spending on employee salary in each city
emp_df1 = emp_df\
.groupBy(F.col("emp_dep"),F.col("emp_city"))\
.agg(F.sum(F.col("emp_salary")).alias("total_spending"))
emp_df2 = emp_df\
.groupBy(F.col("emp_dep"),F.col("emp_city"))\
.agg(F.sum(F.col("emp_salary")).alias("total_spending"))\
.orderBy(F.asc(F.col("emp_dep")))
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_dep,emp_city,SUM(emp_salary) AS total_spending FROM employee GROUP BY emp_dep,emp_city").display()
spark.sql("SELECT emp_dep,emp_city,SUM(emp_salary) AS total_spending FROM employee GROUP BY emp_dep,emp_city ORDER BY emp_dep ASC").display()
2. Average employee salary based on department in each city.
3. Maximum employee salary based on department in each city.
4. Minimum employee salary based on department in each city.
emp_df1 = emp_df\
.groupBy(F.col("emp_dep"),F.col("emp_city"))\
.agg(F.avg(F.col("emp_salary")).alias("average_salary"))\
.orderBy(F.col("emp_dep").asc())
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_dep,emp_city,AVG(emp_salary) AS average_salary FROM employee GROUP BY ALL ORDER BY emp_dep ASC").display()
emp_df2 = emp_df\
.groupBy(F.col("emp_dep"),F.col("emp_city"))\
.agg(F.max(F.col("emp_salary")).alias("maximum_salary"))\
.orderBy(F.desc(F.col("emp_dep")))
spark.sql("SELECT emp_dep,emp_city,MAX(emp_salary) AS maximum_salary FROM employee GROUP BY ALL ORDER BY emp_dep DESC").display()
emp_df3 = emp_df\
.groupBy(F.col("emp_dep"),F.col("emp_city"))\
.agg(F.min(F.col("emp_salary")).alias("minimum_salary"))\
.orderBy(F.col("emp_dep").desc())
spark.sql("SELECT emp_dep,emp_city,MIN(emp_salary) AS minimum_salary FROM employee GROUP BY ALL ORDER BY emp_dep DESC").display()
All four questions (total, average, maximum and minimum salary per department in each city) can be answered with one groupBy and a single agg call:
emp_results_df = emp_df\
.groupBy(F.col("emp_dep"),F.col("emp_city"))\
.agg(F.sum(F.col("emp_salary")).alias("total_spending"),F.avg(F.col("emp_salary")).alias("average_salary"),F.max(F.col("emp_salary")).alias("maximum_salary"),F.min(F.col("emp_salary")).alias("minimum_salary"))\
.orderBy(F.asc(F.col("emp_dep")))
emp_df.createOrReplaceTempView("employee")
spark.sql("SELECT emp_dep,emp_city,SUM(emp_salary) AS total_spending,AVG(emp_salary) AS average_salary,MAX(emp_salary) AS maximum_salary,MIN(emp_salary) AS minimum_salary FROM employee GROUP BY ALL ORDER BY emp_dep").display()
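For intuition, the single groupBy plus agg above can be mirrored in plain Python: bucket the salaries by (department, city), then reduce each bucket to its total, average, maximum and minimum. This is only a sketch of the semantics, not Spark code; the rows are made up (they are not the employee_data above) and salary_stats is a hypothetical helper name.

```python
from collections import defaultdict


def salary_stats(rows):
    """Group (dep, city, salary) rows and compute total, average, max and min salary.

    Plain-Python counterpart of
    groupBy("emp_dep", "emp_city").agg(sum, avg, max, min) on emp_salary.
    """
    groups = defaultdict(list)
    for dep, city, salary in rows:
        groups[(dep, city)].append(salary)

    stats = {}
    for key in sorted(groups):  # sort the keys only to get a stable, readable output order
        salaries = groups[key]
        stats[key] = {
            "total_spending": sum(salaries),
            "average_salary": sum(salaries) / len(salaries),
            "maximum_salary": max(salaries),
            "minimum_salary": min(salaries),
        }
    return stats


# Hypothetical rows, with two employees sharing the (HR, Texas) group.
rows = [
    ("HR", "Texas", 54000),
    ("HR", "Texas", 60000),
    ("IT", "Texas", 78000),
    ("HR", "Nevada", 80000),
]

for key, values in salary_stats(rows).items():
    print(key, values)
```

Every aggregate is computed from the same bucket in one pass over the groups, which is also why a single agg call with four expressions is preferable to four separate groupBy queries.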
marks_data = [
("Ganesh","English",99),
("Akshay","English",99),
("Priyesh","English",98),
("Rohit","English",98),
("Shrikant","English",98),
("Mayur","English",97),
("Ganesh","History",77),
("Akshay","History",77),
("Priyesh","History",98),
("Rohit","History",98),
("Shrikant","History",99),
("Mayur","History",99)
]
1. Find the top 3 students with the highest marks in each subject.
If there is a tie, return the student whose name comes first in alphabetical order.
from pyspark.sql.types import *
from pyspark.sql import functions as F
marks_schema = StructType([
StructField("student_name",StringType()),
StructField("subject",StringType()),
StructField("marks",IntegerType())
])
marks_df = spark.createDataFrame(marks_data,marks_schema)
marks_df.display()
from pyspark.sql.window import Window
marks_window = Window.partitionBy("subject")\
.orderBy(F.desc(F.col("marks")),F.asc(F.col("student_name")))
marks_df1 = marks_df.withColumn("marks_number",F.row_number().over(marks_window))
marks_df2 = marks_df.withColumn("marks_number",F.row_number().over(Window.partitionBy("subject")\
.orderBy(F.desc(F.col("marks")),F.asc(F.col("student_name")))))
marks_df2 = marks_df1.filter(F.col("marks_number")<=F.lit(3))
marks_df3 = marks_df\
.withColumn("marks_number",F.row_number().over(marks_window))\
.filter(F.col("marks_number")<=F.lit(3))
marks_df.createOrReplaceTempView("marks")
spark.sql("SELECT student_name,subject,marks,row_number() OVER (PARTITION BY subject ORDER BY marks DESC,student_name ASC) AS marks_number FROM marks").display()
spark.sql("""WITH T1 (student_name,subject,marks,marks_number) AS (SELECT student_name,subject,marks,row_number() OVER (PARTITION BY subject ORDER BY marks DESC,student_name ASC) AS marks_number FROM marks)
SELECT * FROM T1 WHERE marks_number <= 3""").display()
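The same "top 3 per subject, ties broken alphabetically" logic can be sketched in plain Python on the marks_data list above: sort by subject, then by marks descending and name ascending, and keep the first three rows of each subject. This mirrors row_number() over marks_window followed by the filter on marks_number; top_n_per_subject is a hypothetical helper, not a Spark API.

```python
from itertools import groupby

marks_data = [
    ("Ganesh", "English", 99),
    ("Akshay", "English", 99),
    ("Priyesh", "English", 98),
    ("Rohit", "English", 98),
    ("Shrikant", "English", 98),
    ("Mayur", "English", 97),
    ("Ganesh", "History", 77),
    ("Akshay", "History", 77),
    ("Priyesh", "History", 98),
    ("Rohit", "History", 98),
    ("Shrikant", "History", 99),
    ("Mayur", "History", 99),
]


def top_n_per_subject(rows, n=3):
    """Return {subject: [(student_name, marks), ...]} with the top n students per subject."""
    # Sorting by subject first keeps each "partition" together; within it,
    # -marks gives descending marks and student_name breaks ties alphabetically.
    ordered = sorted(rows, key=lambda r: (r[1], -r[2], r[0]))
    result = {}
    for subject, group in groupby(ordered, key=lambda r: r[1]):
        # Keeping the first n rows is the equivalent of row_number <= n.
        result[subject] = [(name, marks) for name, _, marks in list(group)[:n]]
    return result


result = top_n_per_subject(marks_data)
print(result)
```

Because the tie-breaker (student_name) is part of the ordering, the result is deterministic, which is exactly why marks_window orders by student_name as well as marks.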
marks  row_number  rank
100    1           1
100    2           1
100    3           1
99     4           4
99     5           4
98     6           6
97     7           7
97     8           7
96     9           9
95     10          10
95     11          10
from pyspark.sql.window import Window
rank_window = Window.partitionBy(F.col("subject"))\
.orderBy(F.desc(F.col("marks")))
marks_df1 = marks_df.withColumn("rank_number",F.rank().over(rank_window))
marks_df2 = marks_df.withColumn("row_number",F.row_number().over(rank_window))
marks_df.createOrReplaceTempView("marks")
spark.sql("SELECT student_name,subject,marks,rank() OVER (PARTITION BY subject ORDER BY marks DESC) AS rank_number FROM marks").display()
row_number - Assigns consecutive numbers (1, 2, 3, ...) to the rows in each
partition without skipping any number. Tied rows still get different numbers.
rank - Assigns the same rank to tied rows, then skips numbers: the rank after
a tie is 1 + the number of rows already ranked before it (for example 1, 1, 1, 4).
dense_rank - Assigns the same rank to tied rows and does not skip any number:
the next distinct value gets the next consecutive rank (for example 1, 1, 1, 2).
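The three definitions can be checked with a small plain-Python sketch that applies them to the marks values listed above (one partition, ordered by marks descending). It illustrates the semantics only; it is not how Spark computes window functions, and ranking_columns is a hypothetical helper name.

```python
def ranking_columns(values):
    """Return (value, row_number, rank, dense_rank) tuples for values sorted descending."""
    ordered = sorted(values, reverse=True)
    results = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(ordered, start=1):
        if value != previous:
            rank = row_number  # rank jumps to the current row position, leaving gaps after ties
            dense_rank += 1    # dense_rank moves to the next consecutive number
            previous = value
        results.append((value, row_number, rank, dense_rank))
    return results


marks = [100, 100, 100, 99, 99, 98, 97, 97, 96, 95, 95]
for row in ranking_columns(marks):
    print(row)
```

Among tied rows, Spark does not guarantee which row gets the lower row_number unless the window also orders by a tie-breaking column (as marks_window does with student_name).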
from pyspark.sql.window import Window
dense_rank_window = Window.partitionBy(F.col("subject"))\
.orderBy(F.col("marks").desc())
marks_df1 = marks_df.withColumn("dense_rank_number",F.dense_rank().over(dense_rank_window))
marks_df2 = marks_df.withColumn("row_number",F.row_number().over(dense_rank_window))\
.withColumn("rank",F.rank().over(dense_rank_window))\
.withColumn("dense_rank",F.dense_rank().over(dense_rank_window))
marks_df.createOrReplaceTempView("marks")
spark.sql("SELECT student_name,subject,marks,row_number() OVER (PARTITION BY subject ORDER BY marks DESC) AS row_number,rank() OVER (PARTITION BY subject ORDER BY marks DESC) AS rank,dense_rank() OVER (PARTITION BY subject ORDER BY marks DESC) AS dense_rank FROM marks").display()
When row_number, rank and dense_rank give the same results and any of them
solves your problem, prefer row_number. rank and dense_rank need a little
extra processing because they have to detect ties between rows.
duplicate_marks_data = [
("Ganesh","English",99),
("Akshay","English",99),
("Priyesh","English",98),
("Rohit","English",98),
("Shrikant","English",98),
("Mayur","English",97),
("Akshay","English",99),
("Akshay","English",99),
("Ganesh","History",77),
("Akshay","History",77),
("Priyesh","History",98),
("Rohit","History",98),
("Shrikant","History",99),
("Mayur","History",99),
("Shrikant","History",99),
("Shrikant","History",99),
("Shrikant","History",99)
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
marks_schema = StructType([
StructField("student_name",StringType()),
StructField("subject",StringType()),
StructField("marks",IntegerType())
])
duplicate_marks_df = spark.createDataFrame(duplicate_marks_data,marks_schema)
duplicate_marks_df.display()
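from pyspark.sql.window import Window
# One partition per (student_name, subject), so exact duplicates end up together.
# Ordering by student_name, a partition column, makes every row in a partition a tie.
duplicate_removal_window = Window.partitionBy(F.col("student_name"),F.col("subject"))\
.orderBy(F.asc(F.col("student_name")))
# row_number still numbers tied rows 1, 2, 3, ..., so keeping row_number == 1 removes the duplicates.
non_duplicate_df = duplicate_marks_df\
.withColumn("row_number",F.row_number().over(duplicate_removal_window))\
.filter(F.col("row_number") == F.lit(1))\
.drop(F.col("row_number"))
# rank and dense_rank give every tied row 1, so filtering on 1 would keep all duplicates.
non_duplicate_df1 = duplicate_marks_df\
.withColumn("rank",F.rank().over(duplicate_removal_window))
non_duplicate_df1.display()
non_duplicate_df2 = duplicate_marks_df\
.withColumn("dense_rank",F.dense_rank().over(duplicate_removal_window))
non_duplicate_df2.display()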
marks_data = [
("Ganesh","English",99),
("Akshay","English",99),
("Priyesh","English",98),
("Rohit","English",98),
("Shrikant","English",98),
("Mayur","English",97),
("Ganesh","History",77),
("Akshay","History",77),
("Priyesh","History",98),
("Rohit","History",98),
("Shrikant","History",99),
("Mayur","History",99)
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
marks_schema = StructType([
StructField("student_name",StringType()),
StructField("subject",StringType()),
StructField("marks",IntegerType())
])
marks_df = spark.createDataFrame(marks_data,marks_schema)
marks_df.display()
from pyspark.sql.window import Window
top_perf_window = Window.partitionBy(F.col("subject"))\
.orderBy(F.desc(F.col("marks")))
all_window_df = marks_df\
.withColumn("row_number",F.row_number().over(top_perf_window))\
.withColumn("rank",F.rank().over(top_perf_window))\
.withColumn("dense_rank",F.dense_rank().over(top_perf_window))
rank_df = marks_df\
.withColumn("rank",F.rank().over(top_perf_window))\
.filter(F.col("rank") == F.lit(1))\
.drop(F.col("rank"))
dense_rank_df = marks_df\
.withColumn("dense_rank",F.dense_rank().over(top_perf_window))\
.filter(F.col("dense_rank") == F.lit(1))\
.drop(F.col("dense_rank"))
marks_df.createOrReplaceTempView("marks")
spark.sql("""WITH T1 (student_name,subject,marks,rank) AS (SELECT student_name,subject,marks,rank() OVER (PARTITION BY subject ORDER BY marks DESC) AS rank FROM marks)
SELECT student_name,subject,marks FROM T1 WHERE rank = 1""").display()
spark.sql("""WITH T1 (student_name,subject,marks,dense_rank) AS (SELECT student_name,subject,marks,dense_rank() OVER (PARTITION BY subject ORDER BY marks DESC) AS dense_rank FROM marks)
SELECT student_name,subject,marks FROM T1 WHERE dense_rank = 1""").display()
Sample order history (customer_id, order_id, status_date, order_status):
(1, 1, "2024-01-01", "Placed"),
(1, 1, "2024-01-02", "Confirmed"),
(1, 1, "2024-01-03", "Cancelled"),
(1, 2, "2024-01-01", "Placed"),
(1, 2, "2024-01-02", "Confirmed"),
(1, 2, "2024-01-04", "Shipped"),
(1, 2, "2024-01-05", "In-Transit"),
(1, 2, "2024-01-06", "Destination-City"),
(1, 2, "2024-01-07", "Out For Delivery"),
(1, 2, "2024-01-08", "Cancelled")
partitionBy - customer_id, order_id
orderBy - status_date
To read a column's value from the next row in the same partition, we use the lead window function.
(1, 2, "2024-01-07", "Out For Delivery")
(1, 1, "2024-01-07", "Out For Delivery")
(1, 3, "2024-01-07", "Out For Delivery")
# Window function - lead
orders_data = [
(1, 1, "2024-01-01", "Placed"),
(1, 1, "2024-01-02", "Confirmed"),
(1, 1, "2024-01-03", "Cancelled"),
(1, 1, "2024-01-04", "Shipped"),
(1, 1, "2024-01-05", "In-Transit"),
(1, 1, "2024-01-06", "Destination-City"),
(1, 1, "2024-01-07", "Out For Delivery"),
(1, 1, "2024-01-08", "Delivered"),
(1, 2, "2024-02-01", "Placed"),
(1, 2, "2024-02-02", "Confirmed"),
(1, 2, "2024-02-03", "Shipped"),
(1, 2, "2024-02-04", "In-Transit"),
(1, 2, "2024-02-05", "Destination-City"),
(1, 2, "2024-02-06", "Out For Delivery"),
(1, 2, "2024-02-07", "Cancelled"),
(2, 3, "2024-03-01", "Placed"),
(2, 3, "2024-03-02", "Confirmed"),
(2, 3, "2024-03-03", "Shipped"),
(2, 3, "2024-03-04", "In-Transit"),
(2, 3, "2024-03-05", "Destination-City"),
(2, 3, "2024-03-06", "Out For Delivery"),
(2, 3, "2024-03-07", "Delivered"),
(2, 4, "2024-04-01", "Placed"),
(2, 4, "2024-04-02", "Confirmed"),
(2, 4, "2024-04-03", "Shipped"),
(2, 4, "2024-04-04", "In-Transit"),
(2, 4, "2024-04-05", "Destination-City"),
(2, 4, "2024-04-06", "Out For Delivery"),
(2, 4, "2024-04-07", "Delivered"),
(3, 5, "2024-05-01", "Placed"),
(3, 5, "2024-05-02", "Confirmed"),
(3, 5, "2024-05-03", "Shipped"),
(3, 5, "2024-05-04", "In-Transit"),
(3, 5, "2024-05-05", "Destination-City"),
(3, 5, "2024-05-06", "Out For Delivery"),
(3, 5, "2024-05-07", "Delivered"),
(3, 6, "2024-06-01", "Placed"),
(3, 6, "2024-06-02", "Confirmed"),
(3, 6, "2024-06-03", "Shipped"),
(3, 6, "2024-06-04", "In-Transit"),
(3, 6, "2024-06-05", "Destination-City"),
(3, 6, "2024-06-06", "Out For Delivery"),
(3, 6, "2024-06-07", "Cancelled"),
(4, 7, "2024-07-01", "Placed"),
(4, 7, "2024-07-02", "Confirmed"),
(4, 7, "2024-07-03", "Shipped"),
(4, 7, "2024-07-04", "In-Transit"),
(4, 7, "2024-07-05", "Destination-City"),
(4, 7, "2024-07-06", "Out For Delivery"),
(4, 7, "2024-07-07", "Delivered"),
(4, 8, "2024-08-01", "Placed"),
(4, 8, "2024-08-02", "Confirmed"),
(4, 8, "2024-08-03", "Shipped"),
(4, 8, "2024-08-04", "In-Transit"),
(4, 8, "2024-08-05", "Destination-City"),
(4, 8, "2024-08-06", "Out For Delivery"),
(4, 8, "2024-08-07", "Delivered"),
(5, 9, "2024-09-01", "Placed"),
(5, 9, "2024-09-02", "Confirmed"),
(5, 9, "2024-09-03", "Shipped"),
(5, 9, "2024-09-04", "In-Transit"),
(5, 9, "2024-09-05", "Destination-City"),
(5, 9, "2024-09-06", "Out For Delivery"),
(5, 9, "2024-09-07", "Cancelled"),
(5, 10, "2024-10-01", "Placed"),
(5, 10, "2024-10-02", "Confirmed"),
(5, 10, "2024-10-03", "Shipped"),
(5, 10, "2024-10-04", "In-Transit"),
(5, 10, "2024-10-05", "Destination-City"),
(5, 10, "2024-10-06", "Out For Delivery"),
(5, 10, "2024-10-07", "Delivered"),
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
orders_schema = StructType([
StructField("customer_id",LongType()),
StructField("order_id",LongType()),
StructField("status_date",StringType()),
StructField("order_status",StringType())
])
orders_df = spark.createDataFrame(orders_data,orders_schema)
orders_df1 = orders_df.withColumn("status_date",F.col("status_date").cast("date"))
orders_df2 = orders_df.withColumn("status_date",orders_df["status_date"].cast("date"))
from pyspark.sql.window import Window
next_status_window = Window.partitionBy(F.col("customer_id"),F.col("order_id"))\
.orderBy(F.col("status_date"))
orders_df3 = orders_df1.withColumn("next_order_status",F.lead(F.col("order_status")).over(next_status_window))
results_df = orders_df3.filter((F.col("order_status")==F.lit("Out For Delivery")) & (F.col("next_order_status")==F.lit("Cancelled")))\
.select(F.col("customer_id"),F.col("order_id"))
orders_df1.createOrReplaceTempView("orders")
spark.sql("""WITH T1 AS (SELECT *,LEAD(order_status) OVER (PARTITION BY customer_id,order_id ORDER BY status_date ASC) AS next_order_status FROM orders)
SELECT customer_id,order_id FROM T1 WHERE order_status = 'Out For Delivery' AND next_order_status = 'Cancelled'""").display()
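Performance Difference Between Lead and Lag -
1. Lag looks at previous rows, which have already been read when the current
row is processed, so it is sometimes said to be slightly more efficient than lead.
2. In practice, Spark evaluates both as offset window functions in the same window
operator, after the same shuffle by the partition columns and the same sort by the
order columns, so the difference is usually negligible. Pick whichever makes the
query clearer, and measure before choosing one over the other for performance.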
from pyspark.sql.window import Window
lag_window = Window.partitionBy(F.col("customer_id"),F.col("order_id"))\
.orderBy(F.asc(F.col("status_date")))
results_df = orders_df1\
.withColumn("previous_order_status",F.lag(F.col("order_status")).over(lag_window))\
.filter((F.col("order_status")==F.lit("Cancelled")) & (F.col("previous_order_status")==F.lit("Out For Delivery")))\
.select(F.col("customer_id"),F.col("order_id"))
orders_df1.createOrReplaceTempView("orders")
spark.sql("""WITH T1 AS (SELECT *,LAG(order_status) OVER (PARTITION BY customer_id,order_id ORDER BY status_date ASC) AS previous_order_status FROM orders)
SELECT customer_id,order_id FROM T1 WHERE order_status = 'Cancelled' AND previous_order_status = 'Out For Delivery'""").display()
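To make the lead/lag logic concrete, here is a plain-Python sketch (not Spark code) that partitions the rows by (customer_id, order_id), orders each partition by status_date, and attaches the next and previous status, which is what F.lead and F.lag compute. The rows are orders 1 and 2 of customer 1 from orders_data above; with_next_and_previous is a hypothetical helper name.

```python
from itertools import groupby

# Subset of orders_data: customer 1, orders 1 and 2.
orders = [
    (1, 1, "2024-01-01", "Placed"),
    (1, 1, "2024-01-02", "Confirmed"),
    (1, 1, "2024-01-03", "Cancelled"),
    (1, 1, "2024-01-04", "Shipped"),
    (1, 1, "2024-01-05", "In-Transit"),
    (1, 1, "2024-01-06", "Destination-City"),
    (1, 1, "2024-01-07", "Out For Delivery"),
    (1, 1, "2024-01-08", "Delivered"),
    (1, 2, "2024-02-01", "Placed"),
    (1, 2, "2024-02-02", "Confirmed"),
    (1, 2, "2024-02-03", "Shipped"),
    (1, 2, "2024-02-04", "In-Transit"),
    (1, 2, "2024-02-05", "Destination-City"),
    (1, 2, "2024-02-06", "Out For Delivery"),
    (1, 2, "2024-02-07", "Cancelled"),
]


def with_next_and_previous(rows):
    """Yield each row extended with (next_status, previous_status).

    Missing neighbours at the edges of a partition are None, like NULL in Spark.
    """
    # ISO yyyy-MM-dd strings sort chronologically, so sorting the raw strings is safe.
    ordered = sorted(rows, key=lambda r: (r[0], r[1], r[2]))
    for _, group in groupby(ordered, key=lambda r: (r[0], r[1])):
        group = list(group)
        for i, row in enumerate(group):
            next_status = group[i + 1][3] if i + 1 < len(group) else None  # lead
            previous_status = group[i - 1][3] if i > 0 else None           # lag
            yield (*row, next_status, previous_status)


enriched = list(with_next_and_previous(orders))
# A set keeps one entry per (customer_id, order_id).
via_lead = sorted({(r[0], r[1]) for r in enriched
                   if r[3] == "Out For Delivery" and r[4] == "Cancelled"})
via_lag = sorted({(r[0], r[1]) for r in enriched
                  if r[3] == "Cancelled" and r[5] == "Out For Delivery"})
print(via_lead, via_lag)
```

Both filters find the same order: lead checks "the row after Out For Delivery is Cancelled", lag checks "the row before Cancelled is Out For Delivery", which is the same pair of rows seen from either end.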
1. A salary above 50000 is a high salary; a salary of 50000 or less is a
low salary. Return each employee with their salary and whether it is
high or low.
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",LongType()),
StructField("emp_name",StringType()),
StructField("emp_salary",LongType()),
StructField("emp_department",IntegerType())
])
emp_df = spark.read\
.format("csv")\
.option("header",True)\
.schema(emp_schema)\
.load("/FileStore/emp_data.csv")
emp_df.display()
emp_df1 = emp_df.withColumn("salary_type",F.expr("CASE WHEN emp_salary > 50000 THEN 'High Salary' ELSE 'Low Salary' END"))
emp_df2 = emp_df.select("*",F.expr("CASE WHEN emp_salary > 50000 THEN 'High Salary' ELSE 'Low Salary' END AS salary_type"))
emp_df3 = emp_df.select(F.col("emp_id"),F.col("emp_name"),F.col("emp_salary"),F.col("emp_department"),F.expr("CASE WHEN emp_salary > 50000 THEN 'High Salary' ELSE 'Low Salary' END AS salary_type"))
emp_df4 = emp_df.selectExpr("*","CASE WHEN emp_salary > 50000 THEN 'High Salary' ELSE 'Low Salary' END AS salary_type")
emp_df.createOrReplaceTempView("employee")
spark.sql("""SELECT *,
CASE
WHEN emp_salary > 50000 THEN 'High Salary'
ELSE 'Low Salary'
END AS salary_type
FROM employee""").display()
1. Classify each salary into one of five bands:
salary <= 10000 - Very Low Salary
10000 < salary <= 30000 - Low Salary
30000 < salary <= 50000 - Average Salary
50000 < salary <= 100000 - High Salary
salary > 100000 - Very High Salary
emp_df1 = emp_df.select(
F.col("emp_id"),
F.col("emp_name"),
F.col("emp_department"),
F.col("emp_salary"),
F.expr(
"""CASE
WHEN emp_salary <= 10000 THEN 'Very Low Salary'
WHEN emp_salary > 10000 AND emp_salary <=30000 THEN 'Low Salary'
WHEN emp_salary > 30000 AND emp_salary <=50000 THEN 'Average Salary'
WHEN emp_salary > 50000 AND emp_salary <= 100000 THEN 'High Salary'
ELSE 'Very High Salary'
END AS salary_type"""
),
)
emp_df2 = emp_df.selectExpr(
"*",
"""CASE
WHEN emp_salary <= 10000 THEN 'Very Low Salary'
WHEN emp_salary > 10000 AND emp_salary <= 30000 THEN 'Low Salary'
WHEN emp_salary > 30000 AND emp_salary <= 50000 THEN 'Average Salary'
WHEN emp_salary > 50000 AND emp_salary <= 100000 THEN 'High Salary'
ELSE 'Very High Salary'
END AS salary_type"""
)
emp_df3 = emp_df.withColumn(
"salary_type",
F.expr(
"""CASE
WHEN emp_salary <= 10000 THEN 'Very Low Salary'
WHEN emp_salary > 10000 AND emp_salary <= 30000 THEN 'Low Salary'
WHEN emp_salary > 30000 AND emp_salary <= 50000 THEN 'Average Salary'
WHEN emp_salary > 50000 AND emp_salary <= 100000 THEN 'High Salary'
ELSE 'Very High Salary'
END"""
),
)
emp_df.createOrReplaceTempView("employee")
spark.sql(
"""SELECT *,
CASE
WHEN emp_salary <= 10000 THEN 'Very Low Salary'
WHEN emp_salary > 10000 AND emp_salary <= 30000 THEN 'Low Salary'
WHEN emp_salary > 30000 AND emp_salary <= 50000 THEN 'Average Salary'
WHEN emp_salary > 50000 AND emp_salary <= 100000 THEN 'High Salary'
ELSE 'Very High Salary'
END AS salary_type
FROM employee"""
).display()
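The band boundaries are easy to get wrong at the edges, so here is a plain-Python sketch of the same five-way classification using the standard bisect module. BAND_LIMITS, BAND_NAMES and salary_type are hypothetical names for illustration; the limits are the inclusive upper bounds from the rules above.

```python
from bisect import bisect_left

# Inclusive upper bound of each band, in the same order as the CASE WHEN branches.
BAND_LIMITS = [10000, 30000, 50000, 100000]
BAND_NAMES = ["Very Low Salary", "Low Salary", "Average Salary",
              "High Salary", "Very High Salary"]


def salary_type(salary):
    """Return the salary band for a (non-None) salary.

    bisect_left finds the first limit that is >= salary, so a salary equal to
    a limit lands in that band, matching the "<=" in each WHEN condition.
    Anything above 100000 gets index 4, the ELSE branch.
    """
    return BAND_NAMES[bisect_left(BAND_LIMITS, salary)]


for salary in [10000, 10001, 30000, 50000, 100000, 100001]:
    print(salary, salary_type(salary))
```

One behaviour of the Spark versions worth knowing: if emp_salary is NULL, every WHEN condition evaluates to NULL rather than true, so the row falls through to ELSE and is labelled 'Very High Salary' (or 'Low Salary' in the two-way version). Add an explicit `WHEN emp_salary IS NULL` branch if that matters. The Python sketch does not handle None at all.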
1. Same problem using F.when and otherwise: a salary above 50000 is a high
salary; a salary of 50000 or less is a low salary. Return each employee
with their salary and whether it is high or low.
emp_df1 = emp_df.withColumn(
"salary_type",
F.when(F.col("emp_salary") > F.lit(50000), F.lit("High Salary")).otherwise(
F.lit("Low Salary")
),
)
emp_df2 = emp_df.withColumn(
"salary_type",
F.when(emp_df["emp_salary"] > F.lit(50000), F.lit("High Salary")).otherwise(
F.lit("Low Salary")
),
)
emp_df.createOrReplaceTempView("employee")
spark.sql(
"""SELECT *,
CASE
WHEN emp_salary > 50000 THEN 'High Salary'
ELSE 'Low Salary'
END AS salary_type
FROM employee"""
).display()
1. Same five bands using F.when and otherwise:
salary <= 10000 - Very Low Salary
10000 < salary <= 30000 - Low Salary
30000 < salary <= 50000 - Average Salary
50000 < salary <= 100000 - High Salary
salary > 100000 - Very High Salary
emp_df1 = emp_df.withColumn(
"salary_type",
F.when(F.col("emp_salary") <= F.lit(10000), F.lit("Very Low Salary"))
.when(
(F.col("emp_salary") > F.lit(10000)) & (F.col("emp_salary") <= 30000),
F.lit("Low Salary"),
)
.when(
(F.col("emp_salary") > F.lit(30000)) & (F.col("emp_salary") <= 50000),
F.lit("Average Salary"),
)
.when(
(F.col("emp_salary") > F.lit(50000)) & (F.col("emp_salary") <= 100000),
F.lit("High Salary"),
)
.otherwise(F.lit("Very High Salary")),
)
emp_df1.display()
emp_df.createOrReplaceTempView("employee")
spark.sql(
"""SELECT *,
CASE
WHEN emp_salary <= 10000 THEN 'Very Low Salary'
WHEN emp_salary > 10000 AND emp_salary <= 30000 THEN 'Low Salary'
WHEN emp_salary > 30000 AND emp_salary <= 50000 THEN 'Average Salary'
WHEN emp_salary > 50000 AND emp_salary <= 100000 THEN 'High Salary'
ELSE 'Very High Salary'
END AS salary_type
FROM employee"""
).display()
1. Return all records where the date is after 31-12-1994.
sample_data1 = [
("Ganesh","1995-11-22"),
("Akshay","1997-09-21")
]
sample_data2 = [
("Priyesh","23-11-1996"),
("Gitesh","12-01-1991")
]
Read the date as a string first, then convert it to the date datatype
with to_date and the pattern that matches the string's layout.
Apache Spark displays dates in the yyyy-MM-dd format by default.
from pyspark.sql.types import *
from pyspark.sql import functions as F
sample_data1 = [
("Ganesh","1995-11-22"),
("Akshay","1997-09-21")
]
sample_schema = StructType([
StructField("name",StringType()),
StructField("date_of_birth",DateType())
])
# Fails: the DateType() field does not accept Python str values such as "1995-11-22"
sample_df1 = spark.createDataFrame(sample_data1,sample_schema)
sample_data2 = [
("Priyesh","23-11-1996"),
("Gitesh","12-01-1991")
]
# Fails for the same reason: the dates are strings, not date objects
sample_df2 = spark.createDataFrame(sample_data2,sample_schema)
sample_schema1 = StructType([
StructField("name",StringType()),
StructField("date_of_birth",StringType())
])
sample_df3 = spark.createDataFrame(sample_data1,sample_schema1)
sample_df3.printSchema()
sample_df3 = sample_df3.withColumn("date_of_birth",F.to_date("date_of_birth","yyyy-MM-dd"))
sample_df3.display()
sample_df3.printSchema()
sample_df4 = spark.createDataFrame(sample_data2,sample_schema1)\
.withColumn("date_of_birth",F.to_date("date_of_birth","dd-MM-yyyy"))
sample_df5 = sample_df4.filter(F.col("date_of_birth") > F.lit("1994-12-31"))
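The "read as string, then convert with the matching pattern" rule is not Spark-specific. As a plain-Python analogy (not Spark code), the standard datetime module does the same with strptime, after which the comparison against 1994-12-31 works on real dates. Note that the pattern letters differ: Spark's "dd-MM-yyyy" corresponds to Python's "%d-%m-%Y", and in both systems a different letter case means minutes (Spark "mm", Python "%M").

```python
from datetime import date, datetime

sample_data2 = [("Priyesh", "23-11-1996"), ("Gitesh", "12-01-1991")]

# Step 1: the values arrive as strings; parse them with the pattern that matches
# their layout. "%d-%m-%Y" here plays the role of Spark's "dd-MM-yyyy".
parsed = [(name, datetime.strptime(dob, "%d-%m-%Y").date()) for name, dob in sample_data2]

# Step 2: compare real dates, not strings.
after_1994 = [(name, dob) for name, dob in parsed if dob > date(1994, 12, 31)]

# isoformat() prints yyyy-MM-dd, the same layout Spark uses to display dates.
print([(name, dob.isoformat()) for name, dob in after_1994])
```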
date_file_path = "/FileStore/PySpark_Series/dates/date_file_ddmmyyyy.csv"
File contents:
Name    DOB
Ganesh  22-11-1995
Akshay  23-01-1996
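# Without a dateFormat option, Spark expects dates as yyyy-MM-dd, so the dd-MM-yyyy values in this file do not parse (in the default PERMISSIVE mode they typically come back as null).
sample_df = spark.read\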
.format("csv")\
.schema(sample_schema)\
.option("header",True)\
.load(date_file_path)
sample_df2 = spark.read\
.format("csv")\
.option("header",True)\
.schema(sample_schema)\
.option("dateFormat","dd-MM-yyyy")\
.load(date_file_path)
sample_data = [
("Priyesh","IT Engineer", 32),
("Gitesh","Data Engineer", 35)
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
sample_schema = StructType([
StructField("emp_name",StringType()),
StructField("emp_department",StringType()),
StructField("emp_age",IntegerType())
])
sample_df = spark.createDataFrame(sample_data,sample_schema)
sample_df.display()
sample_df.printSchema()
results_df = sample_df.withColumn("created_timestamp",F.current_timestamp())\
.withColumn("created_date",F.to_date(F.col("created_timestamp")))
results_df.display()
results_df.printSchema()
1. Inner Join
- Returns only the rows whose join column matches in both data frames: each
matching record from the left data frame is paired with each matching record
from the right data frame.
emp_id emp_name manager_id
1 Ganesh 2
2 Akshay 1
department_id department_name
1 "DE"
2. Left Join - Left Outer Join
- Returns every record from the left data frame. Left records with a match
are paired with each matching record from the right data frame; left records
with no match are kept once, padded with NULL values for the right data
frame columns.
3. Right Join - Right Outer Join
- The mirror image of the left join: returns every record from the right
data frame. Right records with no match in the left data frame are kept
once, padded with NULL values for the left data frame columns.
4. Full Join - Full Outer Join
- Returns all matched pairs, plus the unmatched left records padded with
NULL values for the right data frame columns, plus the unmatched right
records padded with NULL values for the left data frame columns.
5. Anti Join - Left Anti Join
- Returns the records from the left data frame that have no match in the
right data frame. Only the left data frame columns are returned.
6. Semi Join - Left Semi Join
- Returns the records from the left data frame that have at least one match
in the right data frame. Only the left data frame columns are returned, and
each left record appears at most once even if it matches several right records.
7. Cross Join
- Cartesian product: every row from the left data frame is paired with
every row from the right data frame, irrespective of any match. The result
has (left rows x right rows) rows.
8. Self Join
- Joining same data frame with itself.
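The join types above can be compared side by side with a small plain-Python sketch. It is not how Spark executes joins (Spark uses strategies such as broadcast hash joins and sort-merge joins); the helper `join` and its `how` values are hypothetical names chosen to echo the `how` argument of DataFrame.join, and the rows are a subset of the emp_data and department_data used in this section.

```python
def join(left, right, left_key, right_key, how="inner"):
    """Plain-Python sketch of join semantics on non-empty lists of tuples.

    left_key/right_key are tuple indexes of the join column. Unmatched sides
    are padded with None (NULL). A None key never matches, as with SQL "=".
    """
    def matches(l, r):
        return l[left_key] is not None and l[left_key] == r[right_key]

    if how == "cross":
        return [l + r for l in left for r in right]  # every pair, no condition
    if how == "semi":
        return [l for l in left if any(matches(l, r) for r in right)]  # left columns only
    if how == "anti":
        return [l for l in left if not any(matches(l, r) for r in right)]

    left_nulls = (None,) * len(left[0])
    right_nulls = (None,) * len(right[0])
    result = []
    for l in left:
        pairs = [l + r for r in right if matches(l, r)]
        if pairs:
            result.extend(pairs)  # inner part: one row per matching pair
        elif how in ("left", "full"):
            result.append(l + right_nulls)  # unmatched left row
    if how in ("right", "full"):
        for r in right:
            if not any(matches(l, r) for l in left):
                result.append(left_nulls + r)  # unmatched right row
    return result


emp = [(1, "Person1", 1), (2, "Person2", 2), (3, "Person3", 1), (5, "Person5", 6)]
dept = [(1, "IT"), (2, "HR"), (5, "FE")]

for how in ["inner", "left", "right", "full", "semi", "anti", "cross"]:
    print(how, len(join(emp, dept, 2, 0, how)))
```

Person5 (dept_id 6) has no department and FE (id 5) has no employee, so left and right each add one NULL-padded row to the 3 inner rows, and full adds both.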
Problem Statement - Get the department name for all the employees
assigned to the departments. If there are employees which are not
assigned to any of the departments, do not return them in results.
Sample Data -
emp_data = [
(1,"Person1",1),
(2,"Person2",2),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",6),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE")
]
Generic Structure for Join - df1 and df2
joined_df = df1.join(other=df2,on=[df1.emp_id==df2.emp_id],how="inner")
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
department_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df = spark.createDataFrame(data=emp_data,schema=emp_schema)
dep_df = spark.createDataFrame(data=department_data,schema=department_schema)
joined_df = emp_df.join(other=dep_df,on=[emp_df.dept_id==dep_df.department_id],how="inner")\
.select(F.col("emp_id"),F.col("emp_name"),F.col("department_id"),F.col("department_name"))
joined_df.display()
joined_df1 = emp_df.join(dep_df,emp_df.dept_id==dep_df.department_id,"inner")\
.select(F.col("emp_id"),F.col("emp_name"),F.col("department_id"),F.col("department_name"))
joined_df1.display()
joined_df2 = emp_df.join(dep_df,emp_df.dept_id==dep_df.department_id)\
.select(F.col("emp_id"),F.col("emp_name"),F.col("department_id"),F.col("department_name"))
joined_df2.display()
emp_df.createOrReplaceTempView("employee")
dep_df.createOrReplaceTempView("department")
spark.sql("SELECT employee.emp_id,employee.emp_name,department.department_id,department.department_name FROM employee INNER JOIN department ON employee.dept_id=department.department_id").display()
emp_data = [
(1,"Person1",1),
(2,"Person2",None),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",None),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE"),
(None,"TRS")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
department_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df = spark.createDataFrame(data=emp_data,schema=emp_schema)
dep_df = spark.createDataFrame(data=department_data,schema=department_schema)
emp_df.display()
dep_df.display()
joined_df = emp_df.join(dep_df,emp_df.dept_id==dep_df.department_id,"inner")
joined_df.display()
emp_df.createOrReplaceTempView("employee")
dep_df.createOrReplaceTempView("department")
spark.sql("SELECT employee.emp_id,employee.emp_name,department.department_id,department.department_name FROM employee JOIN department ON employee.dept_id=department.department_id").display()
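With this data, the employees whose dept_id is None are not paired with the TRS department even though its department_id is also None: in SQL, NULL = NULL evaluates to NULL, not true, so the join condition fails. Spark offers null-safe equality for the case where NULLs should match (`emp_df.dept_id.eqNullSafe(dep_df.department_id)` in the DataFrame API, `<=>` in SQL). The plain-Python sketch below models both comparisons; sql_equals, null_safe_equals and inner_join are hypothetical helpers, not Spark functions.

```python
emp_data = [
    (1, "Person1", 1), (2, "Person2", None), (3, "Person3", 1), (4, "Person4", 1),
    (5, "Person5", None), (6, "Person6", 4), (7, "Person6", 2), (8, "Person8", 3),
]
department_data = [(1, "IT"), (2, "HR"), (3, "DE"), (4, "BE"), (5, "FE"), (None, "TRS")]


def sql_equals(a, b):
    """SQL "=": any comparison with NULL yields NULL (modelled as None)."""
    if a is None or b is None:
        return None
    return a == b


def null_safe_equals(a, b):
    """SQL "<=>": two NULLs are equal; NULL compared with a value is False."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


def inner_join(emps, depts, eq):
    # A pair is kept only when the condition is exactly True, not None or False.
    return [(e[0], e[1], d[0], d[1]) for e in emps for d in depts if eq(e[2], d[0]) is True]


regular = inner_join(emp_data, department_data, sql_equals)
null_safe = inner_join(emp_data, department_data, null_safe_equals)
print(len(regular), len(null_safe))
```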
emp_data = [
(1,"Person1","IN",1),
(2,"Person2","IN",2),
(3,"Person3","IN",1),
(4,"Person4","IN",1),
(5,"Person5","IN",6),
(6,"Person6","SA",4),
(7,"Person6","UK",2),
(8,"Person8","IN",3),
(4,"Person4","UK",1),
(5,"Person5","IN",6),
(6,"Person6","US",4)
]
department_data = [
(1,"IT","IN"),
(2,"HR","US"),
(3,"DE","IN"),
(4,"BE","UK"),
(5,"FE","SA")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("country",StringType()),
StructField("dept_id",IntegerType())
])
department_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType()),
StructField("department_country",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dep_df = spark.createDataFrame(department_data,department_schema)
emp_df.display()
dep_df.display()
joined_df = emp_df.join(other=dep_df,on=[(emp_df.dept_id==dep_df.department_id) & (emp_df.country==dep_df.department_country)],how="inner")\
.select(F.col("emp_id"),F.col("emp_name"),F.col("department_id"),F.col("department_name"),F.col("department_country"))
joined_df.display()
joined_df1 = emp_df.join(other=dep_df,on=[(emp_df.dept_id==dep_df.department_id) & (emp_df.country==dep_df.department_country)])\
.select(F.col("emp_id"),F.col("emp_name"),F.col("department_id"),F.col("department_name"),F.col("department_country"))
joined_df1.display()
emp_dept_inner_join_condition = [(emp_df.dept_id==dep_df.department_id) & (emp_df.country==dep_df.department_country)]
joined_df2 = emp_df.join(other=dep_df,on=emp_dept_inner_join_condition)\
.select(F.col("emp_id"),F.col("emp_name"),F.col("department_id"),F.col("department_name"),F.col("department_country"))
joined_df2.display()
emp_df.createOrReplaceTempView("employee")
dep_df.createOrReplaceTempView("department")
spark.sql("SELECT e.emp_id,e.emp_name,d.department_id,d.department_country,d.department_name FROM employee e JOIN department d ON e.dept_id = d.department_id AND e.country = d.department_country").display()
emp_data = [
(1,"Person1",1),
(2,"Person2",2),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",6),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dep_id",IntegerType())
])
dep_schema = StructType([
StructField("dept_id",IntegerType()),
StructField("dept_name",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dep_schema)
emp_df.display()
dept_df.display()
joined_df = emp_df.join(other=dept_df, on= [emp_df.dep_id == dept_df.dept_id],how="left")
joined_df1 = emp_df.join(dept_df,emp_df.dep_id==dept_df.dept_id,"left_outer")
joined_df1.display()
joined_df2 = emp_df.join(dept_df,emp_df.dep_id==dept_df.dept_id,"leftouter")
joined_df2.display()
emp_df.createOrReplaceTempView("employee")
dept_df.createOrReplaceTempView("dept")
spark.sql("SELECT * FROM employee LEFT JOIN dept ON employee.dep_id = dept.dept_id").display()
spark.sql("SELECT * FROM employee LEFT OUTER JOIN dept ON employee.dep_id = dept.dept_id").display()
emp_data = [
(1,"Person1",1),
(2,"Person2",None),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",None),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE"),
(None,"TRS")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dep_id",IntegerType())
])
dep_schema = StructType([
StructField("dept_id",IntegerType()),
StructField("dept_name",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dep_schema)
emp_df.display()
dept_df.display()
null_inner_join_df = emp_df.join(other=dept_df,on=[emp_df.dep_id == dept_df.dept_id],how="inner")
null_inner_join_df.display()
null_leftouter_df = emp_df.join(dept_df,emp_df.dep_id==dept_df.dept_id,"left")
null_leftouter_df.display()
null_leftouter_df1 = emp_df.join(dept_df,emp_df.dep_id==dept_df.dept_id,"left_outer")
null_leftouter_df1.display()
null_leftouter_df2 = emp_df.join(dept_df,emp_df.dep_id==dept_df.dept_id,"leftouter")
null_leftouter_df2.display()
emp_df.createOrReplaceTempView("emp")
dept_df.createOrReplaceTempView("dept")
spark.sql("SELECT * FROM emp Left JOIN dept ON emp.dep_id=dept.dept_id").display()
spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.dep_id=dept.dept_id").display()
emp_data = [
(1,"Person1","IN",1),
(2,"Person2","IN",2),
(3,"Person3","IN",1),
(4,"Person4","IN",1),
(5,"Person5","IN",6),
(6,"Person6","SA",4),
(7,"Person6","UK",2),
(8,"Person8","IN",3),
(4,"Person4","UK",1),
(5,"Person5","IN",6),
(6,"Person6","US",4)
]
department_data = [
(1,"IT","IN"),
(2,"HR","US"),
(3,"DE","IN"),
(4,"BE","UK"),
(5,"FE","SA")
]
joined_df = df1.join(other={another_dataframe},on={join_condition},
how={join_type})
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("country",StringType()),
StructField("dep_id",IntegerType())
])
dep_schema = StructType([
StructField("dept_id",IntegerType()),
StructField("dept_name",StringType()),
StructField("country",StringType())
])
emp_df = spark.createDataFrame(data=emp_data,schema=emp_schema)
dept_df = spark.createDataFrame(data=department_data,schema=dep_schema)
emp_df.display()
dept_df.display()
joined_df = emp_df.join(other=dept_df,on=[(emp_df.dep_id == dept_df.dept_id) & (emp_df.country == dept_df.country)],how="left_outer")
joined_df.display()
joined_df1 = emp_df.join(other=dept_df,on=[(emp_df.dep_id == dept_df.dept_id) & (emp_df.country == dept_df.country)],how="leftouter")
joined_df1.display()
joined_df1 = emp_df.join(other=dept_df,on=[(emp_df.dep_id == dept_df.dept_id) & (emp_df.country == dept_df.country)],how="left")
joined_df1.display()
emp_df.createOrReplaceTempView("emp")
dept_df.createOrReplaceTempView("dept")
%sql
SELECT * FROM emp LEFT JOIN dept ON emp.dep_id = dept.dept_id AND emp.country = dept.country
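With a multi-column condition such as (emp_df.dep_id == dept_df.dept_id) & (emp_df.country == dept_df.country), a pair of rows matches only when every part of the condition is true. The following pure-Python sketch illustrates that for a left join. It is a model of the result, not Spark's implementation: the helper names are hypothetical and the rows are a few taken from emp_data and department_data above.

```python
# Sketch: a multi-column join condition matches only when ALL column pairs are equal
# (and non-NULL). Unmatched left rows are padded with None, as in a left join.

def all_columns_match(left_row, right_row, pairs):
    """pairs: list of (left_index, right_index) that must all be equal and non-NULL."""
    return all(
        left_row[li] is not None and right_row[ri] is not None and left_row[li] == right_row[ri]
        for li, ri in pairs
    )

def left_outer_join_multi(left, right, pairs):
    result = []
    pad = (None,) * (len(right[0]) if right else 0)
    for l_row in left:
        matches = [r_row for r_row in right if all_columns_match(l_row, r_row, pairs)]
        # Matched rows, or the left row padded with NULLs when nothing matched
        result.extend([l_row + r_row for r_row in matches] or [l_row + pad])
    return result

# (emp_id, emp_name, country, dep_id) and (dept_id, dept_name, country)
emp = [(1, "Person1", "IN", 1), (6, "Person6", "SA", 4), (4, "Person4", "UK", 1)]
dept = [(1, "IT", "IN"), (4, "BE", "UK")]

# dep_id (index 3) = dept_id (index 0) AND country (index 2) = country (index 2)
joined = left_outer_join_multi(emp, dept, pairs=[(3, 0), (2, 2)])
for row in joined:
    print(row)
```

Person6 has a matching dep_id (4) but a different country (SA vs UK), so it stays unmatched; one equal column is not enough.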
emp_data = [
(1,"Person1",1),
(2,"Person2",2),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",6),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dept_schema)
emp_df.display()
dept_df.display()
right_join_df = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"right")
right_join_df.display()
right_join_df1 = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"right_outer")
right_join_df1.display()
right_join_df2 = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"rightouter")
right_join_df2.display()
emp_df.createOrReplaceTempView("employee")
dept_df.createOrReplaceTempView("department")
spark.sql("SELECT * FROM employee RIGHT JOIN department ON employee.dept_id = department.department_id").display()
emp_data = [
(1,"Person1",1),
(2,"Person2",None),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",None),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE"),
(None,"TRS")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dept_schema)
emp_df.display()
dept_df.display()
right_outerjoin_df = emp_df.join(other=dept_df,on=[emp_df.dept_id == dept_df.department_id],how = "rightouter")
right_outerjoin_df.display()
right_outerjoin_df1 = emp_df.join(other=dept_df,on=[emp_df.dept_id == dept_df.department_id],how = "right_outer")
right_outerjoin_df1.display()
right_outerjoin_df2 = emp_df.join(other=dept_df,on=[emp_df.dept_id == dept_df.department_id],how = "right")
right_outerjoin_df2.display()
emp_df.createOrReplaceTempView("employee")
dept_df.createOrReplaceTempView("department")
spark.sql("SELECT * FROM employee RIGHT JOIN department ON employee.dept_id = department.department_id").display()
spark.sql("SELECT * FROM employee RIGHT OUTER JOIN department ON employee.dept_id = department.department_id").display()
emp_data = [
(1,"Person1","IN",1),
(2,"Person2","IN",2),
(3,"Person3","IN",1),
(4,"Person4","IN",1),
(5,"Person5","IN",6),
(6,"Person6","SA",4),
(7,"Person6","UK",2),
(8,"Person8","IN",3),
(4,"Person4","UK",1),
(5,"Person5","IN",6),
(6,"Person6","US",4)
]
department_data = [
(1,"IT","IN"),
(2,"HR","US"),
(3,"DE","IN"),
(4,"BE","UK"),
(5,"FE","SA")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("emp_country",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType()),
StructField("department_country",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dept_schema)
emp_df.display()
dept_df.display()
right_outer_join_multiple_col_df = emp_df.join(dept_df,[(emp_df.dept_id == dept_df.department_id) & (emp_df.emp_country == dept_df.department_country)],"right")
right_outer_join_multiple_col_df.display()
right_outer_join_multiple_col_df1 = emp_df.join(dept_df,[(emp_df.dept_id == dept_df.department_id) & (emp_df.emp_country == dept_df.department_country)],"rightouter")
right_outer_join_multiple_col_df1.display()
right_outer_join_multiple_col_df2 = emp_df.join(dept_df,[(emp_df.dept_id == dept_df.department_id) & (emp_df.emp_country == dept_df.department_country)],"right_outer")
right_outer_join_multiple_col_df2.display()
emp_df.createOrReplaceTempView("employee")
dept_df.createOrReplaceTempView("department")
spark.sql("SELECT * FROM employee RIGHT JOIN department ON employee.dept_id = department.department_id AND employee.emp_country = department.department_country").display()
spark.sql("SELECT * FROM employee RIGHT OUTER JOIN department ON employee.dept_id = department.department_id AND employee.emp_country = department.department_country").display()
emp_data = [
(1,"Person1",1),
(2,"Person2",2),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",6),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType()),
])
dept_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dept_schema)
emp_df.display()
dept_df.display()
full_outer_join_df = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"full")
full_outer_join_df.display()
full_outer_join_df1 = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"fullouter")
full_outer_join_df1.display()
full_outer_join_df2 = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"full_outer")
full_outer_join_df2.display()
emp_df.createOrReplaceTempView("employee")
dept_df.createOrReplaceTempView("department")
spark.sql("SELECT * FROM employee FULL JOIN department ON employee.dept_id=department.department_id").display()
spark.sql("SELECT * FROM employee FULL OUTER JOIN department ON employee.dept_id=department.department_id").display()
emp_data = [
(1,"Person1",1),
(2,"Person2",None),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",None),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3),
(9,"Person9",6)
]
department_data = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE"),
(None,"TRS")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
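emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
# Rebuild the data frames so the joins below use the NULL-key data defined above
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dept_schema)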
full_outer_join_null_df = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"full")
full_outer_join_null_df.display()
full_outer_join_null_df1 = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"fullouter")
full_outer_join_null_df1.display()
full_outer_join_null_df2 = emp_df.join(dept_df,emp_df.dept_id == dept_df.department_id,"full_outer")
full_outer_join_null_df2.display()
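A full outer join keeps every row from both sides. When a join key is NULL the row can never match, so it shows up once, padded with NULLs on the other side. The following pure-Python sketch models that result. The function name is hypothetical, the rows are a few taken from emp_data and department_data above, and the output order is just the sketch's (matched and unmatched left rows first, then unmatched right rows); Spark does not guarantee any row order.

```python
# Pure-Python sketch of FULL OUTER JOIN semantics with NULL join keys.
# None stands in for SQL NULL; only the result is modeled, not Spark's execution.

def full_outer_join(left, right, left_key, right_key):
    left_pad = (None,) * len(left[0])
    right_pad = (None,) * len(right[0])
    matched_right = set()  # indexes of right rows that found a partner
    result = []
    for l_row in left:
        found = False
        for j, r_row in enumerate(right):
            # NULL never equals anything, so a NULL key on either side cannot match
            if l_row[left_key] is not None and l_row[left_key] == r_row[right_key]:
                result.append(l_row + r_row)
                matched_right.add(j)
                found = True
        if not found:
            result.append(l_row + right_pad)  # unmatched left row
    for j, r_row in enumerate(right):
        if j not in matched_right:
            result.append(left_pad + r_row)  # unmatched right row
    return result

# (emp_id, emp_name, dept_id) and (department_id, department_name)
emp = [(1, "Person1", 1), (2, "Person2", None), (9, "Person9", 6)]
dept = [(1, "IT"), (5, "FE"), (None, "TRS")]

joined = full_outer_join(emp, dept, left_key=2, right_key=0)
for row in joined:
    print(row)
```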
emp_data = [
(1,"Person1","IN",1),
(2,"Person2","IN",2),
(3,"Person3","IN",1),
(4,"Person4","IN",1),
(5,"Person5","IN",6),
(6,"Person6","SA",4),
(7,"Person6","UK",2),
(8,"Person8","IN",3),
(4,"Person4","UK",1),
(5,"Person5","IN",6),
(6,"Person6","US",4)
]
department_data = [
(1,"IT","IN"),
(2,"HR","US"),
(3,"DE","IN"),
(4,"BE","UK"),
(5,"FE","SA")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("emp_country",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType()),
StructField("department_country",StringType())
])
emp_df = spark.createDataFrame(emp_data,emp_schema)
dept_df = spark.createDataFrame(department_data,dept_schema)
emp_df.display()
dept_df.display()
full_outer_join = emp_df.join(other = dept_df,on = [(emp_df.dept_id == dept_df.department_id) & (emp_df.emp_country == dept_df.department_country)],how = "full_outer")
full_outer_join.display()
What is a left semi join? It checks each record of the left dataset
for a match in the right dataset and returns only the records that
have a match.
A left semi join returns only the columns of the left dataset, and
each left record appears at most once, even if it matches several
right records. An inner join, on the other hand, returns the columns
of both the left and right datasets.
Example: customers - orders
Give me the details of all customers who have placed at least one order so far.
Customers who have not yet placed any order should not appear
in the output.
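The customers/orders example can be modeled in plain Python to show how a left semi join differs from an inner join followed by selecting the left columns (the approach used for left_semi_join_df6 below). This is a sketch of the result only, with hypothetical function names and hypothetical customer and order rows. The two approaches agree only when the right-side key is unique, as department_id is in department_data; a customer with two orders appears once in the semi join but twice in the inner join.

```python
# Sketch: LEFT SEMI JOIN vs. INNER JOIN + select(left columns).
# Rows are tuples; None would stand in for SQL NULL and never matches.

def left_semi_join(left, right, left_key, right_key):
    right_keys = {r[right_key] for r in right if r[right_key] is not None}
    # Each left row is kept at most once, and only left columns are returned
    return [l for l in left if l[left_key] is not None and l[left_key] in right_keys]

def inner_join_then_select_left(left, right, left_key, right_key):
    # One output row per matching pair, then only the left columns are kept
    return [l for l in left for r in right
            if l[left_key] is not None and l[left_key] == r[right_key]]

# Hypothetical data: (customer_id, name) and (order_id, customer_id)
customers = [(1, "Ganesh"), (2, "Akshay"), (3, "Snehal")]
orders = [(101, 1), (102, 1), (103, 2)]  # Ganesh ordered twice, Snehal never

semi = left_semi_join(customers, orders, left_key=0, right_key=1)
inner = inner_join_then_select_left(customers, orders, left_key=0, right_key=1)
print(semi)
print(inner)
```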
emp_data_single_column = [
(1,"Person1",1),
(2,"Person2",2),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",6),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data_single_column = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE")
]
emp_data_null = [
(1,"Person1",1),
(2,"Person2",None),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",None),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3),
(9,"Person9",6)
]
department_data_null = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE"),
(None,"TRS")
]
emp_data_multiple_columns = [
(1,"Person1","IN",1),
(2,"Person2","IN",2),
(3,"Person3","IN",1),
(4,"Person4","IN",1),
(5,"Person5","IN",6),
(6,"Person6","SA",4),
(7,"Person6","UK",2),
(8,"Person8","IN",3),
(4,"Person4","UK",1),
(5,"Person5","IN",6),
(6,"Person6","US",4)
]
department_data_multiple_columns = [
(1,"IT","IN"),
(2,"HR","US"),
(3,"DE","IN"),
(4,"BE","UK"),
(5,"FE","SA")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema_single_column = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema_single_column = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df_single_column = spark.createDataFrame(emp_data_single_column,emp_schema_single_column)
dept_df_single_column = spark.createDataFrame(department_data_single_column,dept_schema_single_column)
emp_df_single_column.display()
dept_df_single_column.display()
left_semi_join_df = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"left_semi")
left_semi_join_df.display()
left_semi_join_df1 = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"leftsemi")
left_semi_join_df1.display()
left_semi_join_df1 = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"semi")
left_semi_join_df1.display()
left_semi_join_df6 = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"inner").select(F.col("emp_id"),F.col("emp_name"),F.col("dept_id"))
left_semi_join_df6.display()
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema_null = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema_null = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df_null = spark.createDataFrame(emp_data_null,emp_schema_null)
dept_df_null = spark.createDataFrame(department_data_null,dept_schema_null)
emp_df_null.display()
dept_df_null.display()
left_semi_join_df4 = emp_df_null.join(dept_df_null,emp_df_null.dept_id == dept_df_null.department_id,"semi")
left_semi_join_df4.display()
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema_multiple_columns = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("emp_country",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema_multiple_columns = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType()),
StructField("department_country",StringType()),
])
emp_df_multiple_columns = spark.createDataFrame(emp_data_multiple_columns,emp_schema_multiple_columns)
dept_df_multiple_columns = spark.createDataFrame(department_data_multiple_columns,dept_schema_multiple_columns)
emp_df_multiple_columns.display()
dept_df_multiple_columns.display()
left_semi_join_df5 = emp_df_multiple_columns.join(dept_df_multiple_columns,[(emp_df_multiple_columns.dept_id == dept_df_multiple_columns.department_id) & (emp_df_multiple_columns.emp_country == dept_df_multiple_columns.department_country)],"semi")
left_semi_join_df5.display()
emp_df_single_column.createOrReplaceTempView("employee")
dept_df_single_column.createOrReplaceTempView("department")
spark.sql("SELECT * FROM employee LEFT SEMI JOIN department ON employee.dept_id = department.department_id").display()
spark.sql("SELECT * FROM employee SEMI JOIN department ON employee.dept_id = department.department_id").display()
What is a left semi join? It returns the records of the left dataset
that have a match in the right dataset.
A left semi join returns only the columns of the left dataset.
What is a left anti join? It returns the records of the left dataset
that have no match in the right dataset.
A left anti join also returns only the columns of the left dataset.
Customers - Orders (Left Semi)
Give me the details of all customers who have placed at least one order so far.
Customers who have not yet placed any order should not appear in the output.
Customers - Orders (Left Anti)
Give me the details of all customers who have not placed any order so far.
Only customers who have not yet placed an order should appear
in the output.
A left anti join returns exactly the left records that a left semi
join leaves out.
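The statement that a left anti join returns exactly the left records a left semi join leaves out can be checked with a small pure-Python model. This is a sketch of the result, not Spark's implementation; the function names are hypothetical and the rows are a few taken from emp_data_null and department_data_null below. A record whose dept_id is NULL cannot match anything, so it lands in the anti join output.

```python
# Sketch: LEFT SEMI and LEFT ANTI joins split the left dataset into two parts.
# None stands in for SQL NULL.

def matching_keys(right, right_key):
    # NULL keys can never match, so they are left out of the lookup set
    return {r[right_key] for r in right if r[right_key] is not None}

def left_semi_join(left, right, left_key, right_key):
    keys = matching_keys(right, right_key)
    return [l for l in left if l[left_key] in keys]

def left_anti_join(left, right, left_key, right_key):
    keys = matching_keys(right, right_key)
    return [l for l in left if l[left_key] not in keys]

# (emp_id, emp_name, dept_id) and (department_id, department_name)
emp = [(1, "Person1", 1), (2, "Person2", None), (6, "Person6", 4), (9, "Person9", 6)]
dept = [(1, "IT"), (4, "BE"), (5, "FE"), (None, "TRS")]

semi = left_semi_join(emp, dept, left_key=2, right_key=0)
anti = left_anti_join(emp, dept, left_key=2, right_key=0)
print(semi)
print(anti)
```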
emp_data_single_column = [
(1,"Person1",1),
(2,"Person2",2),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",6),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3)
]
department_data_single_column = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE")
]
emp_data_null = [
(1,"Person1",1),
(2,"Person2",None),
(3,"Person3",1),
(4,"Person4",1),
(5,"Person5",None),
(6,"Person6",4),
(7,"Person6",2),
(8,"Person8",3),
(9,"Person9",6)
]
department_data_null = [
(1,"IT"),
(2,"HR"),
(3,"DE"),
(4,"BE"),
(5,"FE"),
(None,"TRS")
]
emp_data_multiple_columns = [
(1,"Person1","IN",1),
(2,"Person2","IN",2),
(3,"Person3","IN",1),
(4,"Person4","IN",1),
(5,"Person5","IN",6),
(6,"Person6","SA",4),
(7,"Person6","UK",2),
(8,"Person8","IN",3),
(4,"Person4","UK",1),
(5,"Person5","IN",6),
(6,"Person6","US",4)
]
department_data_multiple_columns = [
(1,"IT","IN"),
(2,"HR","US"),
(3,"DE","IN"),
(4,"BE","UK"),
(5,"FE","SA")
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema_single_column = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema_single_column = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df_single_column = spark.createDataFrame(emp_data_single_column,emp_schema_single_column)
dept_df_single_column = spark.createDataFrame(department_data_single_column,dept_schema_single_column)
emp_df_single_column.display()
dept_df_single_column.display()
emp_df_single_column.createOrReplaceTempView("employee")
dept_df_single_column.createOrReplaceTempView("department")
spark.sql("SELECT * FROM employee LEFT ANTI JOIN department ON employee.dept_id=department.department_id").display()
single_column_semi_join_df = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"semi")
single_column_semi_join_df.display()
single_column_anti_join_df = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"anti")
single_column_anti_join_df.display()
single_column_anti_join_df1 = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"left_anti")
single_column_anti_join_df1.display()
single_column_anti_join_df2 = emp_df_single_column.join(dept_df_single_column,emp_df_single_column.dept_id == dept_df_single_column.department_id,"leftanti")
single_column_anti_join_df2.display()
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema_null = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema_null = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType())
])
emp_df_null = spark.createDataFrame(emp_data_null,emp_schema_null)
dept_df_null = spark.createDataFrame(department_data_null,dept_schema_null)
emp_df_null.display()
dept_df_null.display()
left_semi_null_df = emp_df_null.join(dept_df_null,emp_df_null.dept_id == dept_df_null.department_id,"semi")
left_semi_null_df.display()
left_anti_null_df = emp_df_null.join(dept_df_null,emp_df_null.dept_id == dept_df_null.department_id,"anti")
left_anti_null_df.display()
from pyspark.sql.types import *
from pyspark.sql import functions as F
emp_schema_multiple_columns = StructType([
StructField("emp_id",IntegerType()),
StructField("emp_name",StringType()),
StructField("emp_country",StringType()),
StructField("dept_id",IntegerType())
])
dept_schema_multiple_columns = StructType([
StructField("department_id",IntegerType()),
StructField("department_name",StringType()),
StructField("department_country",StringType()),
])
emp_df_multiple_columns = spark.createDataFrame(emp_data_multiple_columns,emp_schema_multiple_columns)
dept_df_multiple_columns = spark.createDataFrame(department_data_multiple_columns,dept_schema_multiple_columns)
emp_df_multiple_columns.display()
dept_df_multiple_columns.display()
1. Read the dataframe - spark.read.format("file_format").option("header",True)\
.schema(dataframe_schema).load(file_path)
2. Process (transform) this dataframe
3. Write the result to a storage location or save it as a table
df.write.format("file_format").mode("writer_mode").option("path",writing_path).save()
df.write.format("file_format").mode("writer_mode").option("path",writing_path)\
.saveAsTable(table_name)
Data Frame read modes -
1. Permissive
2. Failfast
3. DropMalformed
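How the three read modes treat a bad record can be modeled in plain Python. In Spark the mode is chosen with .option("mode", "FAILFAST") (or "PERMISSIVE", the default, or "DROPMALFORMED"). The following sketch is a simplified assumption, not Spark's parser. It uses a hypothetical schema (name STRING, age INT, role STRING) and a deliberately malformed record whose age is not a number. Spark's PERMISSIVE mode can also keep the raw line in a corrupt-record column, which this sketch leaves out.

```python
import csv
import io

def parse_age(value):
    """Return (age, ok); ok is False when the text is not a valid integer."""
    try:
        return int(value), True
    except ValueError:
        return None, False

def read_csv_with_mode(text, mode="PERMISSIVE"):
    """Simplified model of Spark's CSV read modes for the schema
    (name STRING, age INT, role STRING); a record is malformed when age is not an int."""
    rows = []
    for name, age_text, role in csv.reader(io.StringIO(text)):
        age, ok = parse_age(age_text)
        if ok:
            rows.append((name, age, role))
        elif mode == "FAILFAST":
            # Stop the whole read on the first malformed record
            raise ValueError(f"Malformed record: {name},{age_text},{role}")
        elif mode == "DROPMALFORMED":
            continue  # silently skip the malformed record
        else:
            # PERMISSIVE: keep the record and set the bad field to NULL
            rows.append((name, None, role))
    return rows

# Hypothetical input: the second record is deliberately malformed
data = "Ganesh,30,Data Engineering\nRahul,forty-three,Cricketer\n"
print(read_csv_with_mode(data, "PERMISSIVE"))
print(read_csv_with_mode(data, "DROPMALFORMED"))
```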
While reading a dataframe, we can specify either a file path or a
folder path.
While writing a dataframe, we specify a folder path: Spark writes one
or more part files into that folder.
Data Frame writer modes -
1. Append - Adds the new data to the data already present in the folder.
2. Overwrite - Replaces the existing data in the folder
with the data we are writing.
3. Error (also "errorifexists") - Throws an error if data is already
present in the folder. This is the default writer mode.
4. Ignore - If data is already present in the folder, it skips the
write without throwing any error.
If no data is present yet, it writes the data to the
specified folder.
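The decision each writer mode makes can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: a dict stands in for the storage location, the save function is hypothetical, and Spark writes part files rather than Python rows. The mode strings are the same ones passed to .mode() in this notebook, and the rows are those of marks_data with marks >= 99.

```python
def save(storage, folder, rows, mode="error"):
    """Simplified model of DataFrameWriter save modes.
    storage maps a folder path to the list of rows stored there."""
    exists = folder in storage
    if mode == "append":
        storage.setdefault(folder, []).extend(rows)  # add to what is already there
    elif mode == "overwrite":
        storage[folder] = list(rows)                 # replace what is there
    elif mode == "ignore":
        if not exists:
            storage[folder] = list(rows)             # write only if nothing is there
    elif mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"{folder} already exists")
        storage[folder] = list(rows)
    else:
        raise ValueError(f"unknown save mode: {mode}")

# Rows of marks_data with marks >= 99
top_marks = [("Ganesh", "English", 99), ("Akshay", "English", 99),
             ("Shrikant", "History", 99), ("Mayur", "History", 99)]

storage = {}
path = "/Volumes/demo/default/landing"
save(storage, path, top_marks, "append")
save(storage, path, top_marks, "append")
after_append = len(storage[path])     # 8: the second append duplicated the rows
save(storage, path, top_marks, "overwrite")
after_overwrite = len(storage[path])  # 4
save(storage, path, top_marks, "ignore")
after_ignore = len(storage[path])     # still 4: the write was skipped
```

Calling save(storage, path, top_marks) with no mode uses the default "error" and raises, because the folder already exists.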
marks_data = [
("Ganesh","English",99),
("Akshay","English",99),
("Priyesh","English",98),
("Rohit","English",98),
("Shrikant","English",98),
("Mayur","English",97),
("Ganesh","History",77),
("Akshay","History",77),
("Priyesh","History",98),
("Rohit","History",98),
("Shrikant","History",99),
("Mayur","History",99)
]
from pyspark.sql.types import *
from pyspark.sql import functions as F
marks_schema = StructType([
StructField("student_name",StringType()),
StructField("subject",StringType()),
StructField("marks",IntegerType())
])
marks_df = spark.createDataFrame(marks_data,marks_schema)
# append: adds new part files next to the existing ones (running it twice duplicates the rows)
marks_df.filter(F.col("marks") >= F.lit(99)).write.format("parquet").mode("append").option("path","/Volumes/demo/default/landing").save()
marks_df.filter(F.col("marks") >= F.lit(99)).write.format("parquet").mode("append").option("path","/Volumes/demo/default/landing").save()
# overwrite: replaces whatever is already in the folder
marks_df.filter(F.col("marks") >= F.lit(99)).write.format("parquet").mode("overwrite").option("path","/Volumes/demo/default/landing").save()
# error: fails here because the folder already contains data
marks_df.filter(F.col("marks") >= F.lit(99)).write.format("parquet").mode("error").option("path","/Volumes/demo/default/landing").save()
# ignore: skips the write (no error) because the folder already contains data
marks_df.filter(F.col("marks") >= F.lit(99)).write.format("parquet").mode("ignore").option("path","/Volumes/demo/default/landing").save()
marks_df.filter(F.col("marks") >= F.lit(99)).write.format("parquet").mode("ignore").option("path","/Volumes/demo/default/landing").save()
# no mode given: defaults to "error", so this also fails while the folder contains data
marks_df.filter(F.col("marks") >= F.lit(99)).write.format("parquet").option("path","/Volumes/demo/default/landing").save()
marks_df1 = spark.read.parquet("/Volumes/demo/default/landing/*")
marks_df1.display()
marks_df1.count()
dbutils.fs.rm("/Volumes/demo/default/landing",True)
Course Description:
This hands-on course is designed for aspiring and experienced data engineers who want to master PySpark—the powerful distributed computing framework built on Apache Spark. Led by Ganesh Kudale, a seasoned data engineer, the series walks learners through real-world scenarios, from foundational concepts to advanced transformations, with a strong focus on production-grade pipeline development.
What You'll Learn:
PySpark Essentials: RDDs, Data Frames, and Spark SQL
Data Ingestion & ETL: Reading from CSV, JSON, Parquet
Transformations & Actions: Filtering, joins, aggregations, and window functions
Who Should Enroll:
Data engineers working with big data platforms
Developers transitioning from SQL to PySpark
Professionals building scalable pipelines in Big Data
Anyone preparing for Spark-related interviews
Curriculum -
Session 1 - Creating the raw data frame
Session 2 - Defining the Schema in PySpark
Session 3 - Reading the data frame from file stored at storage location
Session 4 - Different ways of creating the data frame
Session 5 - Transformations and Action in Apache Spark
Session 6 - Data Frame Read Modes
Session 7 - PySpark withColumn Transformation
Session 8 - PySpark datatype conversions
Session 9 - withColumn in PySpark VS spark SQL
Session 10 - PySpark select transformation
Session 11 - PySpark selectExpr Transformation
Session 12 - Performance difference between select, selectExpr and withColumn transformations
Session 13 - Renaming the column in PySpark data frame and using Spark SQL
Session 14 - Performance efficient approach for renaming columns in PySpark data frame
Session 15 - Filtering data in PySpark
Session 16 - Efficient ways to filter the data in PySpark
Session 17 - Sorting in PySpark Single Column
Session 18 - Sorting in PySpark - Multiple Columns
Session 19 - Sorting in Spark SQL
Session 20 - Performance difference between sort and orderBy in PySpark
Session 21 - Aggregations in PySpark
Session 22 - Simple Aggregations in PySpark - Count, Average, Max, Min
Session 23 - Introduction to Grouping aggregations in PySpark
Session 24 - Grouping aggregations in PySpark - Continuation
Session 25 - Grouping aggregations in PySpark - Continuation 1
Session 26 - Grouping Aggregations on Multiple Columns in PySpark
Session 27 - Grouping Aggregations on Multiple Columns in PySpark Continuation
Session 28 - Running multiple grouping aggregations together
Session 29 - Windowing Aggregations in PySpark - Row_Number
Session 30 - Windowing Aggregations in PySpark - Rank
Session 31 - Windowing Aggregations in PySpark - Dense Rank
Session 32 - Remove duplicates using PySpark window functions
Session 33 - Top scorer students in each subject using PySpark window functions
Session 34 - PySpark Window Function Lead Data Frame
Session 35 - PySpark Window Function Lead Spark SQL
Session 36 - PySpark Window Function - LAG
Session 37 - CASE WHEN in PySpark - One when Condition
Session 38 - CASE WHEN in PySpark - Multiple when Conditions and Multiple Conditions within when
Session 39 - WHEN Otherwise in PySpark - One when Condition
Session 40 - WHEN Otherwise in PySpark - Multiple when Conditions
Session 41 - Working With dates in PySpark - Python List
Session 42 - Working With dates in PySpark - Storage Location
Session 43 - Adding created timestamp and created date to the newly added data in PySpark
Session 44 - Joins in PySpark - Theory
Session 45 - Inner Join in PySpark - Joining over one Column
Session 46 - Inner Join in PySpark - Joining over one Column - NULL values in joining Columns
Session 47 - Inner Join in PySpark - Joining over multiple Columns
Session 48 - Left Outer Join in PySpark - Joining over one Column
Session 49 - Left Outer Join in PySpark - Joining over one Column - NULL values in joining Columns
Session 50 - Left Outer Join in PySpark - Joining over multiple Columns
Session 51 - Right Outer Join in PySpark - Joining over one Column
Session 52 - Right Outer Join in PySpark - Joining over one Column - NULL values in joining Columns
Session 53 - Right Outer Join in PySpark - Joining over multiple Columns
Session 54 - Full Outer Join in PySpark - Joining over one Column
Session 55 - Full Outer Join in PySpark - Joining over one Column - NULL values in joining Columns
Session 56 - Full Outer Join in PySpark - Joining over multiple Columns
Session 57 - Left Semi Join in PySpark
Session 58 - Left Anti Join in PySpark
Session 59 - Reading Single Line JSON file as PySpark Data frame
Session 60 - Reading Multi Line JSON file as PySpark Data frame
Session 61 - Reading parquet file as PySpark data frame
Session 62 - Data Frame writer API and data frame writer Modes