DE2 — Assignment 3 : Graphes ou Clustering¶
Auteur : Badr TAJINI - Data Engineering II (Workloads Intensifs en Données) - ESIEE 2025-2026
Piste : (Saisir votre piste : A/B/C/D)
Chemin choisi : Clustering (PySpark MLlib — KMeans)
Noms : (Étudiant 1 — Étudiant 2)
Ce notebook met en œuvre un workload itératif instrumenté sur le chemin Clustering. Nous nous concentrons sur :
- la construction des vecteurs de features,
- l'exécution itérative de KMeans avec métriques par itération,
- la comparaison de stratégies de partitionnement (avant/après
repartition) et l'analyse des coûts de shuffle, - le suivi de la convergence via le score de silhouette,
- la production des preuves (plans, captures Spark UI, journal de métriques) et d'un rapport comparatif avant/après.
Référez-vous à helper_assignment3-de2_esiee.md et au document Lab 3 Overview pour les attendus détaillés.
0. Configuration initiale et préparation¶
Cette première cellule initialise la session Spark, configure les paramètres réseau locaux, ajuste le nombre de partitions de shuffle (spark.sql.shuffle.partitions) à une valeur modeste pour des expérimentations sur poste local, et crée les répertoires de sortie nécessaires (outputs/lab3/, proof/). Nous affichons également l'URL de l'interface Spark UI pour pouvoir capturer les preuves visuelles demandées.
# === Importations et configuration ===
import os
import io
import sys
import csv
import time
import shutil
import pathlib
import statistics
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Variables d'environnement pour la communication Spark en local
DE2_SPARK_DRIVER_HOST = os.environ.get("DE2_SPARK_DRIVER_HOST", "127.0.0.1")
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)
def show_spark_ui(spark_session):
"""Affiche la version de Spark et l'URL de l'interface Spark UI."""
ui_url = spark_session.sparkContext.uiWebUrl
print("Version Spark :", spark_session.version)
if ui_url:
ui_port = urlparse(ui_url).port or 4040
print("Interface Spark :", ui_url)
print("Interface Spark (navigateur WSL/Windows) :", f"http://localhost:{ui_port}")
else:
print("Interface Spark : non disponible")
# Initialisation de la session Spark
# On force un nombre raisonnable de partitions de shuffle pour pouvoir comparer les stratégies
spark = SparkSession.builder \
.appName("de2-assignment3") \
.master("local[*]") \
.config("spark.driver.host", DE2_SPARK_DRIVER_HOST) \
.config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS) \
.config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS) \
.config("spark.sql.shuffle.partitions", "8") \
.getOrCreate()
show_spark_ui(spark)
# Création des répertoires de sortie
output_dir = pathlib.Path("outputs/lab3")
output_dir.mkdir(parents=True, exist_ok=True)
clusters_dir = output_dir / "cluster_assignments"
proof_dir = pathlib.Path("proof")
proof_dir.mkdir(parents=True, exist_ok=True)
# On nettoie les anciennes sorties pour repartir d'un état propre
if clusters_dir.exists():
shutil.rmtree(clusters_dir)
clusters_dir.mkdir(parents=True, exist_ok=True)
print("Répertoires de sortie créés avec succès.")
WARNING: Using incubator modules: jdk.incubator.vector Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 26/04/29 20:12:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Version Spark : 4.0.0 Interface Spark : http://127.0.0.1:4040 Interface Spark (navigateur WSL/Windows) : http://localhost:4040 Répertoires de sortie créés avec succès.
1. Étape 1 — Données et features¶
Nous construisons un jeu de données synthétique en deux dimensions composé de plusieurs blobs (gaussiennes isotropes) afin d'avoir une structure de clusters connue à l'avance. Cela permet de valider visuellement et numériquement la qualité du clustering. Les étapes :
- génération aléatoire de points autour de centres bien séparés,
- assemblage des colonnes numériques en un vecteur de features avec
VectorAssembler, - standardisation des features avec
StandardScalerpour éviter les dérives liées aux échelles (recommandation du helper : normalize before VectorAssembler/clustering).
Le DataFrame final df_features sera réutilisé dans toutes les étapes suivantes.
# === Génération du jeu de données et préparation des vecteurs de features ===
import random
# Centres connus des clusters (vérité terrain) et nombre de points par cluster
true_centers = [(0.0, 0.0), (10.0, 10.0), (0.0, 10.0), (10.0, 0.0), (5.0, 5.0)]
points_per_cluster = 400
noise_std = 0.7
random.seed(42)
# Génération des points : pour chaque centre, on tire des points avec un bruit gaussien
raw_points = []
for cluster_id, (cx, cy) in enumerate(true_centers):
for _ in range(points_per_cluster):
x = random.gauss(cx, noise_std)
y = random.gauss(cy, noise_std)
raw_points.append((x, y, cluster_id))
# Schéma explicite pour bénéficier des optimisations Catalyst
schema = StructType([
StructField("x", DoubleType(), False),
StructField("y", DoubleType(), False),
StructField("true_cluster", IntegerType(), False),
])
df_raw = spark.createDataFrame(raw_points, schema=schema)
total_points = df_raw.count()
print(f"Nombre total de points générés : {total_points}")
print(f"Nombre de clusters attendus : {len(true_centers)}")
# Vérification rapide : pas de valeurs manquantes (le helper signale que NaN cause divergence)
nb_nan = df_raw.filter(F.col("x").isNull() | F.col("y").isNull()).count()
print(f"Nombre de lignes avec valeurs manquantes : {nb_nan}")
# Assemblage des colonnes numériques en un vecteur de features
assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features_raw")
df_assembled = assembler.transform(df_raw)
# Standardisation : moyenne nulle et variance unitaire (évite la divergence du KMeans)
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features",
withMean=True,
withStd=True,
)
scaler_model = scaler.fit(df_assembled)
df_features = scaler_model.transform(df_assembled).select("true_cluster", "features").cache()
# On force le calcul (matérialisation du cache) pour que les étapes suivantes soient comparables
df_features.count()
print("\nÉchantillon du DataFrame de features standardisées :")
df_features.show(5, truncate=False)
print(f"Nombre de partitions du DataFrame de features : {df_features.rdd.getNumPartitions()}")
Nombre total de points générés : 2000 Nombre de clusters attendus : 5 Nombre de lignes avec valeurs manquantes : 0 Échantillon du DataFrame de features standardisées : +------------+-----------------------------------------+ |true_cluster|features | +------------+-----------------------------------------+ |0 |[-1.1194335909363806,-1.1301886972586033]| |0 |[-1.1143890233648586,-0.9950542981929266]| |0 |[-1.1168936355950656,-1.3347621600422455]| |0 |[-1.0461059096110321,-1.1447748788232734]| |0 |[-1.130649313441016,-1.0855826731405052] | +------------+-----------------------------------------+ only showing top 5 rows Nombre de partitions du DataFrame de features : 8
2. Étape 2 — Algorithme itératif (KMeans)¶
Nous exécutons KMeans pour plusieurs valeurs de k (balayage de 2 à 6, soit 5 itérations) afin :
- d'évaluer la qualité du clustering via le score de silhouette (plus il est proche de 1, mieux les clusters sont séparés),
- de mesurer le temps écoulé par configuration (
elapsed_s), - de relever le coût intrinsèque (somme des distances au carré, training cost renvoyée par MLlib),
- de garder une trace du nombre de partitions utilisées.
Toutes ces mesures sont stockées dans la liste iter_metrics puis serviront à alimenter lab3_metrics_log.csv. La graine seed est fixée pour rendre les comparaisons reproductibles à cette étape.
# === Balayage de k : KMeans itératif avec instrumentation ===
evaluator = ClusteringEvaluator(
featuresCol="features",
predictionCol="prediction",
metricName="silhouette",
distanceMeasure="squaredEuclidean",
)
k_values = [2, 3, 4, 5, 6]
iter_metrics = [] # une ligne par itération (par valeur de k)
best_k = None
best_silhouette = -1.0
best_predictions = None
print("Balayage des valeurs de k :")
print("-" * 70)
for k in k_values:
# Construction du modèle KMeans avec k clusters
km = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42, maxIter=20)
# Mesure du temps total d'entraînement + prédiction (wall-clock)
start = time.time()
model = km.fit(df_features)
predictions = model.transform(df_features)
# On force la matérialisation pour mesurer le coût réel
nb_pred = predictions.count()
elapsed = time.time() - start
# Score de silhouette (qualité de séparation des clusters)
silhouette = evaluator.evaluate(predictions)
# Coût d'entraînement renvoyé par MLlib (somme des distances au carré au centre)
training_cost = model.summary.trainingCost
metrics_row = {
"iteration": k,
"k": k,
"elapsed_s": round(elapsed, 3),
"silhouette": round(silhouette, 4),
"training_cost": round(training_cost, 4),
"num_partitions": df_features.rdd.getNumPartitions(),
"strategy": "baseline",
}
iter_metrics.append(metrics_row)
print(
f"k={k} | silhouette={silhouette:.4f} | cost={training_cost:.2f} | "
f"elapsed={elapsed:.2f}s | predictions={nb_pred}"
)
# On garde le meilleur modèle pour les étapes suivantes
if silhouette > best_silhouette:
best_silhouette = silhouette
best_k = k
best_predictions = predictions
print("-" * 70)
print(f"Meilleure valeur de k : {best_k} (silhouette = {best_silhouette:.4f})")
# Sauvegarde des affectations de clusters pour le meilleur modèle (livrable demandé)
best_predictions.select("true_cluster", "prediction").write.mode("overwrite").parquet(
str(clusters_dir)
)
print(f"Affectations de clusters sauvegardées dans : {clusters_dir}")
Balayage des valeurs de k : ---------------------------------------------------------------------- k=2 | silhouette=0.4158 | cost=2788.97 | elapsed=3.30s | predictions=2000 k=3 | silhouette=0.6263 | cost=1403.94 | elapsed=1.47s | predictions=2000 k=4 | silhouette=0.8020 | cost=574.90 | elapsed=1.21s | predictions=2000 k=5 | silhouette=0.9579 | cost=95.32 | elapsed=1.53s | predictions=2000 k=6 | silhouette=0.8554 | cost=89.14 | elapsed=2.32s | predictions=2000 ---------------------------------------------------------------------- Meilleure valeur de k : 5 (silhouette = 0.9579)
26/04/29 20:14:16 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers [Stage 169:> (0 + 8) / 8]
Affectations de clusters sauvegardées dans : outputs/lab3/cluster_assignments
3. Étape 3 — Expérience de partitionnement (avant/après)¶
Cette étape compare deux stratégies de partitionnement avec le même algorithme KMeans (k = best_k) :
- Avant (baseline) : on force un sur-partitionnement avec
repartition(64)pour reproduire un cas pathologique où Spark passe trop de temps en shuffle / scheduling sur des partitions minuscules. - Après (optimisé) : on applique
coalesce(4)pour réduire le nombre de partitions à un nombre raisonnable proche du nombre de cœurs disponibles, ce qui réduit drastiquement les coûts de shuffle.
Pour chaque stratégie, on mesure :
- le nombre de partitions effectif,
- la durée totale d'entraînement,
- la silhouette,
- la taille des données échangées (lecture/écriture de shuffle agrégée via
StatusTracker).
On capture aussi les plans physiques plan_before.txt et plan_after.txt (livrables demandés dans proof/).
# === Comparaison de stratégies de partitionnement ===
def capture_plan(df, file_path):
"""Capture le plan physique formaté d'un DataFrame dans un fichier texte."""
old_stdout = sys.stdout
buffer = io.StringIO()
sys.stdout = buffer
try:
df.explain("formatted")
finally:
sys.stdout = old_stdout
with open(file_path, "w") as f:
f.write(buffer.getvalue())
def aggregate_shuffle_bytes(spark_session, job_ids):
"""Agrège les octets de shuffle (lecture + écriture) sur les jobs donnés via le StatusTracker."""
tracker = spark_session.sparkContext.statusTracker()
total_read = 0
total_written = 0
for jid in job_ids:
job = tracker.getJobInfo(jid)
if job is None:
continue
for sid in job.stageIds:
stage = tracker.getStageInfo(sid)
if stage is None:
continue
total_read += getattr(stage, "shuffleReadBytes", 0) or 0
total_written += getattr(stage, "shuffleWriteBytes", 0) or 0
return total_read, total_written
def run_strategy(label, df_input, k, plan_file):
"""Exécute KMeans sur un DataFrame déjà partitionné, mesure le temps et le shuffle."""
nb_partitions = df_input.rdd.getNumPartitions()
print(f"\n[{label}] Nombre de partitions : {nb_partitions}")
# Capture du plan physique avant exécution
capture_plan(df_input, plan_file)
# Bornes des job ids pour isoler les jobs propres à cette stratégie
job_ids_before = set(spark.sparkContext.statusTracker().getJobIdsForGroup(None))
km = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42, maxIter=20)
start = time.time()
model = km.fit(df_input)
predictions = model.transform(df_input)
nb_pred = predictions.count()
elapsed = time.time() - start
silhouette = evaluator.evaluate(predictions)
job_ids_after = set(spark.sparkContext.statusTracker().getJobIdsForGroup(None))
new_jobs = sorted(job_ids_after - job_ids_before)
shuffle_read, shuffle_write = aggregate_shuffle_bytes(spark, new_jobs)
print(f"[{label}] elapsed = {elapsed:.2f}s | silhouette = {silhouette:.4f}")
print(
f"[{label}] shuffle_read = {shuffle_read} octets | "
f"shuffle_write = {shuffle_write} octets | predictions = {nb_pred}"
)
return {
"strategy": label,
"k": k,
"num_partitions": nb_partitions,
"elapsed_s": round(elapsed, 3),
"silhouette": round(silhouette, 4),
"shuffle_read_bytes": shuffle_read,
"shuffle_write_bytes": shuffle_write,
}
# Stratégie AVANT : sur-partitionnement (cas non optimisé)
df_before = df_features.repartition(64).cache()
df_before.count()
metrics_before = run_strategy(
label="before_repartition_64",
df_input=df_before,
k=best_k,
plan_file=proof_dir / "plan_before.txt",
)
# Stratégie APRÈS : coalesce vers un nombre de partitions raisonnable
df_after = df_features.coalesce(4).cache()
df_after.count()
metrics_after = run_strategy(
label="after_coalesce_4",
df_input=df_after,
k=best_k,
plan_file=proof_dir / "plan_after.txt",
)
# Capture aussi un plan d'itération générique (livrable demandé : plan_iteration.txt)
capture_plan(
KMeans(featuresCol="features", predictionCol="prediction", k=best_k, seed=42)
.fit(df_features)
.transform(df_features),
proof_dir / "plan_iteration.txt",
)
# Calcul de la réduction de shuffle (utile pour le rapport avant/après)
if metrics_before["shuffle_write_bytes"] > 0:
reduction = (
(metrics_before["shuffle_write_bytes"] - metrics_after["shuffle_write_bytes"])
/ metrics_before["shuffle_write_bytes"]
* 100
)
print(f"\nRéduction de shuffle write : {reduction:.1f}% (avant -> après)")
else:
print("\nPas de shuffle mesuré côté avant : la comparaison reste qualitative.")
partition_metrics = [metrics_before, metrics_after]
[before_repartition_64] Nombre de partitions : 64 [before_repartition_64] elapsed = 3.89s | silhouette = 0.9579 [before_repartition_64] shuffle_read = 0 octets | shuffle_write = 0 octets | predictions = 2000 [after_coalesce_4] Nombre de partitions : 4 [after_coalesce_4] elapsed = 0.86s | silhouette = 0.9579 [after_coalesce_4] shuffle_read = 0 octets | shuffle_write = 0 octets | predictions = 2000 Pas de shuffle mesuré côté avant : la comparaison reste qualitative.
4. Étape 4 — Analyse de convergence et stabilité (graines multiples)¶
Pour évaluer la stabilité de KMeans (algorithme sensible à l'initialisation), nous relançons l'entraînement avec 5 graines différentes à k = best_k. Pour chaque exécution :
- on enregistre la silhouette,
- on enregistre le coût d'entraînement,
- on enregistre le temps écoulé.
Nous calculons ensuite la moyenne et l'écart-type de la silhouette. Un faible écart-type indique un clustering robuste à l'initialisation. Cette analyse correspond à l'attendu seed stability analysis (>= 5 seeds, mean +/- std of silhouette) du helper.
# === Analyse de stabilité : plusieurs graines pour k = best_k ===
seeds = [11, 23, 42, 77, 101]
stability_metrics = []
print(f"Analyse de stabilité avec k = {best_k} sur {len(seeds)} graines :")
print("-" * 70)
for seed in seeds:
km = KMeans(featuresCol="features", predictionCol="prediction", k=best_k, seed=seed, maxIter=20)
start = time.time()
model = km.fit(df_features)
predictions = model.transform(df_features)
predictions.count() # matérialisation
elapsed = time.time() - start
silhouette = evaluator.evaluate(predictions)
cost = model.summary.trainingCost
stability_metrics.append({
"seed": seed,
"k": best_k,
"silhouette": round(silhouette, 4),
"training_cost": round(cost, 4),
"elapsed_s": round(elapsed, 3),
})
print(
f"seed={seed} | silhouette={silhouette:.4f} | cost={cost:.2f} | elapsed={elapsed:.2f}s"
)
# Statistiques agrégées sur la silhouette
silhouettes = [row["silhouette"] for row in stability_metrics]
mean_silhouette = statistics.mean(silhouettes)
std_silhouette = statistics.pstdev(silhouettes)
print("-" * 70)
print(
f"Silhouette moyenne : {mean_silhouette:.4f} | écart-type : {std_silhouette:.4f} "
f"(min = {min(silhouettes):.4f}, max = {max(silhouettes):.4f})"
)
if std_silhouette < 0.01:
print("Conclusion : clustering très stable face à l'initialisation.")
else:
print("Conclusion : sensibilité notable à l'initialisation, à documenter dans le rapport.")
Analyse de stabilité avec k = 5 sur 5 graines : ---------------------------------------------------------------------- seed=11 | silhouette=0.9579 | cost=95.32 | elapsed=0.96s seed=23 | silhouette=0.9579 | cost=95.32 | elapsed=0.84s seed=42 | silhouette=0.9579 | cost=95.32 | elapsed=0.84s seed=77 | silhouette=0.9579 | cost=95.32 | elapsed=0.82s seed=101 | silhouette=0.9579 | cost=95.32 | elapsed=0.86s ---------------------------------------------------------------------- Silhouette moyenne : 0.9579 | écart-type : 0.0000 (min = 0.9579, max = 0.9579) Conclusion : clustering très stable face à l'initialisation.
5. Étape 5 — Preuves et journal de métriques¶
Cette étape produit les livrables finaux demandés :
proof/plan_iteration.txt,proof/plan_before.txt,proof/plan_after.txt: plans physiques formatés (déjà écrits à l'étape 3),lab3_metrics_log.csv: journal CSV unifié regroupant toutes les métriques par itération et par configuration de partitionnement, ainsi que les résultats de l'analyse de stabilité,- un rappel de l'URL Spark UI à capturer pour le dossier
proof/.
Le journal de métriques contient les colonnes : phase, strategy, k, seed, num_partitions, elapsed_s, silhouette, training_cost, shuffle_read_bytes, shuffle_write_bytes. Cela permet de répondre à l'auto-check du helper : au moins 2 stratégies de partitionnement comparées dans le journal de métriques et réduction quantitative du shuffle.
# === Écriture du journal de métriques unifié ===
metrics_log_file = "lab3_metrics_log.csv"
current_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
fieldnames = [
"timestamp",
"phase",
"strategy",
"k",
"seed",
"num_partitions",
"elapsed_s",
"silhouette",
"training_cost",
"shuffle_read_bytes",
"shuffle_write_bytes",
]
with open(metrics_log_file, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# Phase 1 : balayage de k (étape 2)
for row in iter_metrics:
writer.writerow({
"timestamp": current_timestamp,
"phase": "k_sweep",
"strategy": row["strategy"],
"k": row["k"],
"seed": 42,
"num_partitions": row["num_partitions"],
"elapsed_s": row["elapsed_s"],
"silhouette": row["silhouette"],
"training_cost": row["training_cost"],
"shuffle_read_bytes": "",
"shuffle_write_bytes": "",
})
# Phase 2 : comparaison de partitionnement (étape 3)
for row in partition_metrics:
writer.writerow({
"timestamp": current_timestamp,
"phase": "partitioning",
"strategy": row["strategy"],
"k": row["k"],
"seed": 42,
"num_partitions": row["num_partitions"],
"elapsed_s": row["elapsed_s"],
"silhouette": row["silhouette"],
"training_cost": "",
"shuffle_read_bytes": row["shuffle_read_bytes"],
"shuffle_write_bytes": row["shuffle_write_bytes"],
})
# Phase 3 : stabilité aux graines (étape 4)
for row in stability_metrics:
writer.writerow({
"timestamp": current_timestamp,
"phase": "seed_stability",
"strategy": "baseline",
"k": row["k"],
"seed": row["seed"],
"num_partitions": df_features.rdd.getNumPartitions(),
"elapsed_s": row["elapsed_s"],
"silhouette": row["silhouette"],
"training_cost": row["training_cost"],
"shuffle_read_bytes": "",
"shuffle_write_bytes": "",
})
print(f"Journal de métriques écrit dans : {metrics_log_file}")
print("Plans physiques disponibles dans : proof/plan_iteration.txt, plan_before.txt, plan_after.txt")
print("Pensez à capturer une copie d'écran de l'interface Spark UI (http://localhost:4040) dans proof/.")
# Récapitulatif lisible
print("\nRécapitulatif final :")
print(f" - Meilleure valeur de k : {best_k} (silhouette = {best_silhouette:.4f})")
print(
f" - Stratégies comparées : before_repartition_64 vs after_coalesce_4 "
f"(elapsed {metrics_before['elapsed_s']}s -> {metrics_after['elapsed_s']}s)"
)
print(
f" - Stabilité (silhouette) : moyenne = {mean_silhouette:.4f}, écart-type = {std_silhouette:.4f}"
)
Journal de métriques écrit dans : lab3_metrics_log.csv Plans physiques disponibles dans : proof/plan_iteration.txt, plan_before.txt, plan_after.txt Pensez à capturer une copie d'écran de l'interface Spark UI (http://localhost:4040) dans proof/. Récapitulatif final : - Meilleure valeur de k : 5 (silhouette = 0.9579) - Stratégies comparées : before_repartition_64 vs after_coalesce_4 (elapsed 3.886s -> 0.861s) - Stabilité (silhouette) : moyenne = 0.9579, écart-type = 0.0000
6. Arrêt de la session Spark¶
Nous fermons proprement la session Spark pour libérer les ressources locales et marquer la fin de l'exécution end-to-end du notebook. Les livrables (outputs/lab3/, proof/, lab3_metrics_log.csv) sont prêts pour le rapport comparatif avant/après.
spark.stop()
print("Assignment 3 terminé avec succès.")
Assignment 3 terminé avec succès.