Scenario Based Interview
Pyspark vs
Spark SQL
Ganesh. R
Scenario: Write a query that expands the output with one record per open position.
For each job title, if fewer employees are assigned than the sanctioned number of
posts (totalpost), emit additional rows with the employee name 'Vacant' — one row
for every unfilled post.
from pyspark.sql import SparkSession

# Entry point for this demo notebook.
spark = SparkSession.builder.appName("JobPositionsAndEmployees").getOrCreate()

# job_positions: one row per job title, with the sanctioned post count
# (totalpost) for that title.
job_positions_schema = [
    "id", "title", "groups", "levels", "payscale", "totalpost",
]
job_positions_data = [
    (1, 'General manager', 'A', 'l-15', 10000, 1),
    (2, 'Manager', 'B', 'l-14', 9000, 5),
    (3, 'Asst. Manager', 'C', 'l-13', 8000, 10),
]
job_positions_df = spark.createDataFrame(
    job_positions_data, schema=job_positions_schema
)

# job_employees: one row per hired employee, linked to a position via
# position_id.
job_employees_schema = ["id", "name", "position_id"]
job_employees_data = [
    (1, 'John Smith', 1),
    (2, 'Jane Doe', 2),
    (3, 'Michael Brown', 2),
    (4, 'Emily Johnson', 2),
    (5, 'William Lee', 3),
    (6, 'Jessica Clark', 3),
    (7, 'Christopher Harris', 3),
    (8, 'Olivia Wilson', 3),
    (9, 'Daniel Martinez', 3),
    (10, 'Sophia Miller', 3),
]
job_employees_df = spark.createDataFrame(
    job_employees_data, schema=job_employees_schema
)

# Preview both input tables.
job_positions_df.display()
job_employees_df.display()
###PySpark
from pyspark.sql.functions import col, lit, row_number, when
from pyspark.sql import Row
from pyspark.sql.window import Window

# Number each employee within their own position (1, 2, 3, ...) so every
# employee claims exactly one slot of that position. The original code set
# pos_num to a NULL literal on the employee side, so the join condition
# below could never match and every slot came out "Vacant"; it also
# referenced an undefined name (job_employees_columns).
slot_window = Window.partitionBy('position_id').orderBy('id')
employees_with_slot = job_employees_df.withColumn(
    'pos_num', row_number().over(slot_window)
)

# Expand each position into `totalpost` rows, one per open slot
# (pos_num = 1 .. totalpost).
expanded_positions = job_positions_df.rdd.flatMap(
    lambda row: [
        Row(id=row['id'], title=row['title'], groups=row['groups'],
            levels=row['levels'], payscale=row['payscale'],
            totalpost=row['totalpost'], pos_num=i + 1)
        for i in range(row['totalpost'])
    ]
).toDF()

# Left-join slots to employees: a slot with no matching employee is a
# vacancy and is labelled 'Vacant'.
joined_df = (
    expanded_positions.join(
        employees_with_slot,
        (expanded_positions.id == employees_with_slot.position_id)
        & (expanded_positions.pos_num == employees_with_slot.pos_num),
        'left',
    )
    .select(
        'title', 'groups', 'payscale',
        when(col('name').isNull(), lit('Vacant'))
        .otherwise(col('name')).alias('name'),
    )
)

# Show the result: one row per post, filled or 'Vacant'.
joined_df.display()
###SQL
# Expose both DataFrames to Spark SQL so the %sql cell below can query them.
for view_name, df in (
    ("job_positions", job_positions_df),
    ("job_employees", job_employees_df),
):
    df.createOrReplaceTempView(view_name)
%sql
-- One output row per sanctioned post: filled posts show the employee's
-- name, unfilled posts show 'Vacant'.
--
-- Fixes over the original query:
--   * row_number() is now partitioned by position_id, so each employee is
--     numbered 1..k within their own position. The original global row
--     number meant an employee whose overall rank exceeded their
--     position's totalpost was silently dropped and replaced by a bogus
--     'Vacant' row.
--   * Slot rows are generated directly with explode(sequence(1, totalpost))
--     instead of abusing the employee row numbers as a sequence generator
--     (which capped the slot count at the number of employees).
--   * The coalesce column is given an explicit alias.
with slots as (
  -- One row per open post of each position: slot = 1 .. totalpost.
  select
    id,
    title,
    groups,
    payscale,
    explode(sequence(1, totalpost)) as slot
  from
    job_positions
),
emp as (
  -- Each employee occupies one slot (1..k) of their own position.
  select
    name,
    position_id,
    row_number() over (
      partition by position_id
      order by
        id
    ) as slot
  from
    job_employees
)
select
  a.title,
  a.groups,
  a.payscale,
  coalesce(b.name, 'Vacant') as name
from
  slots as a
  left join emp as b on b.position_id = a.id
  and b.slot = a.slot
order by
  a.id,
  a.slot;
IF YOU FOUND
THIS POST
USEFUL, PLEASE
SAVE IT.
Ganesh. R
+91-9030485102. Hyderabad, Telangana. rganesh0203@gmail.com
https://medium.com/@rganesh0203 https://rganesh203.github.io/Portfolio/
https://github.com/rganesh203. https://www.linkedin.com/in/r-ganesh-a86418155/
https://www.instagram.com/rg_data_talks/ https://topmate.io/ganesh_r0203