DE2 — Lab 0: Environment Validation & Plan Reading¶

Author: Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026

Goal: Validate your Spark environment and refresh plan-reading skills from DE1.

Tasks:

  1. Create a Spark session and verify the version.
  2. Read the sample CSV, enforce schema, write partitioned Parquet.
  3. Read the Parquet back and compare plans with explain("formatted").
  4. Capture Spark UI metrics (Shuffle Read/Write, Input Size).
  5. Export this notebook as a Python script and run it via spark-submit (a sketch appears at the end of this notebook).
In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

spark = SparkSession.builder \
    .appName("DE2-Lab0-Validation") \
    .master("local[*]") \
    .getOrCreate()

print("Spark version:", spark.version)
print("Spark UI:", spark.sparkContext.uiWebUrl)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/04/27 18:44:20 WARN Utils: Your hostname, sable-ThinkPad-X1-Yoga-3rd, resolves to a loopback address: 127.0.1.1; using 192.168.221.2 instead (on interface wlp2s0)
26/04/27 18:44:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/27 18:44:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark version: 4.0.1
Spark UI: http://192.168.221.2:4040
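
The startup warnings above (loopback hostname, missing native-hadoop library, incubator modules) are normal for a local[*] session and can be ignored for this lab. Keep the UI URL handy: the evidence step at the end asks for screenshots of it.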

1. Read CSV with explicit schema¶

In [2]:
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("category", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("text", StringType(), True),
])

df = spark.read.csv("data/sample.csv", header=True, schema=schema)
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
df.printSchema()
df.show(5)
Rows: 15, Columns: 4
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- value: double (nullable = true)
 |-- text: string (nullable = true)

+---+--------+-----+--------------------+
| id|category|value|                text|
+---+--------+-----+--------------------+
|  1|    tech| 42.5|distributed syste...|
|  2| science| 88.3|machine learning ...|
|  3|    tech| 15.7|spark shuffle ope...|
|  4|business| 67.2|data warehouses s...|
|  5| science| 93.1|clustering algori...|
+---+--------+-----+--------------------+
only showing top 5 rows
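
Note that id prints as nullable = true even though the schema declares it with nullable=False: Spark's file-based sources treat every column as nullable on read, so the nullability flag in a user-supplied schema is not enforced.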

2. Write partitioned Parquet¶

In [3]:
df.write.mode("overwrite") \
    .partitionBy("category") \
    .parquet("outputs/lab0/sample_parquet")

print("Parquet written to outputs/lab0/sample_parquet/")
Parquet written to outputs/lab0/sample_parquet/
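
partitionBy("category") writes one subdirectory per distinct value (category=business/, category=science/, category=tech/) plus a _SUCCESS marker file. A quick sanity check, since this is a local path (a sketch; os.listdir is plain Python, not a Spark API):

import os
print(sorted(os.listdir("outputs/lab0/sample_parquet")))  # expect the category=... dirs plus _SUCCESS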

3. Read Parquet and compare plans¶

In [4]:
df_parquet = spark.read.parquet("outputs/lab0/sample_parquet")

print("=== CSV scan plan ===")
df.explain("formatted")

print("\n=== Parquet scan plan ===")
df_parquet.explain("formatted")
=== CSV scan plan ===
== Physical Plan ==
Scan csv  (1)


(1) Scan csv 
Output [4]: [id#0, category#1, value#2, text#3]
Batched: false
Location: InMemoryFileIndex [file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab0 setup practice/data/sample.csv]
ReadSchema: struct<id:int,category:string,value:double,text:string>



=== Parquet scan plan ===
== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)


(1) Scan parquet 
Output [4]: [id#35, value#36, text#37, category#38]
Batched: true
Location: InMemoryFileIndex [file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab0 setup practice/outputs/lab0/sample_parquet]
ReadSchema: struct<id:int,value:double,text:string>

(2) ColumnarToRow [codegen id : 1]
Input [4]: [id#35, value#36, text#37, category#38]
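
Three differences are worth noting for your comparison. The CSV scan is row-based (Batched: false) and must parse all four fields of every line. The Parquet scan is vectorized (Batched: true), which is why Spark adds a ColumnarToRow operator above it. And its ReadSchema lists only id, value and text: category is a partition column, recovered from the directory names rather than read from the files.

To see Parquet column pruning directly (a quick sketch using the DataFrame already defined above), project a single column and re-inspect the plan; ReadSchema should shrink to just that column:

df_parquet.select("value").explain("formatted")  # expect ReadSchema: struct<value:double>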


4. Simple aggregation — capture metrics¶

In [5]:
from pyspark.sql import functions as F

agg_df = df_parquet.groupBy("category").agg(
    F.count("*").alias("cnt"),
    F.avg("value").alias("avg_value")
)
agg_df.explain("formatted")
agg_df.show()
== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet  (1)


(1) Scan parquet 
Output [2]: [value#36, category#38]
Batched: true
Location: InMemoryFileIndex [file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab0 setup practice/outputs/lab0/sample_parquet]
ReadSchema: struct<value:double>

(2) HashAggregate
Input [2]: [value#36, category#38]
Keys [1]: [category#38]
Functions [2]: [partial_count(1), partial_avg(value#36)]
Aggregate Attributes [3]: [count#48L, sum#49, count#50L]
Results [4]: [category#38, count#51L, sum#52, count#53L]

(3) Exchange
Input [4]: [category#38, count#51L, sum#52, count#53L]
Arguments: hashpartitioning(category#38, 200), ENSURE_REQUIREMENTS, [plan_id=88]

(4) HashAggregate
Input [4]: [category#38, count#51L, sum#52, count#53L]
Keys [1]: [category#38]
Functions [2]: [count(1), avg(value#36)]
Aggregate Attributes [2]: [count(1)#45L, avg(value#36)#46]
Results [3]: [category#38, count(1)#45L AS cnt#39L, avg(value#36)#46 AS avg_value#40]

(5) AdaptiveSparkPlan
Output [3]: [category#38, cnt#39L, avg_value#40]
Arguments: isFinalPlan=false


+--------+---+------------------+
|category|cnt|         avg_value|
+--------+---+------------------+
|    tech|  6|41.733333333333334|
| science|  5| 75.55999999999999|
|business|  4|            59.675|
+--------+---+------------------+
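
The plan shows the classic two-phase aggregation: a partial HashAggregate (2) runs before the Exchange, and a final HashAggregate (4) runs after it. The Exchange is the shuffle you will measure in the UI; hashpartitioning(category#38, 200) comes from the default spark.sql.shuffle.partitions = 200, and isFinalPlan=false means adaptive query execution may still coalesce those partitions at runtime. As an optional experiment (a sketch, not required for the lab), lower the setting and re-check the plan:

spark.conf.set("spark.sql.shuffle.partitions", "4")  # far closer to what 15 rows need than the default 200
df_parquet.groupBy("category").count().explain("formatted")  # the Exchange should now show 4 partitions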

5. Capture evidence¶

  1. Open the Spark UI (the URL printed in cell 1, typically http://localhost:4040) and take screenshots of the SQL and Jobs tabs.
  2. Note the Shuffle Read/Write and Input Size values.
  3. Save the screenshots in the proof/ folder.

Capture everything before running the next cell: spark.stop() shuts the UI down.
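
If screenshots are awkward on your machine, the same numbers are also exposed by Spark's monitoring REST API, again only while the session is running (a sketch; the /api/v1 endpoint is standard and the base URL comes from cell 1):

import urllib.request, json
base = spark.sparkContext.uiWebUrl  # e.g. http://localhost:4040
apps = json.load(urllib.request.urlopen(f"{base}/api/v1/applications"))
print(apps[0]["id"], apps[0]["name"])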
In [6]:
spark.stop()
print("Lab 0 complete. Environment validated.")
Lab 0 complete. Environment validated.
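
Task 5 asks you to export this notebook and run it outside Jupyter. A minimal sketch of what the submitted script needs (lab0.ipynb and lab0.py are placeholders for your actual file names; nbconvert's raw export also works):

# lab0.py: the shape of the exported script
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # spark-submit supplies the master, so no .master() call is needed here
    spark = SparkSession.builder.appName("DE2-Lab0-Validation").getOrCreate()
    spark.read.parquet("outputs/lab0/sample_parquet") \
        .groupBy("category").count().show()
    spark.stop()

# From a terminal:
#   jupyter nbconvert --to script lab0.ipynb
#   spark-submit --master "local[*]" lab0.py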