DE1 — Lab 3: Physical Representations and Batch II Costs

Author: Badr TAJINI – Data Engineering I – ESIEE 2025–2026

Instruction: Execute all cells. Capture plans and Spark UI evidence.


0. Setup and explicit schema


from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.appName("de1-lab3").getOrCreate()

clicks_schema = T.StructType([
    T.StructField("prev_title", T.StringType(), True),
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("n", T.IntegerType(), True),
    T.StructField("ts", T.TimestampType(), True),
])

dim_schema = T.StructType([
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("curr_category", T.StringType(), True),
])
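
As an aside, `spark.read.schema()` also accepts a DDL string, which is sometimes quicker to write than a `StructType`. A sketch of the equivalent DDL form of `clicks_schema` (the string below is illustrative, built by hand to match the schema above):

```python
# Equivalent DDL-string form of clicks_schema; DataFrameReader.schema()
# accepts either a StructType or a string like this.
clicks_ddl = ", ".join([
    "prev_title STRING",
    "curr_title STRING",
    "type STRING",
    "n INT",
    "ts TIMESTAMP",
])
# e.g. spark.read.schema(clicks_ddl).option("header", "true").csv(paths)
print(clicks_ddl)
```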

1. Ingest monthly CSVs (row format baseline)


base = "/home/sable/Documents/data engineering1/lab3-practice/data/"
paths = [
    f"{base}lab3_clicks_2025-05.csv",
    f"{base}lab3_clicks_2025-06.csv",
    f"{base}lab3_clicks_2025-07.csv"
]

row_df = (
    spark.read.schema(clicks_schema)
    .option("header", "true")
    .csv(paths)
    .withColumn("year", F.year("ts"))
    .withColumn("month", F.month("ts"))
)

row_df.cache()
print("Rows:", row_df.count())
row_df.printSchema()
row_df.show(5, truncate=False)
Rows: 15000
Schema includes prev_title, curr_title, type, n, ts, year, month
Top rows displayed

Evidence: row representation plan


q1_row = (
    row_df.filter(F.col("type") == "link")
    .groupBy("year", "month", "prev_title", "curr_title")
    .agg(F.sum("n").alias("n"))
    .orderBy(F.desc("n"))
    .limit(50)
)

q1_row.explain("formatted")
Physical plan showing Scan csv → Filter → HashAggregate (partial) → Exchange → HashAggregate (final) → TakeOrderedAndProject (orderBy + limit)

2. Column representation: Parquet with partitioning


col_base = "outputs/lab3/columnar"

row_df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(f"{col_base}/clicks_parquet")

col_df = spark.read.parquet(f"{col_base}/clicks_parquet")
col_df.cache()

print("Columnar rows:", col_df.count())
Columnar rows: 15000
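
`partitionBy("year", "month")` encodes the partition columns as Hive-style `key=value` directories, which Spark can prune when a query filters on them. A stdlib-only sketch of the resulting layout (the month list is an assumption matching the three ingested CSVs):

```python
# Sketch of the directory layout partitionBy("year", "month") produces on disk.
# The real files live under outputs/lab3/columnar/clicks_parquet.
def partition_dir(base, year, month):
    # Spark encodes partition columns as key=value directories and can skip
    # whole directories when a query filters on year/month.
    return f"{base}/year={year}/month={month}"

months = [(2025, 5), (2025, 6), (2025, 7)]
layout = [partition_dir("clicks_parquet", y, m) for y, m in months]
for d in layout:
    print(d)
```

Note that the partition column values are not stored inside the Parquet files; they are recovered from the directory names at read time.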

Evidence: column representation plan


q1_col = (
    col_df.filter(F.col("type") == "link")
    .groupBy("year", "month", "prev_title", "curr_title")
    .agg(F.sum("n").alias("n"))
    .orderBy(F.desc("n"))
    .limit(50)
)

q1_col.explain("formatted")
Scan parquet → ColumnarToRow → Filter → HashAggregate (partial) → Exchange → HashAggregate (final) → TakeOrderedAndProject

3. Join strategy: normal vs broadcast


dim = spark.read.schema(dim_schema) \
    .option("header", "true") \
    .csv(f"{base}lab3_dim_curr_category.csv")

from pyspark.sql.functions import broadcast

j2 = (
    col_df.join(broadcast(dim), "curr_title", "left")
    .groupBy("curr_category")
    .agg(F.sum("n").alias("total_n"))
    .orderBy(F.desc("total_n"))
)

j2.explain("formatted")
BroadcastHashJoin used instead of SortMergeJoin
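
The hint matters because, without it, Spark broadcasts only relations it estimates to be below spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch of that decision rule, in spirit only; the constant and function below are illustrative, not Spark internals:

```python
# Sketch of the planner's broadcast decision (not Spark's actual cost model):
# broadcast when the build side is small enough, or when explicitly hinted.
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # spark.sql.autoBroadcastJoinThreshold default (10 MB)

def choose_join(build_side_bytes: int, hinted: bool = False) -> str:
    # broadcast(dim) is a hint: it forces the hash join regardless of size
    if hinted or build_side_bytes <= AUTO_BROADCAST_THRESHOLD:
        return "BroadcastHashJoin"
    return "SortMergeJoin"

print(choose_join(200 * 1024))                 # small dim table
print(choose_join(2 * 1024**3))                # large relation
print(choose_join(2 * 1024**3, hinted=True))   # hint overrides size
```

To see the SortMergeJoin plan for the same query, `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)` disables automatic broadcasting (drop the `broadcast()` hint as well).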

4. Additional queries for metrics


q2_row = row_df.filter(
    (F.col("type") == "link") &
    F.col("curr_title").isin("Apache_Spark", "PySpark")
)

q2_col = col_df.filter(
    (F.col("type") == "link") &
    F.col("curr_title").isin("Apache_Spark", "PySpark")
)

_ = q2_row.count()
_ = q2_col.count()
Open Spark UI at http://localhost:4040 and record metrics
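
Before reading the UI metrics, it helps to have an expectation. A back-of-envelope sketch of why the Parquet scan in q2 should read fewer bytes than the CSV scan; the per-column byte sizes are assumptions for illustration, and real Parquet adds encoding and compression on top of column pruning:

```python
# Rough scan-cost model: row format vs column format for q2.
n_rows = 15_000
avg_bytes = {"prev_title": 30, "curr_title": 30, "type": 8, "n": 4, "ts": 8}

# CSV is row-oriented: every column of every row must be read and parsed.
row_scan = n_rows * sum(avg_bytes.values())

# Parquet is columnar: q2 only touches `type` and `curr_title`.
col_scan = n_rows * (avg_bytes["type"] + avg_bytes["curr_title"])

print(f"row scan ~{row_scan} B, column scan ~{col_scan} B, "
      f"~{row_scan / col_scan:.1f}x fewer bytes")
```

Compare this prediction against the "input size / records" column of the two scan stages in the UI.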

5. Save sample outputs


q1_row.limit(10).toPandas().to_csv("outputs/lab3/q1_row_top10.csv", index=False)
q1_col.limit(10).toPandas().to_csv("outputs/lab3/q1_col_top10.csv", index=False)
j2.limit(20).toPandas().to_csv("outputs/lab3/j2_broadcast_sample.csv", index=False)
Saved sample outputs in outputs/lab3/

6. Cleanup


spark.stop()
print("Spark session stopped.")
Spark session stopped.