Author: Badr TAJINI – Data Engineering I – ESIEE 2025–2026
Instruction: Execute all cells. Capture plans and Spark UI evidence.
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("de1-lab3").getOrCreate()
# Explicit schemas: skips Spark's schema-inference pass over the CSV files
clicks_schema = T.StructType([
    T.StructField("prev_title", T.StringType(), True),
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("n", T.IntegerType(), True),
    T.StructField("ts", T.TimestampType(), True),
])

dim_schema = T.StructType([
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("curr_category", T.StringType(), True),
])
base = "/home/sable/Documents/data engineering1/lab3-practice/data/"
paths = [
f"{base}lab3_clicks_2025-05.csv",
f"{base}lab3_clicks_2025-06.csv",
f"{base}lab3_clicks_2025-07.csv"
]
# Row-oriented baseline: read the monthly CSVs and derive year/month columns
row_df = (
    spark.read.schema(clicks_schema)
    .option("header", "true")
    .csv(paths)
    .withColumn("year", F.year("ts"))
    .withColumn("month", F.month("ts"))
)
row_df.cache()
print("Rows:", row_df.count())  # count() materializes the cache
row_df.printSchema()
row_df.show(5, truncate=False)
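
# Optional check (an addition, not in the original lab): storageLevel confirms
# the cache took effect once the count() above materialized it.
print("Storage level:", row_df.storageLevel)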
# Q1 (row layout): top 50 (year, month, prev, curr) link pairs by total clicks
q1_row = (
    row_df.filter(F.col("type") == "link")
    .groupBy("year", "month", "prev_title", "curr_title")
    .agg(F.sum("n").alias("n"))
    .orderBy(F.desc("n"))
    .limit(50)
)
q1_row.explain("formatted")
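
# Minimal sketch (not part of the original lab) for saving the printed plan as
# evidence; it assumes explain() writes via stdout (PySpark prints it), so the
# output can be redirected into a file. The target path is our own choice.
import contextlib, io, os

os.makedirs("outputs/lab3", exist_ok=True)
_buf = io.StringIO()
with contextlib.redirect_stdout(_buf):
    q1_row.explain("formatted")
with open("outputs/lab3/q1_row_plan.txt", "w") as f:
    f.write(_buf.getvalue())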
# Columnar copy of the same data, partitioned by the derived date columns
col_base = "outputs/lab3/columnar"
row_df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(f"{col_base}/clicks_parquet")
# Re-read the data in columnar form
col_df = spark.read.parquet(f"{col_base}/clicks_parquet")
col_df.cache()
print("Columnar rows:", col_df.count())

# Same top-50 aggregation as q1_row, now over the Parquet-backed DataFrame,
# so the two plans and the CSV exports below can be compared
q1_col = (
    col_df.filter(F.col("type") == "link")
    .groupBy("year", "month", "prev_title", "curr_title")
    .agg(F.sum("n").alias("n"))
    .orderBy(F.desc("n"))
    .limit(50)
)
q1_col.explain("formatted")
# Small dimension table (curr_title -> curr_category) from the same data dir
dim = (
    spark.read.schema(dim_schema)
    .option("header", "true")
    .csv(f"{base}lab3_dim_curr_category.csv")
)
# Q2: broadcast the small dim table to avoid a shuffle-heavy sort-merge join
j2 = (
    col_df.join(broadcast(dim), "curr_title", "left")
    .groupBy("curr_category")
    .agg(F.sum("n").alias("total_n"))
    .orderBy(F.desc("total_n"))
)
j2.explain("formatted")
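
# The formatted plan above should show BroadcastHashJoin rather than
# SortMergeJoin; for context, inspect the session's auto-broadcast threshold
# (spark.sql.autoBroadcastJoinThreshold, 10 MB by default).
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))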
# Selective filter on both layouts; the count() actions trigger jobs that can
# be inspected and compared in the Spark UI
q2_row = row_df.filter(
    (F.col("type") == "link")
    & F.col("curr_title").isin("Apache_Spark", "PySpark")
)
q2_col = col_df.filter(
    (F.col("type") == "link")
    & F.col("curr_title").isin("Apache_Spark", "PySpark")
)
_ = q2_row.count()
_ = q2_col.count()
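
# Capture Spark UI evidence before the session stops; uiWebUrl reports the
# live address (typically http://localhost:4040 for the first local session).
print("Spark UI:", spark.sparkContext.uiWebUrl)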
q1_row.limit(10).toPandas().to_csv("outputs/lab3/q1_row_top10.csv", index=False)
q1_col.limit(10).toPandas().to_csv("outputs/lab3/q1_col_top10.csv", index=False)
j2.limit(20).toPandas().to_csv("outputs/lab3/j2_broadcast_sample.csv", index=False)
spark.stop()
print("Spark session stopped.")