DE1 — Lab 0: Installation and Sanity Checks
Author: Badr TAJINI - Data Engineering I - ESIEE 2025-2026
Goal: prove that your local setup works, using metrics and execution plans as evidence.
0. Preamble¶
- Activate the de1-env environment (a quick programmatic check is sketched below).
- Verify Java 21 and Spark 4.
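Before running anything else, it can help to confirm that the notebook kernel really lives in the right environment. A minimal sketch, assuming de1-env is a conda environment (CONDA_DEFAULT_ENV is the variable conda sets on activation; adapt the check if you use venv instead):

import os, sys

# CONDA_DEFAULT_ENV is set by `conda activate`; it is absent if conda is not in use
env = os.environ.get("CONDA_DEFAULT_ENV", "<none>")
print("Active conda env:", env)
print("Python executable:", sys.executable)
assert env == "de1-env", "Activate the de1-env environment before continuing"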
In [1]:
import sys, os, subprocess, json, datetime, platform
print("Python:", sys.version)
print("Platform:", platform.platform())
# Java version
try:
    out = subprocess.check_output(["java","-version"], stderr=subprocess.STDOUT).decode()
    print(out.splitlines()[0])
except Exception as e:
    print("java -version failed:", e)
Python: 3.10.18 (main, Jun 5 2025, 13:14:17) [GCC 11.2.0]
Platform: Linux-6.11.0-19-generic-x86_64-with-glibc2.39
openjdk version "11.0.1" 2018-10-16 LTS
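The first line of java -version is enough for the report, but you can also pull out the major version explicitly and compare it against the Java 21 that the preamble asks for. A minimal sketch, assuming a modern version string such as openjdk version "21.0.3":

import re, subprocess

out = subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT).decode()
first_line = out.splitlines()[0]
# modern JDKs report e.g. openjdk version "21.0.3"; grab the leading major number
match = re.search(r'"(\d+)', first_line)
major = int(match.group(1)) if match else None
print("Java major version:", major, "| expected by this lab: 21")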
1. Verify PySpark¶
In [4]:
import findspark, pyspark
from pyspark.sql import SparkSession
findspark.init()
print("PySpark:", pyspark.__version__)
spark = SparkSession.builder.appName("de1-lab0").getOrCreate()
print("Spark:", spark.version)
PySpark: 4.0.1
Spark: 4.0.1
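getOrCreate() with the defaults is all this lab needs, but it can be useful to make the session configuration explicit, for example pinning the master and shrinking the 200 default shuffle partitions that will show up in the execution plans below. A minimal sketch (the partition count of 8 is an arbitrary choice for a four-row dataset, not part of the lab):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("de1-lab0")
    .master("local[*]")                            # run on all local cores
    .config("spark.sql.shuffle.partitions", "8")   # default is 200, far more than 4 rows need
    .getOrCreate()
)
print("Spark:", spark.version)

Note that getOrCreate() reuses an already running session, so stop the existing one first if you want a new master setting to take effect.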
2. Generate a tiny local CSV and read it with Spark¶
In [5]:
import os, csv, pathlib
pathlib.Path("data").mkdir(exist_ok=True)
rows = [
{"user_id":1,"product_id":101,"price":9.9,"ts":"2025-09-01T09:00:00"},
{"user_id":1,"product_id":102,"price":19.0,"ts":"2025-09-01T09:02:00"},
{"user_id":2,"product_id":101,"price":9.9,"ts":"2025-09-02T10:00:00"},
{"user_id":3,"product_id":103,"price":5.5,"ts":"2025-09-03T11:30:00"},
]
with open("data/sample_sales.csv","w",newline="") as f:
w = csv.DictWriter(f, fieldnames=["user_id","product_id","price","ts"])
w.writeheader(); w.writerows(rows)
print("Wrote data/sample_sales.csv, bytes:", os.path.getsize("data/sample_sales.csv"))
Wrote data/sample_sales.csv, bytes: 154
In [6]:
df = spark.read.option("header","true").option("inferSchema","true").csv("data/sample_sales.csv")
df.printSchema()
df.show()
root
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- ts: timestamp (nullable = true)

+-------+----------+-----+-------------------+
|user_id|product_id|price|                 ts|
+-------+----------+-----+-------------------+
|      1|       101|  9.9|2025-09-01 09:00:00|
|      1|       102| 19.0|2025-09-01 09:02:00|
|      2|       101|  9.9|2025-09-02 10:00:00|
|      3|       103|  5.5|2025-09-03 11:30:00|
+-------+----------+-----+-------------------+
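inferSchema works here because the file is tiny, but it triggers an extra pass over the data and can guess types you did not intend. A minimal sketch of reading the same file with an explicit schema instead (field names match the CSV header written above; df_typed is just an illustrative name):

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType

schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("ts", TimestampType(), True),
])

# the header row is still skipped, but no inference pass over the file is needed
df_typed = spark.read.option("header", "true").schema(schema).csv("data/sample_sales.csv")
df_typed.printSchema()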
3. Execution plan — evidence¶
In [7]:
from pyspark.sql import functions as F
agg = df.groupBy("user_id").agg(F.count("*").alias("n"), F.sum("price").alias("total"))
agg.show()
plan = agg._jdf.queryExecution().executedPlan().toString()
print(plan)
print("\n=== explain formatted ===")
agg.explain("formatted")
# Save evidence
import pathlib, datetime
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_formatted.txt","w") as f:
f.write(str(datetime.datetime.now()) + "\n\n")
f.write(str(plan))
print("Saved proof/plan_formatted.txt")
+-------+---+-----+
|user_id|  n|total|
+-------+---+-----+
|      1|  2| 28.9|
|      3|  1|  5.5|
|      2|  1|  9.9|
+-------+---+-----+
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[user_id#17], functions=[count(1), sum(price#19)], output=[user_id#17, n#39L, total#40])
   +- Exchange hashpartitioning(user_id#17, 200), ENSURE_REQUIREMENTS, [plan_id=84]
      +- HashAggregate(keys=[user_id#17], functions=[partial_count(1), partial_sum(price#19)], output=[user_id#17, count#52L, sum#53])
         +- FileScan csv [user_id#17,price#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/sable/Documents/data engineering1/data/sample_sales.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:int,price:double>
=== explain formatted ===
== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan csv (1)
(1) Scan csv
Output [2]: [user_id#17, price#19]
Batched: false
Location: InMemoryFileIndex [file:/home/sable/Documents/data engineering1/data/sample_sales.csv]
ReadSchema: struct<user_id:int,price:double>
(2) HashAggregate
Input [2]: [user_id#17, price#19]
Keys [1]: [user_id#17]
Functions [2]: [partial_count(1), partial_sum(price#19)]
Aggregate Attributes [2]: [count#50L, sum#51]
Results [3]: [user_id#17, count#52L, sum#53]
(3) Exchange
Input [3]: [user_id#17, count#52L, sum#53]
Arguments: hashpartitioning(user_id#17, 200), ENSURE_REQUIREMENTS, [plan_id=84]
(4) HashAggregate
Input [3]: [user_id#17, count#52L, sum#53]
Keys [1]: [user_id#17]
Functions [2]: [count(1), sum(price#19)]
Aggregate Attributes [2]: [count(1)#45L, sum(price#19)#46]
Results [3]: [user_id#17, count(1)#45L AS n#39L, sum(price#19)#46 AS total#40]
(5) AdaptiveSparkPlan
Output [3]: [user_id#17, n#39L, total#40]
Arguments: isFinalPlan=false
Saved proof/plan_formatted.txt
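One caveat about the evidence file: explain() prints its output and returns None, while the string saved above comes from the executed-plan object. If you would rather store the formatted explain exactly as printed, a minimal sketch is to capture stdout (contextlib is standard library):

import io, contextlib, datetime, pathlib

pathlib.Path("proof").mkdir(exist_ok=True)

buf = io.StringIO()
with contextlib.redirect_stdout(buf):        # explain() prints; capture what it writes
    agg.explain("formatted")

with open("proof/plan_formatted.txt", "w") as f:
    f.write(str(datetime.datetime.now()) + "\n\n")
    f.write(buf.getvalue())
print("Saved proof/plan_formatted.txt")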
4. Spark UI — metrics to log¶
Open http://localhost:4040 while a job is running and record:
- Files Read
- Input Size
- Shuffle Read
- Shuffle Write
Fill in the provided metrics_log_template_en.csv; a sketch for pulling the same numbers from the Spark REST API follows.
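If you prefer to capture these numbers programmatically rather than copying them from the UI, the same metrics are exposed by Spark's monitoring REST API on the driver (assumed here to be at the default http://localhost:4040). A minimal sketch, to be run while the session is still alive, i.e. before the spark.stop() in section 5; the stage field names (inputBytes, shuffleReadBytes, shuffleWriteBytes) are those served by the /stages endpoint in recent Spark releases:

import json, urllib.request

base = "http://localhost:4040/api/v1"

# a local driver serves a single application; take its id
apps = json.load(urllib.request.urlopen(f"{base}/applications"))
app_id = apps[0]["id"]

# per-stage metrics: input size plus shuffle read/write, all in bytes
stages = json.load(urllib.request.urlopen(f"{base}/applications/{app_id}/stages"))
for s in stages:
    print(s["stageId"], s["name"][:40],
          "| input:", s["inputBytes"],
          "| shuffle read:", s["shuffleReadBytes"],
          "| shuffle write:", s["shuffleWriteBytes"])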
5. Cleanup¶
In [10]:
spark.stop()
print("Spark session stopped.")
Spark session stopped.