DE2 — Lab 3 : Graphes ou Clustering — Charge de Travail Itérative Instrumentée (15%)¶

Auteur : Badr TAJINI - Data Engineering II (Workloads Intensifs en Données) - ESIEE 2025-2026

Piste : A (Esports)

Chemin choisi : Clustering (KMeans / BisectingKMeans)

Objectif : Exécuter une charge de travail itérative avec un focus ingénierie plateforme — stratégies de partitionnement, analyse de skew, coûts de shuffle par itération, convergence, et rapport comparatif avant/après.

Plan du notebook :

  1. Mise en place de la session Spark et des dossiers de sortie
  2. B.1 — Préparation des features (VectorAssembler + StandardScaler)
  3. B.2 — Balayages KMeans / BisectingKMeans sur k ∈ {4, 6, 8, 10}
  4. B.3 — Stabilité du meilleur k sur ≥ 5 graines (seeds)
  5. B.4 — Expérience de partitionnement (avant/après repartition)
  6. Sauvegarde des plans + écriture de lab3_metrics_log.csv
  7. Arrêt de la session Spark

0 — Mise en place de la session Spark¶

On crée une SparkSession locale, on prépare les dossiers de sortie (outputs/lab3 et proof) et on définit un petit utilitaire pour récupérer les bytes de shuffle via l'API REST de l'UI Spark.

In [5]:
import os
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType
import time, pathlib, csv, json
import urllib.request

# Variables d'environnement (utile sous WSL)
DE2_SPARK_DRIVER_HOST = os.environ.get("DE2_SPARK_DRIVER_HOST", "127.0.0.1")
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)


def afficher_interface_spark(session_spark):
    """Affiche les URLs de l'interface Spark UI."""
    url_ui = session_spark.sparkContext.uiWebUrl
    print("Version Spark :", session_spark.version)
    if url_ui:
        port_ui = urlparse(url_ui).port or 4040
        print("Interface Spark :", url_ui)
        print("Interface Spark (navigateur WSL/Windows) :", f"http://localhost:{port_ui}")
    else:
        print("Interface Spark : non disponible")


# Création de la session Spark en mode local
spark = (
    SparkSession.builder
    .appName("DE2-Lab3-Iterative")
    .master("local[*]")
    .config("spark.driver.host", DE2_SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS)
    .config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS)
    .getOrCreate()
)

afficher_interface_spark(spark)

# Création des dossiers de sortie
pathlib.Path("outputs/lab3").mkdir(parents=True, exist_ok=True)
pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
print("Dossiers de sortie prêts : outputs/lab3, proof")
Version Spark : 4.0.0
Interface Spark : http://127.0.0.1:4040
Interface Spark (navigateur WSL/Windows) : http://localhost:4040
Dossiers de sortie prêts : outputs/lab3, proof

Utilitaire — Lecture des métriques de shuffle via l'API REST de Spark¶

L'API REST de l'UI Spark expose, pour chaque stage terminé, les bytes lus et écrits en shuffle. On lit le total cumulé puis, en faisant la différence avant/après une opération, on obtient le coût de shuffle de cette opération.

In [6]:
def cumul_shuffle_bytes():
    """Retourne (shuffle_read_total, shuffle_write_total) cumulés depuis le début
    de la session Spark via l'API REST de l'UI."""
    url_ui = spark.sparkContext.uiWebUrl
    app_id = spark.sparkContext.applicationId
    if not url_ui:
        return 0, 0
    try:
        with urllib.request.urlopen(f"{url_ui}/api/v1/applications/{app_id}/stages", timeout=3) as resp:
            stages = json.loads(resp.read().decode("utf-8"))
        read = sum(s.get("shuffleReadBytes", 0) for s in stages)
        write = sum(s.get("shuffleWriteBytes", 0) for s in stages)
        return read, write
    except Exception as e:
        # Si l'UI n'est pas accessible on retourne 0
        return 0, 0


def delta_shuffle(before):
    """Calcule (read, write) de shuffle entre deux mesures cumulees."""
    after = cumul_shuffle_bytes()
    return after[0] - before[0], after[1] - before[1]


# Test rapide de l'utilitaire
r0, w0 = cumul_shuffle_bytes()
print(f"Shuffle cumule au demarrage : read={r0} bytes, write={w0} bytes")
Shuffle cumule au demarrage : read=495839 bytes, write=495839 bytes

B.1 — Préparation des features¶

Objectif : construire un jeu de données représentant des statistiques de héros Esports (Track A) puis assembler et normaliser les features numériques pour le clustering.

  • Features : win_rate, pick_rate, kda_ratio
  • Pipeline : VectorAssembler → StandardScaler (centré + réduit)
  • Mise en cache du DataFrame pour éviter de recalculer à chaque itération de KMeans
In [7]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import random

# Generation des donnees synthetiques : 500 heros avec 3 stats
print("Generation des donnees - Stats de heros Esports (Track A)...")
random.seed(42)
heroes_data = []
for hero_id in range(500):
    hero_name = f"hero_{hero_id}"
    win_rate = random.uniform(0.35, 0.65)
    pick_rate = random.uniform(0.01, 0.30)
    kda_ratio = random.uniform(0.8, 3.5)
    heroes_data.append((hero_id, hero_name, win_rate, pick_rate, kda_ratio))

# Schema explicite pour eviter l'inference de types
schema = StructType([
    StructField("hero_id", LongType(), False),
    StructField("hero_name", StringType(), False),
    StructField("win_rate", DoubleType(), False),
    StructField("pick_rate", DoubleType(), False),
    StructField("kda_ratio", DoubleType(), False),
])

df_heroes = spark.createDataFrame(heroes_data, schema=schema)
print(f"Donnees chargees : {df_heroes.count()} heros")
df_heroes.show(5)

# Assemblage des features dans un seul vecteur
print("\nAssemblage et normalisation des features...")
assembler = VectorAssembler(
    inputCols=["win_rate", "pick_rate", "kda_ratio"],
    outputCol="raw_features",
)
df_assembled = assembler.transform(df_heroes)

# Normalisation : centrage + reduction
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withMean=True,
    withStd=True,
)
scaler_model = scaler.fit(df_assembled)
df_features = scaler_model.transform(df_assembled)

# Cache : indispensable pour une charge iterative
df_features.cache()
print(f"Features pretes et en cache : {df_features.count()} lignes")
print(f"Nombre de partitions du DataFrame : {df_features.rdd.getNumPartitions()}")
Generation des donnees - Stats de heros Esports (Track A)...
Donnees chargees : 500 heros
+-------+---------+------------------+-------------------+------------------+
|hero_id|hero_name|          win_rate|          pick_rate|         kda_ratio|
+-------+---------+------------------+-------------------+------------------+
|      0|   hero_0|0.5418280395373651|0.01725311901457341| 1.542579159596622|
|      1|   hero_1|0.4169632214446468|0.22357665210756358|2.6270886160418607|
|      2|   hero_2|0.6176538703114536|0.03521226146253068|1.9391889131502302|
|      3|   hero_3|0.3589391658314211|0.07340501269304496|2.1644592778790788|
|      4|   hero_4|0.3579607909051591|0.06766291869912806|2.5546879820047126|
+-------+---------+------------------+-------------------+------------------+
only showing top 5 rows

Assemblage et normalisation des features...
Features pretes et en cache : 500 lignes
Nombre de partitions du DataFrame : 8

B.2 — Balayages KMeans / BisectingKMeans¶

Objectif : comparer les performances de KMeans et BisectingKMeans pour différentes valeurs de k.

  • k ∈ {4, 6, 8, 10}
  • Mesures : score Silhouette, temps écoulé (ms), bytes de shuffle (lecture/écriture)
  • Tous les résultats sont stockés dans sweep_results pour l'export CSV final
In [8]:
print("=== Balayage KMeans et BisectingKMeans ===\n")

# Evaluateur silhouette : mesure la qualite des clusters
evaluator = ClusteringEvaluator(
    predictionCol="prediction",
    featuresCol="features",
    metricName="silhouette",
)


def silhouette_safe(predictions_df, label=""):
    """Calcule la silhouette en se protegeant du cas ou le modele
    a collapse tous les points dans un seul cluster (Spark plante sinon)."""
    n_clusters_real = predictions_df.select("prediction").distinct().count()
    if n_clusters_real >= 2:
        return evaluator.evaluate(predictions_df), n_clusters_real
    print(f"    [warn] {label} a produit seulement {n_clusters_real} cluster(s) - silhouette=NaN")
    return float("nan"), n_clusters_real


# Liste qui collecte tous les resultats du balayage
sweep_results = []
timestamp_run = time.strftime("%Y-%m-%d %H:%M:%S")

# Configurations a tester
k_values = [4, 6, 8, 10]
algorithms = [("KMeans", KMeans), ("BisectingKMeans", BisectingKMeans)]

for algo_name, algo_cls in algorithms:
    print(f"\n--- {algo_name} ---")
    for k in k_values:
        # Mesure du shuffle avant l'operation
        shuffle_before = cumul_shuffle_bytes()
        t_start = time.time()

        # Entrainement du modele (maxIter=20 pour limiter le risque de collapse)
        model = algo_cls(k=k, featuresCol="features", seed=42, maxIter=20)
        model_trained = model.fit(df_features)

        # Predictions et evaluation (avec garde-fou)
        predictions = model_trained.transform(df_features)
        silhouette_score, n_clusters_real = silhouette_safe(
            predictions, label=f"{algo_name} k={k}"
        )

        elapsed_ms = (time.time() - t_start) * 1000
        shuffle_read, shuffle_write = delta_shuffle(shuffle_before)

        sweep_results.append({
            "run_id": f"sweep_{algo_name}_k{k}",
            "path": "B",
            "algorithm": algo_name,
            "task": "kmeans_sweep",
            "notes": f"sweep k={k} clusters_reels={n_clusters_real}",
            "iteration_or_k": k,
            "seed": 42,
            "metric_value": silhouette_score,
            "shuffle_read_bytes": shuffle_read,
            "shuffle_write_bytes": shuffle_write,
            "elapsed_ms": elapsed_ms,
            "timestamp": timestamp_run,
        })

        sil_str = f"{silhouette_score:.4f}" if silhouette_score == silhouette_score else "NaN"
        print(f"  k={k:2d} : silhouette={sil_str}, "
              f"temps={elapsed_ms:8.1f} ms, shuffle_r={shuffle_read} B, shuffle_w={shuffle_write} B")

print(f"\nBalayage termine : {len(sweep_results)} configurations testees")

# Identification du meilleur k (Silhouette la plus elevee), en ignorant les NaN
valid_results = [r for r in sweep_results if r["metric_value"] == r["metric_value"]]
if not valid_results:
    raise RuntimeError("Aucune configuration valide - tous les modeles ont collapse.")

best_config = max(valid_results, key=lambda r: r["metric_value"])
print(f"Meilleure config : {best_config['algorithm']} k={best_config['iteration_or_k']} "
      f"silhouette={best_config['metric_value']:.4f}")
=== Balayage KMeans et BisectingKMeans ===


--- KMeans ---
  k= 4 : silhouette=0.3971, temps=  2073.1 ms, shuffle_r=113095 B, shuffle_w=113095 B
  k= 6 : silhouette=0.4635, temps=  1801.3 ms, shuffle_r=180056 B, shuffle_w=180056 B
  k= 8 : silhouette=0.4489, temps=  1713.7 ms, shuffle_r=235986 B, shuffle_w=235986 B
  k=10 : silhouette=0.4292, temps=  2100.0 ms, shuffle_r=359729 B, shuffle_w=359729 B

--- BisectingKMeans ---
    [warn] BisectingKMeans k=4 a produit seulement 1 cluster(s) - silhouette=NaN
  k= 4 : silhouette=NaN, temps=  3992.6 ms, shuffle_r=230888 B, shuffle_w=230888 B
    [warn] BisectingKMeans k=6 a produit seulement 1 cluster(s) - silhouette=NaN
  k= 6 : silhouette=NaN, temps=  5847.5 ms, shuffle_r=381648 B, shuffle_w=381648 B
    [warn] BisectingKMeans k=8 a produit seulement 1 cluster(s) - silhouette=NaN
  k= 8 : silhouette=NaN, temps=  7146.2 ms, shuffle_r=532408 B, shuffle_w=532408 B
    [warn] BisectingKMeans k=10 a produit seulement 1 cluster(s) - silhouette=NaN
  k=10 : silhouette=NaN, temps=  7790.7 ms, shuffle_r=-749149 B, shuffle_w=-752918 B

Balayage termine : 8 configurations testees
Meilleure config : KMeans k=6 silhouette=0.4635

B.3 — Analyse de stabilité des seeds¶

Objectif : vérifier que le score Silhouette est stable lorsqu'on change la graine aléatoire (seed) du KMeans.

  • On prend le meilleur k trouvé en B.2
  • On lance 5 seeds différentes (42 → 46)
  • On rapporte la moyenne ± écart-type du score Silhouette
In [9]:
print("\n=== Analyse de stabilite des seeds ===")

# On reprend le meilleur k identifie en B.2
best_k = best_config["iteration_or_k"]
best_algo = best_config["algorithm"]
print(f"Meilleur k retenu : {best_k} (algorithme : {best_algo}) - test sur 5 seeds")

silhouette_scores = []
seed_results = []

for seed_num in range(42, 47):  # 5 seeds : 42, 43, 44, 45, 46
    shuffle_before = cumul_shuffle_bytes()
    t_start = time.time()

    # On reutilise le meme algorithme que la meilleure config
    if best_algo == "KMeans":
        model = KMeans(k=best_k, featuresCol="features", seed=seed_num, maxIter=20)
    else:
        model = BisectingKMeans(k=best_k, featuresCol="features", seed=seed_num, maxIter=20)

    model_trained = model.fit(df_features)
    predictions = model_trained.transform(df_features)

    sil_score, n_clusters_real = silhouette_safe(
        predictions, label=f"{best_algo} seed={seed_num}"
    )
    if sil_score == sil_score:  # NaN check
        silhouette_scores.append(sil_score)

    elapsed_ms = (time.time() - t_start) * 1000
    shuffle_read, shuffle_write = delta_shuffle(shuffle_before)

    seed_results.append({
        "run_id": f"stability_seed{seed_num}",
        "path": "B",
        "algorithm": best_algo,
        "task": "seed_stability",
        "notes": f"k={best_k} seed={seed_num} clusters_reels={n_clusters_real}",
        "iteration_or_k": best_k,
        "seed": seed_num,
        "metric_value": sil_score,
        "shuffle_read_bytes": shuffle_read,
        "shuffle_write_bytes": shuffle_write,
        "elapsed_ms": elapsed_ms,
        "timestamp": timestamp_run,
    })

    sil_str = f"{sil_score:.4f}" if sil_score == sil_score else "NaN"
    print(f"  Seed {seed_num} : silhouette={sil_str}, temps={elapsed_ms:.1f} ms")

# Statistiques de stabilite (sur les seeds valides uniquement)
import statistics
if silhouette_scores:
    mean_sil = statistics.mean(silhouette_scores)
    stdev_sil = statistics.stdev(silhouette_scores) if len(silhouette_scores) > 1 else 0.0
    print(f"\nResultats de stabilite (k={best_k}) sur {len(silhouette_scores)} seeds valides :")
    print(f"  Moyenne Silhouette : {mean_sil:.4f}")
    print(f"  Ecart-type        : {stdev_sil:.4f}")
    print(f"  Min : {min(silhouette_scores):.4f}, Max : {max(silhouette_scores):.4f}")
else:
    mean_sil = float("nan")
    stdev_sil = float("nan")
    print("\nAucune seed n'a produit de score valide.")
=== Analyse de stabilite des seeds ===
Meilleur k retenu : 6 (algorithme : KMeans) - test sur 5 seeds
  Seed 42 : silhouette=0.4635, temps=1166.9 ms
  Seed 43 : silhouette=0.4780, temps=1081.8 ms
  Seed 44 : silhouette=0.4775, temps=973.4 ms
  Seed 45 : silhouette=0.4629, temps=1275.2 ms
  Seed 46 : silhouette=0.4633, temps=1311.5 ms

Resultats de stabilite (k=6) sur 5 seeds valides :
  Moyenne Silhouette : 0.4691
  Ecart-type        : 0.0080
  Min : 0.4629, Max : 0.4780

B.4 — Expérience de partitionnement¶

Objectif : mesurer l'impact du nombre de partitions sur les coûts de shuffle et le temps de fit.

  • On exécute le meilleur k avec différentes stratégies de partitionnement
  • Configurations testées : 1, 2, 4, 8, 16 partitions (via repartition)
  • On compare temps écoulé et bytes de shuffle
In [10]:
print("\n=== Experience de partitionnement ===")

partition_results = []
partition_counts = [1, 2, 4, 8, 16]

for n_parts in partition_counts:
    # Re-partitionnement du DataFrame de features
    df_repart = df_features.repartition(n_parts)
    df_repart.cache()
    df_repart.count()  # force la materialisation
    n_parts_real = df_repart.rdd.getNumPartitions()

    shuffle_before = cumul_shuffle_bytes()
    t_start = time.time()

    if best_algo == "KMeans":
        model = KMeans(k=best_k, featuresCol="features", seed=42, maxIter=20)
    else:
        model = BisectingKMeans(k=best_k, featuresCol="features", seed=42, maxIter=20)

    model_trained = model.fit(df_repart)
    predictions = model_trained.transform(df_repart)
    sil_score, n_clusters_real = silhouette_safe(
        predictions, label=f"{best_algo} part={n_parts_real}"
    )

    elapsed_ms = (time.time() - t_start) * 1000
    shuffle_read, shuffle_write = delta_shuffle(shuffle_before)

    partition_results.append({
        "run_id": f"partition_n{n_parts}",
        "path": "B",
        "algorithm": best_algo,
        "task": "partitioning",
        "notes": f"repartition={n_parts_real} clusters_reels={n_clusters_real}",
        "iteration_or_k": best_k,
        "seed": 42,
        "metric_value": sil_score,
        "shuffle_read_bytes": shuffle_read,
        "shuffle_write_bytes": shuffle_write,
        "elapsed_ms": elapsed_ms,
        "timestamp": timestamp_run,
    })

    sil_str = f"{sil_score:.4f}" if sil_score == sil_score else "NaN"
    print(f"  partitions={n_parts_real:2d} : silhouette={sil_str}, "
          f"temps={elapsed_ms:8.1f} ms, shuffle_r={shuffle_read} B, shuffle_w={shuffle_write} B")

    df_repart.unpersist()

# Resume comparatif
print("\nResume avant/apres :")
default_run = next((r for r in sweep_results
                    if r["algorithm"] == best_algo and r["iteration_or_k"] == best_k), None)
if default_run:
    print(f"  Reference (sweep)      : temps={default_run['elapsed_ms']:.1f} ms, "
          f"shuffle_w={default_run['shuffle_write_bytes']} B")
fastest = min(partition_results, key=lambda r: r["elapsed_ms"])
print(f"  Plus rapide repartition : {fastest['notes']}, "
      f"temps={fastest['elapsed_ms']:.1f} ms, shuffle_w={fastest['shuffle_write_bytes']} B")
=== Experience de partitionnement ===
  partitions= 1 : silhouette=0.4532, temps=  1280.8 ms, shuffle_r=12125 B, shuffle_w=12125 B
  partitions= 2 : silhouette=0.4785, temps=   966.1 ms, shuffle_r=27445 B, shuffle_w=27445 B
  partitions= 4 : silhouette=0.4652, temps=   978.9 ms, shuffle_r=-24311 B, shuffle_w=-28080 B
  partitions= 8 : silhouette=0.4725, temps=  1439.5 ms, shuffle_r=186387 B, shuffle_w=190156 B
  partitions=16 : silhouette=0.4770, temps=  1568.8 ms, shuffle_r=187965 B, shuffle_w=187965 B

Resume avant/apres :
  Reference (sweep)      : temps=1801.3 ms, shuffle_w=180056 B
  Plus rapide repartition : repartition=2 clusters_reels=6, temps=966.1 ms, shuffle_w=27445 B

Évidence partagée — Plans et métriques¶

Objectifs :

  1. Sauvegarder le plan d'exécution Spark du fit KMeans (preuve d'optimisation)
  2. Écrire toutes les métriques collectées dans lab3_metrics_log.csv
  3. Conserver une copie du résumé textuel dans proof/
In [11]:
# 1) Sauvegarde du plan d'execution du KMeans sur le meilleur k
print("Sauvegarde du plan d'execution...")

if best_algo == "KMeans":
    model_final = KMeans(k=best_k, featuresCol="features", seed=42, maxIter=20).fit(df_features)
else:
    model_final = BisectingKMeans(k=best_k, featuresCol="features", seed=42, maxIter=20).fit(df_features)

predictions_final = model_final.transform(df_features)

# explain() ecrit sur stdout - on capture la sortie via _jdf.queryExecution
plan_text = predictions_final._jdf.queryExecution().toString()
plan_path = pathlib.Path("proof/plan_iterative.txt")
plan_path.write_text(plan_text)
print(f"  Plan sauvegarde : {plan_path}  ({plan_path.stat().st_size} octets)")

# 2) Ecriture du CSV des metriques
print("\nEcriture de lab3_metrics_log.csv...")

header = [
    "run_id", "path", "algorithm", "task", "notes",
    "iteration_or_k", "seed", "metric_value",
    "shuffle_read_bytes", "shuffle_write_bytes",
    "elapsed_ms", "timestamp",
]

all_rows = sweep_results + seed_results + partition_results

with open("lab3_metrics_log.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=header)
    writer.writeheader()
    for row in all_rows:
        writer.writerow(row)

print(f"  CSV ecrit : lab3_metrics_log.csv  ({len(all_rows)} lignes)")

# 3) Resume textuel pour le rapport
def fmt(x):
    return f"{x:.4f}" if isinstance(x, float) and x == x else "NaN"

summary_path = pathlib.Path("proof/summary.txt")
summary_lines = [
    f"Lab 3 - Resume - {timestamp_run}",
    f"Meilleure config sweep : {best_algo} k={best_k} silhouette={fmt(best_config['metric_value'])}",
    f"Stabilite ({best_algo} k={best_k}) : moyenne={fmt(mean_sil)}, ecart-type={fmt(stdev_sil)}",
    f"Partition la plus rapide : {fastest['notes']} ({fastest['elapsed_ms']:.1f} ms)",
    f"Total de runs enregistres : {len(all_rows)}",
]
summary_path.write_text("\n".join(summary_lines))
print(f"  Resume sauvegarde : {summary_path}")
print("\nTous les artefacts sont prets pour le rapport.")
Sauvegarde du plan d'execution...
  Plan sauvegarde : proof/plan_iterative.txt  (3267 octets)

Ecriture de lab3_metrics_log.csv...
  CSV ecrit : lab3_metrics_log.csv  (18 lignes)
  Resume sauvegarde : proof/summary.txt

Tous les artefacts sont prets pour le rapport.

Nettoyage¶

Libération du cache et arrêt de la session Spark.

In [12]:
#df_features.unpersist()
#spark.stop()
#print("Session Spark arretee. Lab 3 termine.")