DE1 — Final Project Notebook¶

Professor : Badr TAJINI - Data Engineering I - ESIEE 2025-2026


Students : DIALLO Samba & DIOP Mouhamed


This is the primary executable artifact. Fill config, run baseline, then optimized pipeline, and record evidence.

0. Load config¶

In [1]:
import yaml, pathlib, datetime
from pyspark.sql import SparkSession, functions as F, types as T

with open("de1_project_config.yml") as f:
    CFG = yaml.safe_load(f)

spark = (SparkSession.builder
    .appName("de1-lakehouse-project")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.3")
    .getOrCreate())

print(f"Spark version: {spark.version}")
print(f"Project: {CFG['project']['name']}")
CFG
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/20 20:51:51 WARN Utils: Your hostname, sable-ThinkPad-X1-Yoga-3rd, resolves to a loopback address: 127.0.1.1; using 10.192.33.105 instead (on interface wlp2s0)
25/12/20 20:51:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/20 20:51:51 WARN Utils: Your hostname, sable-ThinkPad-X1-Yoga-3rd, resolves to a loopback address: 127.0.1.1; using 10.192.33.105 instead (on interface wlp2s0)
25/12/20 20:51:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/20 20:51:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/20 20:51:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Project: DE1 Local Lakehouse Project
Out[1]:
{'project': {'name': 'DE1 Local Lakehouse Project',
  'dataset': 'Wikipedia Clickstream (Nov 2024)',
  'dataset_size': '10M rows / 450MB',
  'team': 'Badr TAJINI'},
 'paths': {'raw_csv_glob': 'data/clickstream/clickstream-10M.tsv',
  'bronze': 'outputs/project/bronze',
  'silver': 'outputs/project/silver',
  'gold': 'outputs/project/gold',
  'proof': 'proof',
  'metrics_log': 'project_metrics_log.csv'},
 'slo': {'freshness_hours': 2,
  'q1_latency_p95_seconds': 4,
  'storage_ratio_max': 0.6},
 'hardware': {'cpu': 'Intel Core i7',
  'ram_gb': 16,
  'disk_type': 'SSD',
  'spark_version': '4.0.0'},
 'layout': {'partition_by': [],
  'sort_by': [],
  'target_file_size_mb': 128,
  'max_files_per_partition': 10},
 'queries': {'q1': {'description': 'Top 20 most visited pages',
   'sql': 'SELECT curr as page, \n       SUM(n) as total_clicks\nFROM silver\nGROUP BY curr\nORDER BY total_clicks DESC\nLIMIT 20\n'},
  'q2': {'description': 'Top 20 referrer pages',
   'sql': "SELECT prev as referrer, \n       SUM(n) as total_clicks\nFROM silver\nWHERE prev != 'other-empty'\nGROUP BY prev\nORDER BY total_clicks DESC\nLIMIT 20\n"},
  'q3': {'description': 'Click patterns by type (external/internal/link)',
   'sql': 'SELECT type,\n       COUNT(DISTINCT curr) as unique_pages,\n       SUM(n) as total_clicks,\n       AVG(n) as avg_clicks_per_pair\nFROM silver\nGROUP BY type\nORDER BY total_clicks DESC\n'}},
 'silver_schema': {'columns': [{'name': 'prev',
    'type': 'StringType',
    'nullable': False},
   {'name': 'curr', 'type': 'StringType', 'nullable': False},
   {'name': 'type', 'type': 'StringType', 'nullable': False},
   {'name': 'n', 'type': 'IntegerType', 'nullable': False}]},
 'data_quality': {'rules': [{'name': 'Non-null clicks',
    'check': 'n IS NOT NULL',
    'severity': 'error'},
   {'name': 'Positive clicks', 'check': 'n >= 0', 'severity': 'error'},
   {'name': 'Valid page names',
    'check': 'LENGTH(curr) > 0',
    'severity': 'error'},
   {'name': 'Valid type',
    'check': "type IN ('link', 'external', 'other')",
    'severity': 'warning'}]},
 'optimization': {'baseline': ['No partitioning',
   'No sorting',
   'Default file sizes',
   'Full table scans'],
  'optimized': ['Repartition by computed hash',
   'Sort by clicks (n) descending',
   'Target 128MB file size',
   'Enable AQE',
   'Coalesce small files']}}

1. Bronze — landing raw data¶

In [2]:
raw_glob = CFG["paths"]["raw_csv_glob"]
bronze = CFG["paths"]["bronze"]
proof = CFG["paths"]["proof"]

df_raw = (spark.read
    .option("header", "false")
    .option("inferSchema", "false")
    .option("delimiter", "\t")
    .csv(raw_glob)
    .toDF("prev", "curr", "type", "n"))

row_count = df_raw.count()
df_raw.write.mode("overwrite").csv(bronze)
print(f"Bronze written: {bronze}, rows: {row_count:,}")
[Stage 4:============================================>              (6 + 2) / 8]
Bronze written: outputs/project/bronze, rows: 10,000,000
                                                                                

2. Silver — cleaning and typing¶

In [3]:
silver = CFG["paths"]["silver"]

df_silver = (df_raw
    .withColumn("n", F.col("n").cast("integer"))
    .filter(F.col("n").isNotNull())
    .filter(F.col("n") >= 0)
    .filter(F.length(F.col("curr")) > 0)
    .dropDuplicates())

silver_count = df_silver.count()
df_silver.write.mode("overwrite").parquet(silver)
print(f"Silver written: {silver}, rows: {silver_count:,}")
[Stage 13:===================================================>      (8 + 1) / 9]
Silver written: outputs/project/silver, rows: 10,000,000
                                                                                

3. Gold — analytics tables¶

In [4]:
gold = CFG["paths"]["gold"]
queries = CFG["queries"]

pathlib.Path(gold).mkdir(parents=True, exist_ok=True)
df_silver.createOrReplaceTempView("silver")

df_q1 = spark.sql(queries["q1"]["sql"])
q1_count = df_q1.count()
df_q1.write.mode("overwrite").parquet(f"{gold}/q1_daily_aggregation")
print(f"Q1 written, rows: {q1_count:,}")

df_q2 = spark.sql(queries["q2"]["sql"])
q2_count = df_q2.count()
df_q2.write.mode("overwrite").parquet(f"{gold}/q2_top_referrers")
print(f"Q2 written, rows: {q2_count:,}")

df_q3 = spark.sql(queries["q3"]["sql"])
q3_count = df_q3.count()
df_q3.write.mode("overwrite").parquet(f"{gold}/q3_filtered_analysis")
print(f"Q3 written, rows: {q3_count:,}")

print(f"Gold written: {gold}")
                                                                                
Q1 written, rows: 20
                                                                                
Q2 written, rows: 20
                                                                                
Q3 written, rows: 3
Gold written: outputs/project/gold

4. Baseline plans and metrics¶

In [5]:
pathlib.Path(proof).mkdir(parents=True, exist_ok=True)

df_q1_baseline = spark.sql(queries["q1"]["sql"])
plan_q1 = df_q1_baseline._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/baseline_q1_plan.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n")
    f.write(plan_q1)

df_q2_baseline = spark.sql(queries["q2"]["sql"])
plan_q2 = df_q2_baseline._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/baseline_q2_plan.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n")
    f.write(plan_q2)

df_q3_baseline = spark.sql(queries["q3"]["sql"])
plan_q3 = df_q3_baseline._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/baseline_q3_plan.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n")
    f.write(plan_q3)

print("Saved baseline plans. Record Spark UI metrics now.")
Saved baseline plans. Record Spark UI metrics now.

5. Optimization — layout and joins¶

In [6]:
layout = CFG["layout"]
target_size_mb = layout["target_file_size_mb"]

df_silver_reloaded = spark.read.parquet(silver)
total_bytes = df_silver_reloaded.count() * 100
num_partitions = max(4, int(total_bytes / (target_size_mb * 1024 * 1024)))

df_silver_opt = (df_silver_reloaded
    .repartition(num_partitions)
    .sortWithinPartitions(F.desc("n")))

silver_opt = f"{silver}_optimized"
df_silver_opt.write.mode("overwrite").parquet(silver_opt)
print(f"Optimized silver written: {silver_opt}")

df_silver_opt.createOrReplaceTempView("silver")

df_q1_opt = spark.sql(queries["q1"]["sql"])
plan_q1_opt = df_q1_opt._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/optimized_q1_plan.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n")
    f.write(f"Optimizations: {num_partitions} partitions, sorted by n desc\n")
    f.write(plan_q1_opt)

df_q2_opt = spark.sql(queries["q2"]["sql"])
plan_q2_opt = df_q2_opt._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/optimized_q2_plan.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n")
    f.write(f"Optimizations: {num_partitions} partitions, sorted by n desc\n")
    f.write(plan_q2_opt)

df_q3_opt = spark.sql(queries["q3"]["sql"])
plan_q3_opt = df_q3_opt._jdf.queryExecution().executedPlan().toString()
with open(f"{proof}/optimized_q3_plan.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n")
    f.write(f"Optimizations: {num_partitions} partitions, sorted by n desc\n")
    f.write(plan_q3_opt)

print("Saved optimized plans. Record Spark UI metrics now.")
[Stage 77:>                                                         (0 + 7) / 7]
Optimized silver written: outputs/project/silver_optimized
Saved optimized plans. Record Spark UI metrics now.
                                                                                

6. Cleanup¶

In [7]:
spark.stop()
print("Spark session stopped.")
Spark session stopped.