DE2 — Assignment 3 : Graphes ou Clustering¶

Auteur : Badr TAJINI - Data Engineering II (Workloads Intensifs en Données) - ESIEE 2025-2026

Piste : (Saisir votre piste : A/B/C/D)

Chemin choisi : Clustering (PySpark MLlib — KMeans)

Noms : (Étudiant 1 — Étudiant 2)

Ce notebook met en œuvre un workload itératif instrumenté sur le chemin Clustering. Nous nous concentrons sur :

  • la construction des vecteurs de features,
  • l'exécution itérative de KMeans avec métriques par itération,
  • la comparaison de stratégies de partitionnement (avant/après repartition) et l'analyse des coûts de shuffle,
  • le suivi de la convergence via le score de silhouette,
  • la production des preuves (plans, captures Spark UI, journal de métriques) et d'un rapport comparatif avant/après.

Référez-vous à helper_assignment3-de2_esiee.md et au document Lab 3 Overview pour les attendus détaillés.

0. Configuration initiale et préparation¶

Cette première cellule initialise la session Spark, configure les paramètres réseau locaux, ajuste le nombre de partitions de shuffle (spark.sql.shuffle.partitions) à une valeur modeste pour des expérimentations sur poste local, et crée les répertoires de sortie nécessaires (outputs/lab3/, proof/). Nous affichons également l'URL de l'interface Spark UI pour pouvoir capturer les preuves visuelles demandées.

In [1]:
# === Importations et configuration ===
import os
import io
import sys
import csv
import time
import shutil
import pathlib
import statistics
from urllib.parse import urlparse

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Variables d'environnement pour la communication Spark en local
DE2_SPARK_DRIVER_HOST = os.environ.get("DE2_SPARK_DRIVER_HOST", "127.0.0.1")
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)


def show_spark_ui(spark_session):
    """Affiche la version de Spark et l'URL de l'interface Spark UI."""
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Version Spark :", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print("Interface Spark :", ui_url)
        print("Interface Spark (navigateur WSL/Windows) :", f"http://localhost:{ui_port}")
    else:
        print("Interface Spark : non disponible")


# Initialisation de la session Spark
# On force un nombre raisonnable de partitions de shuffle pour pouvoir comparer les stratégies
spark = SparkSession.builder \
    .appName("de2-assignment3") \
    .master("local[*]") \
    .config("spark.driver.host", DE2_SPARK_DRIVER_HOST) \
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS) \
    .config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS) \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

show_spark_ui(spark)

# Création des répertoires de sortie
output_dir = pathlib.Path("outputs/lab3")
output_dir.mkdir(parents=True, exist_ok=True)
clusters_dir = output_dir / "cluster_assignments"
proof_dir = pathlib.Path("proof")
proof_dir.mkdir(parents=True, exist_ok=True)

# On nettoie les anciennes sorties pour repartir d'un état propre
if clusters_dir.exists():
    shutil.rmtree(clusters_dir)
clusters_dir.mkdir(parents=True, exist_ok=True)

print("Répertoires de sortie créés avec succès.")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/04/29 20:12:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Version Spark : 4.0.0
Interface Spark : http://127.0.0.1:4040
Interface Spark (navigateur WSL/Windows) : http://localhost:4040
Répertoires de sortie créés avec succès.

1. Étape 1 — Données et features¶

Nous construisons un jeu de données synthétique en deux dimensions composé de plusieurs blobs (gaussiennes isotropes) afin d'avoir une structure de clusters connue à l'avance. Cela permet de valider visuellement et numériquement la qualité du clustering. Les étapes :

  1. génération aléatoire de points autour de centres bien séparés,
  2. assemblage des colonnes numériques en un vecteur de features avec VectorAssembler,
  3. standardisation des features avec StandardScaler pour éviter les dérives liées aux échelles (recommandation du helper : normalize before VectorAssembler/clustering).

Le DataFrame final df_features sera réutilisé dans toutes les étapes suivantes.

In [2]:
# === Génération du jeu de données et préparation des vecteurs de features ===
import random

# Centres connus des clusters (vérité terrain) et nombre de points par cluster
true_centers = [(0.0, 0.0), (10.0, 10.0), (0.0, 10.0), (10.0, 0.0), (5.0, 5.0)]
points_per_cluster = 400
noise_std = 0.7
random.seed(42)

# Génération des points : pour chaque centre, on tire des points avec un bruit gaussien
raw_points = []
for cluster_id, (cx, cy) in enumerate(true_centers):
    for _ in range(points_per_cluster):
        x = random.gauss(cx, noise_std)
        y = random.gauss(cy, noise_std)
        raw_points.append((x, y, cluster_id))

# Schéma explicite pour bénéficier des optimisations Catalyst
schema = StructType([
    StructField("x", DoubleType(), False),
    StructField("y", DoubleType(), False),
    StructField("true_cluster", IntegerType(), False),
])

df_raw = spark.createDataFrame(raw_points, schema=schema)
total_points = df_raw.count()
print(f"Nombre total de points générés : {total_points}")
print(f"Nombre de clusters attendus : {len(true_centers)}")

# Vérification rapide : pas de valeurs manquantes (le helper signale que NaN cause divergence)
nb_nan = df_raw.filter(F.col("x").isNull() | F.col("y").isNull()).count()
print(f"Nombre de lignes avec valeurs manquantes : {nb_nan}")

# Assemblage des colonnes numériques en un vecteur de features
assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features_raw")
df_assembled = assembler.transform(df_raw)

# Standardisation : moyenne nulle et variance unitaire (évite la divergence du KMeans)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(df_assembled)
df_features = scaler_model.transform(df_assembled).select("true_cluster", "features").cache()

# On force le calcul (matérialisation du cache) pour que les étapes suivantes soient comparables
df_features.count()
print("\nÉchantillon du DataFrame de features standardisées :")
df_features.show(5, truncate=False)

print(f"Nombre de partitions du DataFrame de features : {df_features.rdd.getNumPartitions()}")
                                                                                
Nombre total de points générés : 2000
Nombre de clusters attendus : 5
Nombre de lignes avec valeurs manquantes : 0

Échantillon du DataFrame de features standardisées :
+------------+-----------------------------------------+
|true_cluster|features                                 |
+------------+-----------------------------------------+
|0           |[-1.1194335909363806,-1.1301886972586033]|
|0           |[-1.1143890233648586,-0.9950542981929266]|
|0           |[-1.1168936355950656,-1.3347621600422455]|
|0           |[-1.0461059096110321,-1.1447748788232734]|
|0           |[-1.130649313441016,-1.0855826731405052] |
+------------+-----------------------------------------+
only showing top 5 rows
Nombre de partitions du DataFrame de features : 8

2. Étape 2 — Algorithme itératif (KMeans)¶

Nous exécutons KMeans pour plusieurs valeurs de k (balayage de 2 à 6, soit 5 itérations) afin :

  • d'évaluer la qualité du clustering via le score de silhouette (plus il est proche de 1, mieux les clusters sont séparés),
  • de mesurer le temps écoulé par configuration (elapsed_s),
  • de relever le coût intrinsèque (somme des distances au carré, training cost renvoyée par MLlib),
  • de garder une trace du nombre de partitions utilisées.

Toutes ces mesures sont stockées dans la liste iter_metrics puis serviront à alimenter lab3_metrics_log.csv. La graine seed est fixée pour rendre les comparaisons reproductibles à cette étape.

In [3]:
# === Balayage de k : KMeans itératif avec instrumentation ===
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean",
)

k_values = [2, 3, 4, 5, 6]
iter_metrics = []  # une ligne par itération (par valeur de k)

best_k = None
best_silhouette = -1.0
best_predictions = None

print("Balayage des valeurs de k :")
print("-" * 70)

for k in k_values:
    # Construction du modèle KMeans avec k clusters
    km = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42, maxIter=20)

    # Mesure du temps total d'entraînement + prédiction (wall-clock)
    start = time.time()
    model = km.fit(df_features)
    predictions = model.transform(df_features)
    # On force la matérialisation pour mesurer le coût réel
    nb_pred = predictions.count()
    elapsed = time.time() - start

    # Score de silhouette (qualité de séparation des clusters)
    silhouette = evaluator.evaluate(predictions)

    # Coût d'entraînement renvoyé par MLlib (somme des distances au carré au centre)
    training_cost = model.summary.trainingCost

    metrics_row = {
        "iteration": k,
        "k": k,
        "elapsed_s": round(elapsed, 3),
        "silhouette": round(silhouette, 4),
        "training_cost": round(training_cost, 4),
        "num_partitions": df_features.rdd.getNumPartitions(),
        "strategy": "baseline",
    }
    iter_metrics.append(metrics_row)

    print(
        f"k={k} | silhouette={silhouette:.4f} | cost={training_cost:.2f} | "
        f"elapsed={elapsed:.2f}s | predictions={nb_pred}"
    )

    # On garde le meilleur modèle pour les étapes suivantes
    if silhouette > best_silhouette:
        best_silhouette = silhouette
        best_k = k
        best_predictions = predictions

print("-" * 70)
print(f"Meilleure valeur de k : {best_k} (silhouette = {best_silhouette:.4f})")

# Sauvegarde des affectations de clusters pour le meilleur modèle (livrable demandé)
best_predictions.select("true_cluster", "prediction").write.mode("overwrite").parquet(
    str(clusters_dir)
)
print(f"Affectations de clusters sauvegardées dans : {clusters_dir}")
Balayage des valeurs de k :
----------------------------------------------------------------------
k=2 | silhouette=0.4158 | cost=2788.97 | elapsed=3.30s | predictions=2000
k=3 | silhouette=0.6263 | cost=1403.94 | elapsed=1.47s | predictions=2000
k=4 | silhouette=0.8020 | cost=574.90 | elapsed=1.21s | predictions=2000
k=5 | silhouette=0.9579 | cost=95.32 | elapsed=1.53s | predictions=2000
k=6 | silhouette=0.8554 | cost=89.14 | elapsed=2.32s | predictions=2000
----------------------------------------------------------------------
Meilleure valeur de k : 5 (silhouette = 0.9579)
26/04/29 20:14:16 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
[Stage 169:>                                                        (0 + 8) / 8]
Affectations de clusters sauvegardées dans : outputs/lab3/cluster_assignments
                                                                                

3. Étape 3 — Expérience de partitionnement (avant/après)¶

Cette étape compare deux stratégies de partitionnement avec le même algorithme KMeans (k = best_k) :

  • Avant (baseline) : on force un sur-partitionnement avec repartition(64) pour reproduire un cas pathologique où Spark passe trop de temps en shuffle / scheduling sur des partitions minuscules.
  • Après (optimisé) : on applique coalesce(4) pour réduire le nombre de partitions à un nombre raisonnable proche du nombre de cœurs disponibles, ce qui réduit drastiquement les coûts de shuffle.

Pour chaque stratégie, on mesure :

  • le nombre de partitions effectif,
  • la durée totale d'entraînement,
  • la silhouette,
  • la taille des données échangées (lecture/écriture de shuffle agrégée via StatusTracker).

On capture aussi les plans physiques plan_before.txt et plan_after.txt (livrables demandés dans proof/).

In [4]:
# === Comparaison de stratégies de partitionnement ===

def capture_plan(df, file_path):
    """Capture le plan physique formaté d'un DataFrame dans un fichier texte."""
    old_stdout = sys.stdout
    buffer = io.StringIO()
    sys.stdout = buffer
    try:
        df.explain("formatted")
    finally:
        sys.stdout = old_stdout
    with open(file_path, "w") as f:
        f.write(buffer.getvalue())


def aggregate_shuffle_bytes(spark_session, job_ids):
    """Agrège les octets de shuffle (lecture + écriture) sur les jobs donnés via le StatusTracker."""
    tracker = spark_session.sparkContext.statusTracker()
    total_read = 0
    total_written = 0
    for jid in job_ids:
        job = tracker.getJobInfo(jid)
        if job is None:
            continue
        for sid in job.stageIds:
            stage = tracker.getStageInfo(sid)
            if stage is None:
                continue
            total_read += getattr(stage, "shuffleReadBytes", 0) or 0
            total_written += getattr(stage, "shuffleWriteBytes", 0) or 0
    return total_read, total_written


def run_strategy(label, df_input, k, plan_file):
    """Exécute KMeans sur un DataFrame déjà partitionné, mesure le temps et le shuffle."""
    nb_partitions = df_input.rdd.getNumPartitions()
    print(f"\n[{label}] Nombre de partitions : {nb_partitions}")

    # Capture du plan physique avant exécution
    capture_plan(df_input, plan_file)

    # Bornes des job ids pour isoler les jobs propres à cette stratégie
    job_ids_before = set(spark.sparkContext.statusTracker().getJobIdsForGroup(None))

    km = KMeans(featuresCol="features", predictionCol="prediction", k=k, seed=42, maxIter=20)
    start = time.time()
    model = km.fit(df_input)
    predictions = model.transform(df_input)
    nb_pred = predictions.count()
    elapsed = time.time() - start

    silhouette = evaluator.evaluate(predictions)

    job_ids_after = set(spark.sparkContext.statusTracker().getJobIdsForGroup(None))
    new_jobs = sorted(job_ids_after - job_ids_before)
    shuffle_read, shuffle_write = aggregate_shuffle_bytes(spark, new_jobs)

    print(f"[{label}] elapsed = {elapsed:.2f}s | silhouette = {silhouette:.4f}")
    print(
        f"[{label}] shuffle_read = {shuffle_read} octets | "
        f"shuffle_write = {shuffle_write} octets | predictions = {nb_pred}"
    )

    return {
        "strategy": label,
        "k": k,
        "num_partitions": nb_partitions,
        "elapsed_s": round(elapsed, 3),
        "silhouette": round(silhouette, 4),
        "shuffle_read_bytes": shuffle_read,
        "shuffle_write_bytes": shuffle_write,
    }


# Stratégie AVANT : sur-partitionnement (cas non optimisé)
df_before = df_features.repartition(64).cache()
df_before.count()
metrics_before = run_strategy(
    label="before_repartition_64",
    df_input=df_before,
    k=best_k,
    plan_file=proof_dir / "plan_before.txt",
)

# Stratégie APRÈS : coalesce vers un nombre de partitions raisonnable
df_after = df_features.coalesce(4).cache()
df_after.count()
metrics_after = run_strategy(
    label="after_coalesce_4",
    df_input=df_after,
    k=best_k,
    plan_file=proof_dir / "plan_after.txt",
)

# Capture aussi un plan d'itération générique (livrable demandé : plan_iteration.txt)
capture_plan(
    KMeans(featuresCol="features", predictionCol="prediction", k=best_k, seed=42)
    .fit(df_features)
    .transform(df_features),
    proof_dir / "plan_iteration.txt",
)

# Calcul de la réduction de shuffle (utile pour le rapport avant/après)
if metrics_before["shuffle_write_bytes"] > 0:
    reduction = (
        (metrics_before["shuffle_write_bytes"] - metrics_after["shuffle_write_bytes"])
        / metrics_before["shuffle_write_bytes"]
        * 100
    )
    print(f"\nRéduction de shuffle write : {reduction:.1f}% (avant -> après)")
else:
    print("\nPas de shuffle mesuré côté avant : la comparaison reste qualitative.")

partition_metrics = [metrics_before, metrics_after]
[before_repartition_64] Nombre de partitions : 64
[before_repartition_64] elapsed = 3.89s | silhouette = 0.9579
[before_repartition_64] shuffle_read = 0 octets | shuffle_write = 0 octets | predictions = 2000

[after_coalesce_4] Nombre de partitions : 4
[after_coalesce_4] elapsed = 0.86s | silhouette = 0.9579
[after_coalesce_4] shuffle_read = 0 octets | shuffle_write = 0 octets | predictions = 2000

Pas de shuffle mesuré côté avant : la comparaison reste qualitative.

4. Étape 4 — Analyse de convergence et stabilité (graines multiples)¶

Pour évaluer la stabilité de KMeans (algorithme sensible à l'initialisation), nous relançons l'entraînement avec 5 graines différentes à k = best_k. Pour chaque exécution :

  • on enregistre la silhouette,
  • on enregistre le coût d'entraînement,
  • on enregistre le temps écoulé.

Nous calculons ensuite la moyenne et l'écart-type de la silhouette. Un faible écart-type indique un clustering robuste à l'initialisation. Cette analyse correspond à l'attendu seed stability analysis (>= 5 seeds, mean +/- std of silhouette) du helper.

In [5]:
# === Analyse de stabilité : plusieurs graines pour k = best_k ===
seeds = [11, 23, 42, 77, 101]
stability_metrics = []

print(f"Analyse de stabilité avec k = {best_k} sur {len(seeds)} graines :")
print("-" * 70)

for seed in seeds:
    km = KMeans(featuresCol="features", predictionCol="prediction", k=best_k, seed=seed, maxIter=20)
    start = time.time()
    model = km.fit(df_features)
    predictions = model.transform(df_features)
    predictions.count()  # matérialisation
    elapsed = time.time() - start

    silhouette = evaluator.evaluate(predictions)
    cost = model.summary.trainingCost

    stability_metrics.append({
        "seed": seed,
        "k": best_k,
        "silhouette": round(silhouette, 4),
        "training_cost": round(cost, 4),
        "elapsed_s": round(elapsed, 3),
    })

    print(
        f"seed={seed} | silhouette={silhouette:.4f} | cost={cost:.2f} | elapsed={elapsed:.2f}s"
    )

# Statistiques agrégées sur la silhouette
silhouettes = [row["silhouette"] for row in stability_metrics]
mean_silhouette = statistics.mean(silhouettes)
std_silhouette = statistics.pstdev(silhouettes)

print("-" * 70)
print(
    f"Silhouette moyenne : {mean_silhouette:.4f} | écart-type : {std_silhouette:.4f} "
    f"(min = {min(silhouettes):.4f}, max = {max(silhouettes):.4f})"
)
if std_silhouette < 0.01:
    print("Conclusion : clustering très stable face à l'initialisation.")
else:
    print("Conclusion : sensibilité notable à l'initialisation, à documenter dans le rapport.")
Analyse de stabilité avec k = 5 sur 5 graines :
----------------------------------------------------------------------
seed=11 | silhouette=0.9579 | cost=95.32 | elapsed=0.96s
seed=23 | silhouette=0.9579 | cost=95.32 | elapsed=0.84s
seed=42 | silhouette=0.9579 | cost=95.32 | elapsed=0.84s
seed=77 | silhouette=0.9579 | cost=95.32 | elapsed=0.82s
seed=101 | silhouette=0.9579 | cost=95.32 | elapsed=0.86s
----------------------------------------------------------------------
Silhouette moyenne : 0.9579 | écart-type : 0.0000 (min = 0.9579, max = 0.9579)
Conclusion : clustering très stable face à l'initialisation.

5. Étape 5 — Preuves et journal de métriques¶

Cette étape produit les livrables finaux demandés :

  • proof/plan_iteration.txt, proof/plan_before.txt, proof/plan_after.txt : plans physiques formatés (déjà écrits à l'étape 3),
  • lab3_metrics_log.csv : journal CSV unifié regroupant toutes les métriques par itération et par configuration de partitionnement, ainsi que les résultats de l'analyse de stabilité,
  • un rappel de l'URL Spark UI à capturer pour le dossier proof/.

Le journal de métriques contient les colonnes : phase, strategy, k, seed, num_partitions, elapsed_s, silhouette, training_cost, shuffle_read_bytes, shuffle_write_bytes. Cela permet de répondre à l'auto-check du helper : au moins 2 stratégies de partitionnement comparées dans le journal de métriques et réduction quantitative du shuffle.

In [6]:
# === Écriture du journal de métriques unifié ===
metrics_log_file = "lab3_metrics_log.csv"
current_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

fieldnames = [
    "timestamp",
    "phase",
    "strategy",
    "k",
    "seed",
    "num_partitions",
    "elapsed_s",
    "silhouette",
    "training_cost",
    "shuffle_read_bytes",
    "shuffle_write_bytes",
]

with open(metrics_log_file, "w", newline="") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()

    # Phase 1 : balayage de k (étape 2)
    for row in iter_metrics:
        writer.writerow({
            "timestamp": current_timestamp,
            "phase": "k_sweep",
            "strategy": row["strategy"],
            "k": row["k"],
            "seed": 42,
            "num_partitions": row["num_partitions"],
            "elapsed_s": row["elapsed_s"],
            "silhouette": row["silhouette"],
            "training_cost": row["training_cost"],
            "shuffle_read_bytes": "",
            "shuffle_write_bytes": "",
        })

    # Phase 2 : comparaison de partitionnement (étape 3)
    for row in partition_metrics:
        writer.writerow({
            "timestamp": current_timestamp,
            "phase": "partitioning",
            "strategy": row["strategy"],
            "k": row["k"],
            "seed": 42,
            "num_partitions": row["num_partitions"],
            "elapsed_s": row["elapsed_s"],
            "silhouette": row["silhouette"],
            "training_cost": "",
            "shuffle_read_bytes": row["shuffle_read_bytes"],
            "shuffle_write_bytes": row["shuffle_write_bytes"],
        })

    # Phase 3 : stabilité aux graines (étape 4)
    for row in stability_metrics:
        writer.writerow({
            "timestamp": current_timestamp,
            "phase": "seed_stability",
            "strategy": "baseline",
            "k": row["k"],
            "seed": row["seed"],
            "num_partitions": df_features.rdd.getNumPartitions(),
            "elapsed_s": row["elapsed_s"],
            "silhouette": row["silhouette"],
            "training_cost": row["training_cost"],
            "shuffle_read_bytes": "",
            "shuffle_write_bytes": "",
        })

print(f"Journal de métriques écrit dans : {metrics_log_file}")
print("Plans physiques disponibles dans : proof/plan_iteration.txt, plan_before.txt, plan_after.txt")
print("Pensez à capturer une copie d'écran de l'interface Spark UI (http://localhost:4040) dans proof/.")

# Récapitulatif lisible
print("\nRécapitulatif final :")
print(f"  - Meilleure valeur de k : {best_k} (silhouette = {best_silhouette:.4f})")
print(
    f"  - Stratégies comparées : before_repartition_64 vs after_coalesce_4 "
    f"(elapsed {metrics_before['elapsed_s']}s -> {metrics_after['elapsed_s']}s)"
)
print(
    f"  - Stabilité (silhouette) : moyenne = {mean_silhouette:.4f}, écart-type = {std_silhouette:.4f}"
)
Journal de métriques écrit dans : lab3_metrics_log.csv
Plans physiques disponibles dans : proof/plan_iteration.txt, plan_before.txt, plan_after.txt
Pensez à capturer une copie d'écran de l'interface Spark UI (http://localhost:4040) dans proof/.

Récapitulatif final :
  - Meilleure valeur de k : 5 (silhouette = 0.9579)
  - Stratégies comparées : before_repartition_64 vs after_coalesce_4 (elapsed 3.886s -> 0.861s)
  - Stabilité (silhouette) : moyenne = 0.9579, écart-type = 0.0000

6. Arrêt de la session Spark¶

Nous fermons proprement la session Spark pour libérer les ressources locales et marquer la fin de l'exécution end-to-end du notebook. Les livrables (outputs/lab3/, proof/, lab3_metrics_log.csv) sont prêts pour le rapport comparatif avant/après.

In [7]:
spark.stop()
print("Assignment 3 terminé avec succès.")
Assignment 3 terminé avec succès.