DE1 — Lab 0: Installation and Sanity Checks

Author: Badr TAJINI - Data Engineering I - ESIEE 2025-2026


Goal: prove that your local setup works, using metrics and execution plans as evidence.

0. Preamble

  • Activate the de1-env environment.
  • Verify Java 21 and Spark 4.
In [1]:
import sys, os, subprocess, json, datetime, platform
print("Python:", sys.version)
print("Platform:", platform.platform())
# Java version
try:
    out = subprocess.check_output(["java","-version"], stderr=subprocess.STDOUT).decode()
    print(out.splitlines()[0])
except Exception as e:
    print("java -version failed:", e)
Python: 3.10.18 (main, Jun  5 2025, 13:14:17) [GCC 11.2.0]
Platform: Linux-6.11.0-19-generic-x86_64-with-glibc2.39
openjdk version "11.0.1" 2018-10-16 LTS
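Spark 4 requires Java 17 or newer, so it is worth checking the major version programmatically rather than by eye. A minimal sketch (the helper `java_major` is mine, not part of the lab) that extracts the major version from the first line of `java -version` output, handling both the legacy `1.x` scheme and the modern one:

```python
import re

def java_major(version_line: str) -> int:
    """Extract the major Java version from the first line of `java -version`.

    Handles both legacy "1.8.0_292" (Java 8) and modern "17.0.9" / "21" formats.
    """
    m = re.search(r'version "([^"]+)"', version_line)
    if not m:
        raise ValueError(f"unrecognized version line: {version_line!r}")
    parts = m.group(1).split(".")
    # Legacy scheme: "1.x" means Java x (e.g. 1.8 -> 8)
    return int(parts[1]) if parts[0] == "1" else int(parts[0].split("-")[0])

line = 'openjdk version "21.0.3" 2024-04-16 LTS'
print(java_major(line))        # 21
print(java_major(line) >= 17)  # True: meets Spark 4's requirement
```

Applied to the output above, `java_major` would return 11 and flag that this JVM is too old for Spark 4 — worth resolving before moving on.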

1. Verify PySpark

In [4]:
import findspark
findspark.init()  # locate the Spark installation before importing pyspark
import pyspark
from pyspark.sql import SparkSession
print("PySpark:", pyspark.__version__)
spark = SparkSession.builder.appName("de1-lab0").getOrCreate()
print("Spark:", spark.version)
PySpark: 4.0.1
Spark: 4.0.1

2. Generate a tiny local CSV and read it with Spark

In [5]:
import os, csv, pathlib
pathlib.Path("data").mkdir(exist_ok=True)
rows = [
    {"user_id":1,"product_id":101,"price":9.9,"ts":"2025-09-01T09:00:00"},
    {"user_id":1,"product_id":102,"price":19.0,"ts":"2025-09-01T09:02:00"},
    {"user_id":2,"product_id":101,"price":9.9,"ts":"2025-09-02T10:00:00"},
    {"user_id":3,"product_id":103,"price":5.5,"ts":"2025-09-03T11:30:00"},
]
with open("data/sample_sales.csv","w",newline="") as f:
    w = csv.DictWriter(f, fieldnames=["user_id","product_id","price","ts"])
    w.writeheader(); w.writerows(rows)
print("Wrote data/sample_sales.csv, bytes:", os.path.getsize("data/sample_sales.csv"))
Wrote data/sample_sales.csv, bytes: 154
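Before handing the file to Spark, a quick stdlib sanity check that the header and row count match what was written never hurts. A sketch (the helper `check_csv` and the `sample` string are mine) that validates CSV text in memory:

```python
import csv
import io

def check_csv(text: str, expected_header: list[str]) -> int:
    """Return the data-row count if the header matches; raise otherwise."""
    reader = csv.DictReader(io.StringIO(text))
    if reader.fieldnames != expected_header:
        raise ValueError(f"unexpected header: {reader.fieldnames}")
    return sum(1 for _ in reader)

sample = "user_id,product_id,price,ts\n1,101,9.9,2025-09-01T09:00:00\n"
print(check_csv(sample, ["user_id", "product_id", "price", "ts"]))  # 1
```

In the lab, pass `open("data/sample_sales.csv").read()` and expect 4 rows.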
In [6]:
df = spark.read.option("header","true").option("inferSchema","true").csv("data/sample_sales.csv")
df.printSchema()
df.show()
root
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- ts: timestamp (nullable = true)

+-------+----------+-----+-------------------+
|user_id|product_id|price|                 ts|
+-------+----------+-----+-------------------+
|      1|       101|  9.9|2025-09-01 09:00:00|
|      1|       102| 19.0|2025-09-01 09:02:00|
|      2|       101|  9.9|2025-09-02 10:00:00|
|      3|       103|  5.5|2025-09-03 11:30:00|
+-------+----------+-----+-------------------+

3. Execution plan — evidence

In [7]:
from pyspark.sql import functions as F
agg = df.groupBy("user_id").agg(F.count("*").alias("n"), F.sum("price").alias("total"))
agg.show()
# _jdf reaches into Spark's internal JVM handle — fine for lab evidence, avoid in production code
plan = agg._jdf.queryExecution().executedPlan().toString()
print(plan)
print("\n=== explain formatted ===")
agg.explain("formatted")

# Save evidence
import pathlib, datetime
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_formatted.txt","w") as f:
    f.write(str(datetime.datetime.now()) + "\n\n")
    f.write(str(plan))
print("Saved proof/plan_formatted.txt")
+-------+---+-----+
|user_id|  n|total|
+-------+---+-----+
|      1|  2| 28.9|
|      3|  1|  5.5|
|      2|  1|  9.9|
+-------+---+-----+

AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[user_id#17], functions=[count(1), sum(price#19)], output=[user_id#17, n#39L, total#40])
   +- Exchange hashpartitioning(user_id#17, 200), ENSURE_REQUIREMENTS, [plan_id=84]
      +- HashAggregate(keys=[user_id#17], functions=[partial_count(1), partial_sum(price#19)], output=[user_id#17, count#52L, sum#53])
         +- FileScan csv [user_id#17,price#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/sable/Documents/data engineering1/data/sample_sales.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:int,price:double>


=== explain formatted ===
== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan csv  (1)


(1) Scan csv 
Output [2]: [user_id#17, price#19]
Batched: false
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/data/sample_sales.csv]
ReadSchema: struct<user_id:int,price:double>

(2) HashAggregate
Input [2]: [user_id#17, price#19]
Keys [1]: [user_id#17]
Functions [2]: [partial_count(1), partial_sum(price#19)]
Aggregate Attributes [2]: [count#50L, sum#51]
Results [3]: [user_id#17, count#52L, sum#53]

(3) Exchange
Input [3]: [user_id#17, count#52L, sum#53]
Arguments: hashpartitioning(user_id#17, 200), ENSURE_REQUIREMENTS, [plan_id=84]

(4) HashAggregate
Input [3]: [user_id#17, count#52L, sum#53]
Keys [1]: [user_id#17]
Functions [2]: [count(1), sum(price#19)]
Aggregate Attributes [2]: [count(1)#45L, sum(price#19)#46]
Results [3]: [user_id#17, count(1)#45L AS n#39L, sum(price#19)#46 AS total#40]

(5) AdaptiveSparkPlan
Output [3]: [user_id#17, n#39L, total#40]
Arguments: isFinalPlan=false


Saved proof/plan_formatted.txt
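The plan above shows one Exchange (a shuffle on `user_id`, using the default 200 partitions from `spark.sql.shuffle.partitions`) between the partial and final HashAggregate. Checking the saved evidence can be automated with plain string processing; a sketch (the helper `count_operator` and the inline `plan_text` are mine, trimmed from the dump above):

```python
def count_operator(plan_text: str, op: str) -> int:
    """Count occurrences of a physical operator name in a plan dump."""
    return sum(line.count(op) for line in plan_text.splitlines())

plan_text = """AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[user_id], functions=[count(1), sum(price)])
   +- Exchange hashpartitioning(user_id, 200), ENSURE_REQUIREMENTS
      +- HashAggregate(keys=[user_id], functions=[partial_count(1), partial_sum(price)])
         +- FileScan csv [user_id,price]
"""
print(count_operator(plan_text, "Exchange"))       # 1
print(count_operator(plan_text, "HashAggregate"))  # 2
```

In the lab, pass `open("proof/plan_formatted.txt").read()` instead of the inline string.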

4. Spark UI — metrics to log

Open http://localhost:4040 while the Spark session is alive (the UI disappears once spark.stop() runs) and record:

  • Files Read
  • Input Size
  • Shuffle Read
  • Shuffle Write

Fill in the provided metrics_log_template_en.csv.
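Recording the metrics by hand is error-prone, so appending each run as a CSV row keeps the evidence consistent. A sketch of such a logger — the file name `metrics_log.csv`, the column names in `FIELDS`, and the helper `log_metrics` are all mine; adapt the columns to whatever metrics_log_template_en.csv actually uses:

```python
import csv
import datetime
import pathlib

LOG = pathlib.Path("metrics_log.csv")  # hypothetical local log file
# Assumed columns — align them with the lab's template before use
FIELDS = ["timestamp", "job", "files_read", "input_size", "shuffle_read", "shuffle_write"]

def log_metrics(row: dict, path: pathlib.Path = LOG) -> None:
    """Append one metrics row, writing the header on first use."""
    is_new = not path.exists()
    with path.open("a", newline="") as f:
        w = csv.DictWriter(f, fieldnames=FIELDS)
        if is_new:
            w.writeheader()
        w.writerow(row)

log_metrics({
    "timestamp": datetime.datetime.now().isoformat(timespec="seconds"),
    "job": "de1-lab0 groupBy",
    "files_read": 1,        # from the FileScan: 1 path
    "input_size": "154 B",  # size reported when the CSV was written
    "shuffle_read": "",     # copy from the Spark UI
    "shuffle_write": "",    # copy from the Spark UI
})
```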

5. Cleanup

In [10]:
spark.stop()
print("Spark session stopped.")
Spark session stopped.