DE1 — Lab 3: Physical Representations and Batch II Costs

Author: Badr TAJINI - Data Engineering I - ESIEE 2025-2026


Execute all cells. Capture plans and Spark UI evidence.

0. Setup and explicit schema

from pyspark.sql import SparkSession, functions as F, types as T
spark = SparkSession.builder.appName("de1-lab3").getOrCreate()
 
clicks_schema = T.StructType([
    T.StructField("prev_title", T.StringType(), True),
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("n", T.IntegerType(), True),
    T.StructField("ts", T.TimestampType(), True),
])
dim_schema = T.StructType([
    T.StructField("curr_title", T.StringType(), True),
    T.StructField("curr_category", T.StringType(), True),
])
 
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/20 14:17:49 WARN Utils: Your hostname, sable-ThinkPad-X1-Yoga-3rd, resolves to a loopback address: 127.0.1.1; using 10.192.33.105 instead (on interface wlp2s0)
25/12/20 14:17:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/20 14:17:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/20 14:17:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
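
The physical plans captured below are shaped by a few session defaults: AQE wraps every plan in an AdaptiveSparkPlan node, the Exchange operators use 200 shuffle partitions, and small tables may be broadcast automatically. The optional sketch below simply records those defaults next to the proof files; the three keys are standard Spark SQL configuration entries.

# Optional: record the session defaults that shape the plans captured below
# (AdaptiveSparkPlan wrappers, 200-partition Exchanges, automatic broadcasts).
for key in ["spark.sql.adaptive.enabled",
            "spark.sql.shuffle.partitions",
            "spark.sql.autoBroadcastJoinThreshold"]:
    print(key, "=", spark.conf.get(key))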

1. Ingest monthly CSVs (row format baseline)

base = "/home/sable/Documents/data engineering1/lab3-practice/data/"
paths = [f"{base}lab3_clicks_2025-05.csv", f"{base}lab3_clicks_2025-06.csv", f"{base}lab3_clicks_2025-07.csv"]
row_df = (spark.read.schema(clicks_schema).option("header","true").csv(paths)
            .withColumn("year", F.year("ts")).withColumn("month", F.month("ts")))
row_df.cache()
print("Rows:", row_df.count())
row_df.printSchema()
row_df.show(5, truncate=False)
 
Rows: 15000
root
 |-- prev_title: string (nullable = true)
 |-- curr_title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- n: integer (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+-----------------------------+--------------+----+---+-------------------+----+-----+
|prev_title                   |curr_title    |type|n  |ts                 |year|month|
+-----------------------------+--------------+----+---+-------------------+----+-----+
|ETL                          |PySpark       |link|431|2025-06-01 02:57:00|2025|6    |
|Data_engineering             |Broadcast_join|link|347|2025-06-15 13:40:00|2025|6    |
|Python_(programming_language)|MapReduce     |link|39 |2025-06-07 15:14:00|2025|6    |
|ETL                          |Data_warehouse|link|401|2025-06-07 04:59:00|2025|6    |
|Python_(programming_language)|Dataframe     |link|155|2025-06-06 06:40:00|2025|6    |
+-----------------------------+--------------+----+---+-------------------+----+-----+
only showing top 5 rows
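
As a sanity check on the explicit-schema choice, the optional sketch below lets Spark infer the schema instead. Inference adds an extra pass over the CSV files, which is part of the row-format ingestion cost this lab compares; the equality test may or may not hold depending on how this Spark version infers the ts column.

# Optional: compare the explicit schema with what inferSchema would produce.
# inferSchema triggers an extra scan of the CSV files before the real read.
inferred = (spark.read.option("header", "true").option("inferSchema", "true")
            .csv(paths))
print("matches explicit schema:", inferred.schema == clicks_schema)
inferred.printSchema()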

Evidence: row representation plan

# Query Q1: top 50 transitions per month for type 'link'
q1_row = (row_df.filter(F.col("type")=="link")
           .groupBy("year","month","prev_title","curr_title")
           .agg(F.sum("n").alias("n"))
           .orderBy(F.desc("n"))
           .limit(50))
q1_row.explain("formatted")
 
import pathlib, datetime as _dt
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_row.txt","w") as f:
    f.write(str(_dt.datetime.now())+"\n")
    f.write(q1_row._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_row.txt")
 
== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
   +- HashAggregate (9)
      +- Exchange (8)
         +- HashAggregate (7)
            +- Project (6)
               +- Filter (5)
                  +- InMemoryTableScan (1)
                        +- InMemoryRelation (2)
                              +- * Project (4)
                                 +- Scan csv  (3)


(1) InMemoryTableScan
Output [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
Arguments: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6], [isnotnull(type#2), (type#2 = link)]

(2) InMemoryRelation
Arguments: [prev_title#0, curr_title#1, type#2, n#3, ts#4, year#6, month#7], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan csv 
Output [5]: [prev_title#0, curr_title#1, type#2, n#3, ts#4]
Batched: false
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/lab3-practice/data/lab3_clicks_2025-05.csv, ... 2 entries]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) Project [codegen id : 1]
Output [7]: [prev_title#0, curr_title#1, type#2, n#3, ts#4, year(cast(ts#4 as date)) AS year#6, month(cast(ts#4 as date)) AS month#7]
Input [5]: [prev_title#0, curr_title#1, type#2, n#3, ts#4]

(5) Filter
Input [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]
Condition : (isnotnull(type#2) AND (type#2 = link))

(6) Project
Output [5]: [prev_title#0, curr_title#1, n#3, year#6, month#7]
Input [6]: [curr_title#1, month#7, n#3, prev_title#0, type#2, year#6]

(7) HashAggregate
Input [5]: [prev_title#0, curr_title#1, n#3, year#6, month#7]
Keys [4]: [year#6, month#7, prev_title#0, curr_title#1]
Functions [1]: [partial_sum(n#3)]
Aggregate Attributes [1]: [sum#510L]
Results [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]

(8) Exchange
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]
Arguments: hashpartitioning(year#6, month#7, prev_title#0, curr_title#1, 200), ENSURE_REQUIREMENTS, [plan_id=85]

(9) HashAggregate
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, sum#511L]
Keys [4]: [year#6, month#7, prev_title#0, curr_title#1]
Functions [1]: [sum(n#3)]
Aggregate Attributes [1]: [sum(n#3)#404L]
Results [5]: [year#6, month#7, prev_title#0, curr_title#1, sum(n#3)#404L AS n#396L]

(10) TakeOrderedAndProject
Input [5]: [year#6, month#7, prev_title#0, curr_title#1, n#396L]
Arguments: 50, [n#396L DESC NULLS LAST], [year#6, month#7, prev_title#0, curr_title#1, n#396L]

(11) AdaptiveSparkPlan
Output [5]: [year#6, month#7, prev_title#0, curr_title#1, n#396L]
Arguments: isFinalPlan=false


Saved proof/plan_row.txt
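
The same plan-dumping boilerplate is repeated in the next cells; a small helper sketch is shown below. It relies on the same private _jdf accessor already used above, so it is tied to this PySpark version rather than a public API.

# Helper sketch: write the executed plan of any DataFrame to a proof file.
import datetime

def save_plan(df, path):
    with open(path, "w") as f:
        f.write(str(datetime.datetime.now()) + "\n")
        f.write(df._jdf.queryExecution().executedPlan().toString())
    print("Saved", path)

# Example: save_plan(q1_row, "proof/plan_row.txt")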

2. Column representation: Parquet with partitioning and optional sort

col_base = "outputs/lab3/columnar"
 
# Write columnar
(row_df
 .write.mode("overwrite")
 .partitionBy("year","month")
 .parquet(f"{col_base}/clicks_parquet"))
 
# Re-read the columnar copy (Spark infers the schema from the Parquet footer)
col_df = spark.read.parquet(f"{col_base}/clicks_parquet")
col_df.cache()
print("Columnar rows:", col_df.count())
Columnar rows: 15000
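
A useful piece of cost evidence at this point is the on-disk footprint of the two representations. The sketch below assumes both copies sit on the local filesystem, as in this run; on HDFS or S3 the Hadoop FileSystem API would be needed instead.

# Optional: compare on-disk size of the row (CSV) and column (Parquet) copies.
import os

def dir_size(path):
    # Size of a single file, or the sum of all file sizes under a directory.
    if os.path.isfile(path):
        return os.path.getsize(path)
    return sum(os.path.getsize(os.path.join(root, name))
               for root, _, names in os.walk(path) for name in names)

csv_bytes = sum(dir_size(p) for p in paths)
parquet_bytes = dir_size(f"{col_base}/clicks_parquet")
print(f"CSV: {csv_bytes/1e6:.2f} MB   Parquet: {parquet_bytes/1e6:.2f} MB")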

Evidence: column representation plan

q1_col = (col_df.filter(F.col("type")=="link")
           .groupBy("year","month","prev_title","curr_title")
           .agg(F.sum("n").alias("n"))
           .orderBy(F.desc("n"))
           .limit(50))
q1_col.explain("formatted")
with open("proof/plan_column.txt","w") as f:
    from datetime import datetime as _dt
    f.write(str(_dt.now())+"\n")
    f.write(q1_col._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_column.txt")
 
== Physical Plan ==
AdaptiveSparkPlan (11)
+- TakeOrderedAndProject (10)
   +- HashAggregate (9)
      +- Exchange (8)
         +- HashAggregate (7)
            +- Project (6)
               +- Filter (5)
                  +- InMemoryTableScan (1)
                        +- InMemoryRelation (2)
                              +- * ColumnarToRow (4)
                                 +- Scan parquet  (3)


(1) InMemoryTableScan
Output [6]: [curr_title#793, month#798, n#795, prev_title#792, type#794, year#797]
Arguments: [curr_title#793, month#798, n#795, prev_title#792, type#794, year#797], [isnotnull(type#794), (type#794 = link)]

(2) InMemoryRelation
Arguments: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798]
Batched: true
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/lab3-practice/outputs/lab3/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798]

(5) Filter
Input [6]: [curr_title#793, month#798, n#795, prev_title#792, type#794, year#797]
Condition : (isnotnull(type#794) AND (type#794 = link))

(6) Project
Output [5]: [prev_title#792, curr_title#793, n#795, year#797, month#798]
Input [6]: [curr_title#793, month#798, n#795, prev_title#792, type#794, year#797]

(7) HashAggregate
Input [5]: [prev_title#792, curr_title#793, n#795, year#797, month#798]
Keys [4]: [year#797, month#798, prev_title#792, curr_title#793]
Functions [1]: [partial_sum(n#795)]
Aggregate Attributes [1]: [sum#1135L]
Results [5]: [year#797, month#798, prev_title#792, curr_title#793, sum#1136L]

(8) Exchange
Input [5]: [year#797, month#798, prev_title#792, curr_title#793, sum#1136L]
Arguments: hashpartitioning(year#797, month#798, prev_title#792, curr_title#793, 200), ENSURE_REQUIREMENTS, [plan_id=202]

(9) HashAggregate
Input [5]: [year#797, month#798, prev_title#792, curr_title#793, sum#1136L]
Keys [4]: [year#797, month#798, prev_title#792, curr_title#793]
Functions [1]: [sum(n#795)]
Aggregate Attributes [1]: [sum(n#795)#1029L]
Results [5]: [year#797, month#798, prev_title#792, curr_title#793, sum(n#795)#1029L AS n#1021L]

(10) TakeOrderedAndProject
Input [5]: [year#797, month#798, prev_title#792, curr_title#793, n#1021L]
Arguments: 50, [n#1021L DESC NULLS LAST], [year#797, month#798, prev_title#792, curr_title#793, n#1021L]

(11) AdaptiveSparkPlan
Output [5]: [year#797, month#798, prev_title#792, curr_title#793, n#1021L]
Arguments: isFinalPlan=false


Saved proof/plan_column.txt
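
Note that both captured plans start from an InMemoryTableScan because row_df and col_df are cached, which hides the Parquet-specific gains (column pruning, predicate pushdown, partition pruning) in nodes (1)-(3). The optional sketch below explains a query over a fresh, uncached Parquet read so those effects become visible in the Scan parquet node.

# Optional: bypass the cache so the Scan parquet node shows PartitionFilters
# (on the month partition column) and PushedFilters (on type).
fresh = spark.read.parquet(f"{col_base}/clicks_parquet")
(fresh.filter((F.col("type") == "link") & (F.col("month") == 6))
      .groupBy("prev_title", "curr_title")
      .agg(F.sum("n").alias("n"))
      .explain("formatted"))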

3. Join strategy: normal vs broadcast

dim = spark.read.schema(dim_schema).option("header","true").csv("data/lab3_dim_curr_category.csv")
# Join without an explicit broadcast hint (Spark may still auto-broadcast the small dimension table)
j1 = (col_df.join(dim, "curr_title", "left")
      .groupBy("curr_category")
      .agg(F.sum("n").alias("total_n"))
      .orderBy(F.desc("total_n")))
j1.explain("formatted")
 
# Broadcast join
from pyspark.sql.functions import broadcast
j2 = (col_df.join(broadcast(dim), "curr_title", "left")
      .groupBy("curr_category")
      .agg(F.sum("n").alias("total_n"))
      .orderBy(F.desc("total_n")))
j2.explain("formatted")
 
# Save one plan for evidence
with open("proof/plan_broadcast.txt","w") as f:
    from datetime import datetime as _dt
    f.write(str(_dt.now())+"\n")
    f.write(j2._jdf.queryExecution().executedPlan().toString())
print("Saved proof/plan_broadcast.txt")
 
== Physical Plan ==
AdaptiveSparkPlan (15)
+- Sort (14)
   +- Exchange (13)
      +- HashAggregate (12)
         +- Exchange (11)
            +- HashAggregate (10)
               +- Project (9)
                  +- BroadcastHashJoin LeftOuter BuildRight (8)
                     :- InMemoryTableScan (1)
                     :     +- InMemoryRelation (2)
                     :           +- * ColumnarToRow (4)
                     :              +- Scan parquet  (3)
                     +- BroadcastExchange (7)
                        +- Filter (6)
                           +- Scan csv  (5)


(1) InMemoryTableScan
Output [2]: [curr_title#793, n#795]
Arguments: [curr_title#793, n#795]

(2) InMemoryRelation
Arguments: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798]
Batched: true
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/lab3-practice/outputs/lab3/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798]

(5) Scan csv 
Output [2]: [curr_title#1137, curr_category#1138]
Batched: false
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/lab3-practice/data/lab3_dim_curr_category.csv]
PushedFilters: [IsNotNull(curr_title)]
ReadSchema: struct<curr_title:string,curr_category:string>

(6) Filter
Input [2]: [curr_title#1137, curr_category#1138]
Condition : isnotnull(curr_title#1137)

(7) BroadcastExchange
Input [2]: [curr_title#1137, curr_category#1138]
Arguments: HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=241]

(8) BroadcastHashJoin
Left keys [1]: [curr_title#793]
Right keys [1]: [curr_title#1137]
Join type: LeftOuter
Join condition: None

(9) Project
Output [2]: [n#795, curr_category#1138]
Input [4]: [curr_title#793, n#795, curr_title#1137, curr_category#1138]

(10) HashAggregate
Input [2]: [n#795, curr_category#1138]
Keys [1]: [curr_category#1138]
Functions [1]: [partial_sum(n#795)]
Aggregate Attributes [1]: [sum#1255L]
Results [2]: [curr_category#1138, sum#1256L]

(11) Exchange
Input [2]: [curr_category#1138, sum#1256L]
Arguments: hashpartitioning(curr_category#1138, 200), ENSURE_REQUIREMENTS, [plan_id=246]

(12) HashAggregate
Input [2]: [curr_category#1138, sum#1256L]
Keys [1]: [curr_category#1138]
Functions [1]: [sum(n#795)]
Aggregate Attributes [1]: [sum(n#795)#1149L]
Results [2]: [curr_category#1138, sum(n#795)#1149L AS total_n#1140L]

(13) Exchange
Input [2]: [curr_category#1138, total_n#1140L]
Arguments: rangepartitioning(total_n#1140L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=249]

(14) Sort
Input [2]: [curr_category#1138, total_n#1140L]
Arguments: [total_n#1140L DESC NULLS LAST], true, 0

(15) AdaptiveSparkPlan
Output [2]: [curr_category#1138, total_n#1140L]
Arguments: isFinalPlan=false


== Physical Plan ==
AdaptiveSparkPlan (15)
+- Sort (14)
   +- Exchange (13)
      +- HashAggregate (12)
         +- Exchange (11)
            +- HashAggregate (10)
               +- Project (9)
                  +- BroadcastHashJoin LeftOuter BuildRight (8)
                     :- InMemoryTableScan (1)
                     :     +- InMemoryRelation (2)
                     :           +- * ColumnarToRow (4)
                     :              +- Scan parquet  (3)
                     +- BroadcastExchange (7)
                        +- Filter (6)
                           +- Scan csv  (5)


(1) InMemoryTableScan
Output [2]: [curr_title#793, n#795]
Arguments: [curr_title#793, n#795]

(2) InMemoryRelation
Arguments: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798], StorageLevel(disk, memory, deserialized, 1 replicas)

(3) Scan parquet 
Output [7]: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798]
Batched: true
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/lab3-practice/outputs/lab3/columnar/clicks_parquet]
ReadSchema: struct<prev_title:string,curr_title:string,type:string,n:int,ts:timestamp>

(4) ColumnarToRow [codegen id : 1]
Input [7]: [prev_title#792, curr_title#793, type#794, n#795, ts#796, year#797, month#798]

(5) Scan csv 
Output [2]: [curr_title#1137, curr_category#1138]
Batched: false
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/lab3-practice/data/lab3_dim_curr_category.csv]
PushedFilters: [IsNotNull(curr_title)]
ReadSchema: struct<curr_title:string,curr_category:string>

(6) Filter
Input [2]: [curr_title#1137, curr_category#1138]
Condition : isnotnull(curr_title#1137)

(7) BroadcastExchange
Input [2]: [curr_title#1137, curr_category#1138]
Arguments: HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=287]

(8) BroadcastHashJoin
Left keys [1]: [curr_title#793]
Right keys [1]: [curr_title#1137]
Join type: LeftOuter
Join condition: None

(9) Project
Output [2]: [n#795, curr_category#1138]
Input [4]: [curr_title#793, n#795, curr_title#1137, curr_category#1138]

(10) HashAggregate
Input [2]: [n#795, curr_category#1138]
Keys [1]: [curr_category#1138]
Functions [1]: [partial_sum(n#795)]
Aggregate Attributes [1]: [sum#1372L]
Results [2]: [curr_category#1138, sum#1373L]

(11) Exchange
Input [2]: [curr_category#1138, sum#1373L]
Arguments: hashpartitioning(curr_category#1138, 200), ENSURE_REQUIREMENTS, [plan_id=292]

(12) HashAggregate
Input [2]: [curr_category#1138, sum#1373L]
Keys [1]: [curr_category#1138]
Functions [1]: [sum(n#795)]
Aggregate Attributes [1]: [sum(n#795)#1266L]
Results [2]: [curr_category#1138, sum(n#795)#1266L AS total_n#1257L]

(13) Exchange
Input [2]: [curr_category#1138, total_n#1257L]
Arguments: rangepartitioning(total_n#1257L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=295]

(14) Sort
Input [2]: [curr_category#1138, total_n#1257L]
Arguments: [total_n#1257L DESC NULLS LAST], true, 0

(15) AdaptiveSparkPlan
Output [2]: [curr_category#1138, total_n#1257L]
Arguments: isFinalPlan=false


Saved proof/plan_broadcast.txt
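
Both plans above resolve to a BroadcastHashJoin: the dimension CSV is far below the default spark.sql.autoBroadcastJoinThreshold (10 MB), so Spark broadcasts it even without the explicit hint. To obtain a genuinely non-broadcast baseline for comparison, the threshold can be disabled temporarily, as in the sketch below; the SortMergeJoin expectation refers to the initial plan, and AQE may still change the join at runtime.

# Optional: force a shuffle-based join by disabling auto-broadcast, then restore it.
old = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
j1_shuffle = (col_df.join(dim, "curr_title", "left")
              .groupBy("curr_category")
              .agg(F.sum("n").alias("total_n"))
              .orderBy(F.desc("total_n")))
j1_shuffle.explain("formatted")   # expect SortMergeJoin with an Exchange on curr_title
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", old)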

4. Additional queries for metrics

# Q2: monthly volume metric (sum of n) for selected target titles
q2_row = (row_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
           .groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))
q2_col = (col_df.filter((F.col("type")=="link") & F.col("curr_title").isin("Apache_Spark","PySpark"))
           .groupBy("year","month","curr_title").agg(F.sum("n").alias("n")).orderBy("year","month","curr_title"))
 
# Trigger execution of both variants with an action
_ = q2_row.count(); _ = q2_col.count()
 
# Q3: heavy cardinality grouping
q3_row = row_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
q3_col = col_df.groupBy("prev_title","curr_title").agg(F.sum("n").alias("n")).orderBy(F.desc("n")).limit(100)
_ = q3_row.count(); _ = q3_col.count()
 
print("Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab3_metrics_log.csv")
 
Open the Spark UI (port 4041 in this run, since 4040 was taken) while each job runs and record metrics into lab3_metrics_log.csv
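
To complement the Spark UI readings, a wall-clock timing of each query can be appended to the log. The sketch below is a minimal harness; the column layout of lab3_metrics_log.csv is an assumption, so adapt it to the template used for the lab.

# Minimal timing harness (layout of the CSV log is an assumption).
import csv, time

def time_query(label, df, log_path="lab3_metrics_log.csv"):
    start = time.perf_counter()
    df.count()                                  # action that triggers the job
    elapsed = time.perf_counter() - start
    with open(log_path, "a", newline="") as f:
        csv.writer(f).writerow([label, f"{elapsed:.3f}"])
    print(label, f"{elapsed:.3f}s")

# Example: time_query("q2_row", q2_row); time_query("q2_col", q2_col)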

5. Save sample outputs

import pathlib, pandas as pd
pathlib.Path("outputs/lab3").mkdir(parents=True, exist_ok=True)
q1_row.limit(10).toPandas().to_csv("outputs/lab3/q1_row_top10.csv", index=False)
q1_col.limit(10).toPandas().to_csv("outputs/lab3/q1_col_top10.csv", index=False)
j2.limit(20).toPandas().to_csv("outputs/lab3/j2_broadcast_sample.csv", index=False)
print("Saved sample outputs in outputs/lab3/")
 
Saved sample outputs in outputs/lab3/
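
toPandas() collects each result to the driver, which is fine at this scale. As an alternative sketch, Spark's own CSV writer keeps the write distributed; coalesce(1) is used here only to get a single output file and serialises the final stage through one task.

# Alternative: write a sample with Spark's CSV writer instead of toPandas().
(q1_col.limit(10)
       .coalesce(1)
       .write.mode("overwrite")
       .option("header", "true")
       .csv("outputs/lab3/q1_col_top10_spark"))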

6. Cleanup

spark.stop()
print("Spark session stopped.")
 
Spark session stopped.