DE2 — Lab 0: Environment Validation & Plan Reading¶
Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Goal: Validate your Spark environment and refresh plan-reading skills from DE1.
Tasks:
- Create a Spark session and verify the version.
- Read the sample CSV, enforce schema, write partitioned Parquet.
- Read the Parquet back and compare plans with explain("formatted").
- Capture Spark UI metrics (Shuffle Read/Write, Input Size).
- Export this notebook as a Python script and run it via spark-submit (see the sketch below).
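For the spark-submit task, here is a minimal sketch of what the exported script can look like. The file name lab0_validation.py is an assumption, not part of the lab material:

# lab0_validation.py -- name is an assumption; use whatever name the notebook export produces
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # No .master() here: let spark-submit choose the master via its --master flag
    spark = SparkSession.builder.appName("DE2-Lab0-Validation").getOrCreate()
    print("Spark version:", spark.version)
    spark.stop()

# Example invocation: spark-submit --master "local[*]" lab0_validation.py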
In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
spark = SparkSession.builder \
.appName("DE2-Lab0-Validation") \
.master("local[*]") \
.getOrCreate()
print("Spark version:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/04/27 18:44:20 WARN Utils: Your hostname, sable-ThinkPad-X1-Yoga-3rd, resolves to a loopback address: 127.0.1.1; using 192.168.221.2 instead (on interface wlp2s0)
26/04/27 18:44:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/27 18:44:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://192.168.221.2:4040
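The warnings above are expected for a local setup. If the console gets too noisy, the log level can be raised after session creation, as the startup banner itself suggests (a minimal sketch):

# Optional: only show errors from the driver from this point on
spark.sparkContext.setLogLevel("ERROR")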
1. Read CSV with explicit schema¶
In [2]:
schema = StructType([
StructField("id", IntegerType(), False),
StructField("category", StringType(), True),
StructField("value", DoubleType(), True),
StructField("text", StringType(), True),
])
df = spark.read.csv("data/sample.csv", header=True, schema=schema)
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
df.printSchema()
df.show(5)
Rows: 15, Columns: 4
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- value: double (nullable = true)
 |-- text: string (nullable = true)

+---+--------+-----+--------------------+
| id|category|value|                text|
+---+--------+-----+--------------------+
|  1|    tech| 42.5|distributed syste...|
|  2| science| 88.3|machine learning ...|
|  3|    tech| 15.7|spark shuffle ope...|
|  4|business| 67.2|data warehouses s...|
|  5| science| 93.1|clustering algori...|
+---+--------+-----+--------------------+
only showing top 5 rows
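Note that id is reported as nullable = true even though the StructField declared it non-nullable; Spark's file sources treat columns as nullable on read. If you want the read itself to reject rows that do not match the declared schema, one option is the FAILFAST parse mode (a sketch against the same data/sample.csv):

# Strict read: abort on the first malformed row instead of silently nulling fields
df_strict = spark.read.csv(
    "data/sample.csv",
    header=True,
    schema=schema,
    mode="FAILFAST",  # default mode is PERMISSIVE
)
df_strict.show(5)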
2. Write partitioned Parquet¶
In [3]:
df.write.mode("overwrite") \
.partitionBy("category") \
.parquet("outputs/lab0/sample_parquet")
print("Parquet written to outputs/lab0/sample_parquet/")
Parquet written to outputs/lab0/sample_parquet/
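To confirm the layout on disk, the partition column becomes Hive-style sub-directories under the output path (a quick sketch using the same path as above):

import os

# Expect one category=<value> directory per distinct category, plus a _SUCCESS marker
for entry in sorted(os.listdir("outputs/lab0/sample_parquet")):
    print(entry)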
3. Read Parquet and compare plans¶
In [4]:
df_parquet = spark.read.parquet("outputs/lab0/sample_parquet")
print("=== CSV scan plan ===")
df.explain("formatted")
print("\n=== Parquet scan plan ===")
df_parquet.explain("formatted")
=== CSV scan plan ===
== Physical Plan ==
Scan csv (1)

(1) Scan csv
Output [4]: [id#0, category#1, value#2, text#3]
Batched: false
Location: InMemoryFileIndex [file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab0 setup practice/data/sample.csv]
ReadSchema: struct<id:int,category:string,value:double,text:string>

=== Parquet scan plan ===
== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet (1)

(1) Scan parquet
Output [4]: [id#35, value#36, text#37, category#38]
Batched: true
Location: InMemoryFileIndex [file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab0 setup practice/outputs/lab0/sample_parquet]
ReadSchema: struct<id:int,value:double,text:string>

(2) ColumnarToRow [codegen id : 1]
Input [4]: [id#35, value#36, text#37, category#38]
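Two things are worth noting in the plans above: the Parquet scan is Batched: true (vectorized), and category is absent from its ReadSchema because it is a partition column recovered from the directory names rather than the file contents. Column pruning can be pushed further by selecting a subset of columns and explaining again (a small sketch reusing df_parquet):

# Only the selected data columns should remain in the Parquet ReadSchema
df_parquet.select("id", "value").explain("formatted")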
4. Simple aggregation — capture metrics¶
In [5]:
from pyspark.sql import functions as F
agg_df = df_parquet.groupBy("category").agg(
F.count("*").alias("cnt"),
F.avg("value").alias("avg_value")
)
agg_df.explain("formatted")
agg_df.show()
== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet (1)
(1) Scan parquet
Output [2]: [value#36, category#38]
Batched: true
Location: InMemoryFileIndex [file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab0 setup practice/outputs/lab0/sample_parquet]
ReadSchema: struct<value:double>
(2) HashAggregate
Input [2]: [value#36, category#38]
Keys [1]: [category#38]
Functions [2]: [partial_count(1), partial_avg(value#36)]
Aggregate Attributes [3]: [count#48L, sum#49, count#50L]
Results [4]: [category#38, count#51L, sum#52, count#53L]
(3) Exchange
Input [4]: [category#38, count#51L, sum#52, count#53L]
Arguments: hashpartitioning(category#38, 200), ENSURE_REQUIREMENTS, [plan_id=88]
(4) HashAggregate
Input [4]: [category#38, count#51L, sum#52, count#53L]
Keys [1]: [category#38]
Functions [2]: [count(1), avg(value#36)]
Aggregate Attributes [2]: [count(1)#45L, avg(value#36)#46]
Results [3]: [category#38, count(1)#45L AS cnt#39L, avg(value#36)#46 AS avg_value#40]
(5) AdaptiveSparkPlan
Output [3]: [category#38, cnt#39L, avg_value#40]
Arguments: isFinalPlan=false
+--------+---+------------------+
|category|cnt| avg_value|
+--------+---+------------------+
| tech| 6|41.733333333333334|
| science| 5| 75.55999999999999|
|business| 4| 59.675|
+--------+---+------------------+
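The Exchange hash-partitions on category into 200 partitions, the spark.sql.shuffle.partitions default, which is far more than 15 rows need. As a sketch, you can lower the setting and re-run the aggregation so the Shuffle Read/Write figures in the UI are easier to relate to the data:

# Shrink the shuffle for this tiny dataset, then re-check the plan and the UI metrics
spark.conf.set("spark.sql.shuffle.partitions", "4")
agg_small = df_parquet.groupBy("category").agg(
    F.count("*").alias("cnt"),
    F.avg("value").alias("avg_value"),
)
agg_small.explain("formatted")  # Exchange should now show hashpartitioning(category, 4)
agg_small.show()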
5. Capture evidence¶
- Open http://localhost:4040 and take screenshots of the SQL and Jobs tabs.
- Note the Shuffle Read/Write and Input Size values (also available via the REST API, sketched below).
- Save the screenshots in the proof/ folder.
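Besides screenshots, the same figures are exposed by Spark's monitoring REST API while the application is still running, i.e. before spark.stop() below. This is a sketch assuming the UI is reachable at http://localhost:4040 and the requests package is installed; the field names follow the StageData JSON of the monitoring API:

import requests

base = "http://localhost:4040/api/v1"
app_id = requests.get(f"{base}/applications").json()[0]["id"]

# Per-stage input and shuffle metrics, matching the Stages tab of the UI
for stage in requests.get(f"{base}/applications/{app_id}/stages").json():
    print(stage["stageId"], stage["name"],
          "input:", stage["inputBytes"],
          "shuffleRead:", stage["shuffleReadBytes"],
          "shuffleWrite:", stage["shuffleWriteBytes"])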
In [6]:
spark.stop()
print("Lab 0 complete. Environment validated.")
Lab 0 complete. Environment validated.