DE2 — Cahier de Projet Final¶

Author : Badr TAJINI - Data Engineering II - ESIEE 2025-2026


Students : DIALLO Samba & DIOP Mouhamed


Ceci est l'artefact exécutable principal. Configurez les chemins, lancez le pipeline complet (Batch ETL → Streaming → Traitement de Texte → Itératif → Préparation LLM), et enregistrez les preuves.

Étape 0 : Configuration & Initialisation de Spark¶

Avant de commencer tout traitement, nous chargeons nos paramètres centraux depuis de2_project_config.yml. Cela nous permet de garder les chemins, les SLOs (objectifs de niveau de service) et les configurations en dehors du code. Nous initialisons également la SparkSession en activant l'exécution adaptative des requêtes (AQE).

In [1]:
# ==================================================================
# 0. Charger la configuration et initialiser Spark
# ==================================================================
import yaml, pathlib, datetime, time, json
import os
from pyspark.sql import SparkSession, functions as F, types as T

with open("de2_project_config.yml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

spark = SparkSession.builder \
    .appName(CFG["spark"]["app_name"]) \
    .master(CFG["spark"]["master"]) \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print("Spark:", spark.version)
print("UI:", spark.sparkContext.uiWebUrl)
print("Configuration chargée avec succès.")
WARNING: Using incubator modules: jdk.incubator.vector
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark: 4.0.1
UI: http://10.192.33.105:4040
Configuration chargée avec succès.

Phase 1: Pipeline Batch ETL & Architecture Medallion¶

Cette phase ingère les données brutes de GitHub Archive, les traite à travers les couches Bronze, Silver et Gold, et enregistre les métriques de performance du pipeline.

1.1 Couche Bronze (Ingestion Brute)¶

La couche Bronze est responsable de l'ingestion des données brutes sans altérer la structure de base. Nous lisons les fichiers .json.gz depuis notre dossier archive/, nous ajoutons une colonne ingestion_tstamp pour l'audit, et nous sauvegardons les données efficacement au format Parquet. Nous enregistrons également le temps d'exécution pour vérifier nos SLOs.

In [2]:
# 1.1 Couche Bronze (Ingestion Brute)
def process_bronze():
    print("Démarrage du traitement de la couche Bronze...")
    t0 = time.time()
    # Lire les données JSON brutes
    raw_df = spark.read.json(CFG["paths"]["raw_csv_glob"])
    
    # Ajouter les métadonnées d'ingestion
    bronze_df = raw_df.withColumn("ingestion_tstamp", F.current_timestamp())
    
    # Sauvegarder au format Parquet
    bronze_df.write.mode("overwrite").parquet(CFG["paths"]["bronze"])
    
    t1 = time.time()
    record_metric("Batch ETL", "bronze_latency_sec", t1 - t0, f"Traitement de {bronze_df.count()} évènements bruts")
    print(f"Couche Bronze terminée en {t1 - t0:.2f} secondes.")
    return bronze_df

def record_metric(stage, metric_name, metric_value, notes=""):
    timestamp = datetime.datetime.now().isoformat()
    run_id = f"run_{int(time.time())}"
    row = f"{run_id},{stage},task,{metric_name},{metric_value},{notes},{timestamp}\n"
    with open(CFG["paths"]["metrics_log"], "a", encoding="utf-8") as f:
        f.write(row)

1.2 Couche Silver (Nettoyage & Application du Schéma)¶

La couche Silver applique des validations strictes sur le schéma et aplatit les données. Nous filtrons les enregistrements malformés (ex: id manquant), extrayons les éléments nécessaires depuis les JSON imbriqués (comme actor.login, repo.name), convertissons les chaînes de caractères en types timestamp/date, et écrivons le résultat partitionné par date pour optimiser les requêtes futures.

In [3]:
# 1.2 Couche Silver (Nettoyage et Application du Schéma)
def process_silver():
    print("Démarrage du traitement de la couche Silver...")
    import time
    t0 = time.time()
    bronze_df = spark.read.parquet(CFG["paths"]["bronze"])
    
    # Filtrer les identifiants vides et sélectionner les colonnes importantes
    # Extraction des structures JSON imbriquées
    silver_df = bronze_df.filter(F.col("id").isNotNull()) \
        .select(
            F.col("id").alias("event_id"),
            F.col("type").alias("event_type"),
            F.col("actor.login").alias("actor_login"),
            F.col("repo.name").alias("repo_name"),
            F.col("created_at").cast("timestamp").alias("created_at"),
            F.to_date(F.col("created_at")).alias("date"),
            F.col("payload.commits").alias("commits"),
            F.coalesce(F.col("payload.pull_request.body"), F.col("payload.pull_request.title")).alias("pr_text")
        )
    
    # Partitionner par date et sauvegarder
    silver_df.write.mode("overwrite") \
        .partitionBy(*CFG["layout"]["partition_by"]) \
        .parquet(CFG["paths"]["silver"])
    
    t1 = time.time()
    record_metric("Batch ETL", "silver_latency_sec", t1 - t0, f"Nettoyage de {silver_df.count()} évènements")
    print(f"Couche Silver terminée en {t1 - t0:.2f} secondes.")
    return silver_df

1.3 Couche Gold (Agrégations Analytiques)¶

La couche Gold crée des datasets agrégés prêts à être consommés par des tableaux de bord analytiques ou des outils de BI. Ici, nous calculons le volume quotidien de chaque type d'événement par dépôt de code (repo_activity). Cela réduit considérablement la taille des données et optimise la vitesse des rapports.

In [4]:
# 1.3 Couche Gold (Agrégations)
def process_gold():
    print("Démarrage du traitement de la couche Gold...")
    t0 = time.time()
    silver_df = spark.read.parquet(CFG["paths"]["silver"])
    
    # Agréger les événements par dépôt et par jour
    gold_repo_activity = silver_df.groupBy("date", "repo_name", "event_type") \
        .agg(F.count("event_id").alias("event_count"))
        
    gold_repo_activity.write.mode("overwrite").parquet(os.path.join(CFG["paths"]["gold"], "repo_activity"))
    
    t1 = time.time()
    record_metric("Batch ETL", "gold_latency_sec", t1 - t0, f"Agrégation des dépôts réussie")
    print(f"Couche Gold terminée en {t1 - t0:.2f} secondes.")
    return gold_repo_activity

Exécution du Pipeline Batch (Phase 1)¶

Il est temps de déclencher le traitement réel des données. Décommentez les appels de fonctions ci-dessous pour lancer le pipeline de bout en bout sur vos données GitHub Archive locales.

In [5]:
# Exécuter le Pipeline de la Phase 1
# À décommenter et exécuter après le téléchargement via download_gh_archive.py
process_bronze()
process_silver()
process_gold()
Démarrage du traitement de la couche Bronze...
                                                                                
Couche Bronze terminée en 43.05 secondes.
Démarrage du traitement de la couche Silver...
                                                                                
Couche Silver terminée en 4.77 secondes.
Démarrage du traitement de la couche Gold...
[Stage 11:===========================================>              (6 + 2) / 8]
Couche Gold terminée en 1.71 secondes.
                                                                                
Out[5]:
DataFrame[date: date, repo_name: string, event_type: string, event_count: bigint]

Phase 2 : Ingestion Streaming (Structured Streaming)¶

Cette phase implémente l'ingestion en continu via Structured Streaming avec une source fichier. Le script simulate_streaming.py dépose les fichiers .json.gz un par un dans data/landing/. Spark les détecte automatiquement et les traite comme des micro-batches.

Composants clés :

  • readStream avec maxFilesPerTrigger=1 (1 fichier par micro-batch)
  • Agrégation fenêtrée avec watermark (retard toléré de 10 min)
  • Fenêtre temporelle de 5 minutes
  • Écriture en mode append vers Parquet

2.1 Schéma de lecture pour le streaming¶

En streaming, Spark ne peut pas inférer le schéma automatiquement. On définit donc un schéma explicite basé sur les champs principaux de GitHub Archive.

In [6]:
# 2.1 Schéma explicite pour le streaming
# En mode streaming, Spark ne peut pas inférer le schéma à la volée.
# On le définit manuellement à partir des champs GitHub Archive.
from pyspark.sql.types import StructType, StructField, StringType, LongType

streaming_schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("actor", StructType([
        StructField("login", StringType(), True),
    ]), True),
    StructField("repo", StructType([
        StructField("name", StringType(), True),
    ]), True),
    StructField("created_at", StringType(), True),
])

print("Schéma de streaming défini.")
Schéma de streaming défini.

2.2 Lancement du flux Structured Streaming¶

On lit les fichiers JSON déposés dans data/landing/ avec readStream. On applique une agrégation fenêtrée avec un watermark.

IMPORTANT : Avant d'exécuter cette cellule, lancez simulate_streaming.py dans un terminal séparé :

python simulate_streaming.py
In [7]:
# 2.2 Démarrer le flux Structured Streaming
import shutil

# Nettoyer les sorties précédentes pour repartir de zéro
for d in [CFG["paths"]["streaming"], CFG["paths"]["streaming_checkpoint"]]:
    if os.path.exists(d):
        shutil.rmtree(d)

# Lire le flux depuis le dossier landing/ (source fichier)
# maxFilesPerTrigger=1 : on ne traite qu'un fichier par micro-batch
stream_df = spark.readStream \
    .schema(streaming_schema) \
    .option("maxFilesPerTrigger", 1) \
    .json(CFG["paths"]["streaming_landing"])

# Convertir created_at en timestamp pour le watermark
stream_typed = stream_df \
    .withColumn("event_ts", F.to_timestamp(F.col("created_at"))) \
    .filter(F.col("event_ts").isNotNull())

# Agrégation fenêtrée avec watermark :
# - Watermark : on tolère un retard avant de fermer une fenêtre
# - Fenêtre temporelle : on agrège les événements par tranches
windowed_counts = stream_typed \
    .withWatermark("event_ts", CFG["streaming"]["watermark"]) \
    .groupBy(
        F.window("event_ts", CFG["streaming"]["window_duration"]),
        F.col("type").alias("event_type")
    ) \
    .agg(F.count("*").alias("event_count"))

# Écriture en mode append vers Parquet avec checkpoint
streaming_query = windowed_counts.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", CFG["paths"]["streaming"]) \
    .option("checkpointLocation", CFG["paths"]["streaming_checkpoint"]) \
    .trigger(processingTime=CFG["streaming"]["trigger_interval"]) \
    .start()

wm = CFG['streaming']['watermark']
ti = CFG['streaming']['trigger_interval']
wd = CFG['streaming']['window_duration']
print("Streaming démarré. En attente des fichiers dans data/landing/ ...")
print(f"Trigger : {ti}  |  Watermark : {wm}  |  Fenêtre : {wd}")
Streaming démarré. En attente des fichiers dans data/landing/ ...
Trigger : 30 seconds  |  Watermark : 10 minutes  |  Fenêtre : 5 minutes
[Stage 15:==============>                                        (53 + 8) / 200]

2.3 Surveillance du flux et capture des métriques¶

On attend que le flux traite tous les fichiers disponibles, puis on capture query.lastProgress comme preuve du bon fonctionnement du streaming.

In [8]:
# 2.3 Attendre le traitement et capturer les preuves
import json as json_lib
from uuid import UUID

# Encodeur personnalisé pour gérer les UUID dans lastProgress
class SafeEncoder(json_lib.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, UUID):
            return str(obj)
        return super().default(obj)

# Attendre que le streaming traite les fichiers disponibles
WAIT_SECONDS = 90
print(f"Attente de {WAIT_SECONDS}s pour laisser le streaming traiter les fichiers...")
time.sleep(WAIT_SECONDS)

# Capturer query.lastProgress comme preuve
last_progress = streaming_query.lastProgress
if last_progress:
    print("=== query.lastProgress ===")
    print(json_lib.dumps(last_progress, indent=2, cls=SafeEncoder))
    
    # Sauvegarder la preuve dans proof/
    os.makedirs(CFG["paths"]["proof"], exist_ok=True)
    proof_path = os.path.join(CFG["paths"]["proof"], "streaming_lastProgress.json")
    with open(proof_path, "w", encoding="utf-8") as f:
        json_lib.dump(last_progress, f, indent=2, cls=SafeEncoder)
    print(f"Preuve sauvegardée : {proof_path}")
    
    # Enregistrer la métrique
    record_metric("Streaming", "streaming_trigger_latency",
                  last_progress.get("batchDuration", "N/A"),
                  f"Batch {last_progress.get('batchId', 'N/A')}")
else:
    print("Aucun batch traité. Vérifiez que simulate_streaming.py a déposé des fichiers.")

# Arrêter proprement le flux
streaming_query.stop()
print("Streaming arrêté proprement.")

# Vérifier les résultats écrits
result_df = spark.read.parquet(CFG["paths"]["streaming"])
print(f"\nNombre de lignes dans la sortie streaming : {result_df.count()}")
result_df.orderBy("window").show(10, truncate=False)
Attente de 90s pour laisser le streaming traiter les fichiers...
                                                                                
=== query.lastProgress ===
{
  "id": "830cbb22-c1d7-4fce-9cda-3a82daff144b",
  "runId": "176da210-a6dc-4419-aafb-2c82de48e2be",
  "name": null,
  "timestamp": "2026-05-14T23:42:00.000Z",
  "batchId": 3,
  "batchDuration": 5,
  "durationMs": {
    "triggerExecution": 5,
    "latestOffset": 4
  },
  "eventTime": {
    "watermark": "2024-01-01T01:49:59.000Z"
  },
  "stateOperators": [
    {
      "operatorName": "stateStoreSave",
      "numRowsTotal": 44,
      "numRowsUpdated": 0,
      "numRowsRemoved": 180,
      "allUpdatesTimeMs": 449,
      "allRemovalsTimeMs": 3698,
      "commitTimeMs": 18876,
      "memoryUsedBytes": 146248,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 200,
      "numStateStoreInstances": 200,
      "customMetrics": {
        "stateOnCurrentVersionSizeBytes": 36000,
        "loadedMapCacheHitCount": 800,
        "loadedMapCacheMissCount": 0
      }
    }
  ],
  "sources": [
    {
      "description": "FileStreamSource[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/project final/data/landing]",
      "startOffset": "{\"logOffset\":1}",
      "endOffset": "{\"logOffset\":1}",
      "latestOffset": "None",
      "numInputRows": 0,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 0.0,
      "metrics": {}
    }
  ],
  "sink": {
    "description": "FileSink[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/project final/outputs/project/streaming]",
    "numOutputRows": -1,
    "metrics": {}
  },
  "numInputRows": 0,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 0.0,
  "observedMetrics": {}
}
Preuve sauvegardée : proof/streaming_lastProgress.json
Streaming arrêté proprement.

Nombre de lignes dans la sortie streaming : 315
+------------------------------------------+-----------------------------+-----------+
|window                                    |event_type                   |event_count|
+------------------------------------------+-----------------------------+-----------+
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|ReleaseEvent                 |53         |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|IssuesEvent                  |124        |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|PullRequestReviewCommentEvent|74         |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|IssueCommentEvent            |447        |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|PullRequestEvent             |625        |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|ForkEvent                    |51         |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|PullRequestReviewEvent       |73         |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|PushEvent                    |7807       |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|GollumEvent                  |3          |
|{2024-01-01 01:00:00, 2024-01-01 01:05:00}|CreateEvent                  |860        |
+------------------------------------------+-----------------------------+-----------+
only showing top 10 rows

Phase 3 : Pipeline Texte — Index Inversé¶

Extraction des messages de commit (PushEvent), tokenisation, normalisation, et construction d'un index inversé. Cela permet de mesurer la latence de requête et l'empreinte disque.

In [9]:
# 3.1 Extraction du corpus texte (messages de commit)
t0_text = time.time()
silver_df = spark.read.parquet(CFG["paths"]["silver"])

# Extraire les messages de commit depuis le tableau 'commits'
corpus = silver_df.filter(F.col("event_type") == "PushEvent") \
    .withColumn("commit", F.explode("commits")) \
    .select(
        F.col("commit.sha").alias("doc_id"),
        F.col("commit.message").alias("text")
    ).filter(F.col("text").isNotNull() & (F.length("text") > 0))

print(f"Documents dans le corpus : {corpus.count()}")
corpus.show(5)
Documents dans le corpus : 279340
+--------------------+--------------------+
|              doc_id|                text|
+--------------------+--------------------+
|6e1ffead88ea16bf5...|Adds a seeking sl...|
|e46cd0660180ad44f...|Direwolf file upd...|
|0aefb366d74506538...|Deploy to GitHub ...|
|b4a1c82638a3fada6...|        Empty Commit|
|fb7454a1e1b6971ae...|Deploy to GitHub ...|
+--------------------+--------------------+
only showing top 5 rows
In [10]:
# 3.2 Tokenisation, normalisation, suppression des stop-words
stop_words = {"the","a","an","is","it","to","in","of","and","for","on","with","as","by","at","this","that","from"}

tokens = corpus.withColumn("text_clean", F.regexp_replace(F.lower(F.col("text")), r"[^a-z0-9\s]", " ")) \
    .withColumn("tokens", F.split("text_clean", r"\s+")) \
    .select("doc_id", F.explode("tokens").alias("token")) \
    .filter((F.length("token") > 1) & (~F.col("token").isin(stop_words)))

print(f"Tokens apres nettoyage : {tokens.count()}")
[Stage 31:===========================================>              (6 + 2) / 8]
Tokens apres nettoyage : 2194933
                                                                                
In [11]:
# 3.3 Construction de l'index inverse
inverted_index = tokens.groupBy("token") \
    .agg(
        F.collect_list("doc_id").alias("doc_ids"),
        F.count("doc_id").alias("freq")
    ).orderBy(F.desc("freq"))

inverted_index.write.mode("overwrite").parquet(CFG["paths"]["text"])
print(f"Termes uniques dans l'index : {inverted_index.count()}")
[Stage 42:==================================================>       (7 + 1) / 8]
Termes uniques dans l'index : 68328
                                                                                
In [12]:
# 3.4 Mesure de latence de requete (SLO <= 2s)
idx = spark.read.parquet(CFG["paths"]["text"])
idx.cache()
idx.count() # Force l'evaluation

query_terms = CFG["text"]["query_terms"]
for term in query_terms:
    t0_q = time.time()
    res = idx.filter(F.col("token") == term).collect()
    t_ms = (time.time() - t0_q) * 1000
    docs = len(res[0]['doc_ids']) if res else 0
    record_metric("Text", f"query_latency_{term}_ms", t_ms, f"Found {docs} docs")
    print(f"Requete '{term}': {t_ms:.1f} ms (Trouve dans {docs} documents)")
                                                                                
Requete 'fix': 344.0 ms (Trouve dans 24128 documents)
Requete 'bug': 86.9 ms (Trouve dans 1708 documents)
Requete 'feature': 64.1 ms (Trouve dans 767 documents)
In [13]:
# 3.5 Comparaison empreinte Parquet vs CSV
import pathlib
import os

# Sauvegarder aussi en CSV pour comparer (CSV ne supporte pas les arrays, on les convertit en string)
csv_path = CFG["paths"]["text"] + "_csv"
inverted_index.withColumn("doc_ids", F.concat_ws(",", "doc_ids")).write.mode("overwrite").option("header", "true").csv(csv_path)

def get_size(path):
    return sum(f.stat().st_size for f in pathlib.Path(path).glob('**/*') if f.is_file())

size_parquet = get_size(CFG["paths"]["text"])
size_csv = get_size(csv_path)
ratio = (size_parquet / size_csv) * 100 if size_csv > 0 else 0

record_metric("Text", "storage_ratio_pct", ratio, "Parquet vs CSV")
print(f"Taille CSV     : {size_csv / 1024 / 1024:.2f} MB")
print(f"Taille Parquet : {size_parquet / 1024 / 1024:.2f} MB")
print(f"Ratio Parquet/CSV : {ratio:.1f}% (SLO <= 60%)")
                                                                                
Taille CSV     : 87.45 MB
Taille Parquet : 46.85 MB
Ratio Parquet/CSV : 53.6% (SLO <= 60%)
                                                                                

Phase 4 : Charge Itérative — Graphe & PageRank¶

Construction d'un graphe des interactions (Développeur → Dépôt) et calcul du PageRank pour identifier les dépôts les plus influents. L'algorithme tourne via des jointures itératives, ce qui permet de mesurer le coût du shuffle et la convergence à chaque étape.

In [14]:
# 4.1 Construction du graphe (Acteur -> Dépôt)
# On s'intéresse aux événements de contribution (Push, PR, Issues)
contribution_events = ["PushEvent", "PullRequestEvent", "IssuesEvent"]

edges = silver_df.filter(F.col("event_type").isin(contribution_events)) \
    .select(
        F.col("actor_login").alias("src"),
        F.col("repo_name").alias("dst")
    ).distinct().cache()

# Calculer le degré sortant (out-degree) de chaque acteur
out_degree = edges.groupBy("src").count().withColumnRenamed("count", "out_deg").cache()

print(f"Nombre d'arêtes (interactions uniques) : {edges.count()}")
[Stage 66:===================================================>  (189 + 9) / 200]
Nombre d'arêtes (interactions uniques) : 98857
                                                                                
In [16]:
# 4.2 PageRank Itératif
t0_pr = time.time()
MAX_ITER = CFG.get("pagerank", {}).get("max_iter", 10)
DAMPING = CFG.get("pagerank", {}).get("damping", 0.85)

# IMPORTANT : Configurer un répertoire de checkpoint pour éviter l'erreur "Java heap space OOM"
import tempfile
spark.sparkContext.setCheckpointDir(tempfile.gettempdir() + "/spark_checkpoint")

# Nœuds uniques (acteurs et dépôts combinés)
vertices = edges.select("src").union(edges.select(F.col("dst").alias("src"))).distinct().localCheckpoint()
N = vertices.count()

# Initialisation : chaque nœud commence avec un rang de 1/N
ranks = vertices.withColumn("rank", F.lit(1.0 / N)).localCheckpoint()

for i in range(MAX_ITER):
    # 1. Calculer les contributions
    contribs = edges.join(ranks, "src") \
        .join(out_degree, "src") \
        .select(F.col("dst").alias("src"), (F.col("rank") / F.col("out_deg")).alias("contrib"))
    
    # 2. Mettre à jour les rangs pour TOUS les nœuds (left_outer avec vertices)
    sum_contribs = contribs.groupBy("src").agg(F.sum("contrib").alias("sum_contrib"))
    
    new_ranks = vertices.join(sum_contribs, "src", "left_outer") \
        .select("src", (F.lit(1 - DAMPING) / N + DAMPING * F.coalesce(F.col("sum_contrib"), F.lit(0.0))).alias("rank"))
    
    # 3. Couper l'historique (lineage) pour éviter le crash mémoire !
    new_ranks = new_ranks.localCheckpoint()
    
    # 4. Calculer la convergence (différence absolue totale)
    # Utilisation d'alias ("old" et "new") pour éviter l'erreur "Ambiguous column"
    delta = ranks.alias("old").join(new_ranks.alias("new"), "src", "full_outer") \
        .select(F.abs(F.coalesce(F.col("old.rank"), F.lit(0)) - F.coalesce(F.col("new.rank"), F.lit(0))).alias("diff")) \
        .agg(F.sum("diff")).collect()[0][0]
    
    # FIX: Gérer le cas où delta renvoie un Null SQL
    delta_val = delta if delta is not None else 0.0
    
    record_metric("Iterative", f"pagerank_delta", delta_val, f"Iteration {i+1}")
    print(f"Itération {i+1} - Delta : {delta_val:.6f}")
    
    ranks = new_ranks

# Sauvegarder les résultats dans le dossier Gold
pr_output = os.path.join(CFG["paths"]["gold"], "pagerank")
ranks.orderBy(F.desc("rank")).write.mode("overwrite").parquet(pr_output)

t1_pr = time.time()
record_metric("Iterative", "pagerank_latency_sec", t1_pr - t0_pr, f"{MAX_ITER} itérations")
print(f"PageRank terminé en {t1_pr - t0_pr:.2f} secondes. Top 10 influents :")
ranks.orderBy(F.desc("rank")).show(10, truncate=False)
                                                                                
Itération 1 - Delta : 0.686658
                                                                                
Itération 2 - Delta : 0.142394
                                                                                
Itération 3 - Delta : 0.000000
                                                                                
Itération 4 - Delta : 0.000000
                                                                                
Itération 5 - Delta : 0.000000
                                                                                
Itération 6 - Delta : 0.000000
                                                                                
Itération 7 - Delta : 0.000000
                                                                                
Itération 8 - Delta : 0.000000
                                                                                
Itération 9 - Delta : 0.000000
                                                                                
Itération 10 - Delta : 0.000000
PageRank terminé en 25.80 secondes. Top 10 influents :
+-----------------------------------------+--------------------+
|src                                      |rank                |
+-----------------------------------------+--------------------+
|whosumdotcom/srchposts240101             |9.820296959065312E-6|
|whosumdotcom/aesposts240101              |9.820296959065312E-6|
|NixOS/nixpkgs                            |9.285233992833537E-6|
|leanprover-community/mathlib4            |6.609815260909344E-6|
|NovaSector/NovaSector                    |5.539672011651576E-6|
|prgrms-fe-devcourse/FEDC5_Owhat_Byunghyun|5.539654694857355E-6|
|Myoldmopar/EnergyPlusBuildResults        |5.539654694857355E-6|
|space-wizards/space-station-14           |5.00457441183136E-6 |
|cmss13-devs/cmss13                       |4.507731465815729E-6|
|Kas-tle/java2bedrock.sh                  |4.469511445599587E-6|
+-----------------------------------------+--------------------+
only showing top 10 rows
In [17]:
# 4.3 Expérience de partitionnement
# Test de la configuration "optimisée" en repartitionnant par hachage
t0_opt = time.time()
edges_opt = edges.repartition(CFG["spark"]["partitions"], "src").cache()
out_degree_opt = edges_opt.groupBy("src").count().withColumnRenamed("count", "out_deg").cache()

ranks_opt = vertices.withColumn("rank", F.lit(1.0 / N))

for i in range(2): # Juste 2 itérations pour le test de temps
    contribs = edges_opt.join(ranks_opt, "src") \
        .join(out_degree_opt, "src") \
        .select(F.col("dst").alias("src"), (F.col("rank") / F.col("out_deg")).alias("contrib"))
    
    ranks_opt = contribs.groupBy("src").agg(
        (F.lit(1 - DAMPING) / N + DAMPING * F.sum("contrib")).alias("rank")
    ).cache()
    ranks_opt.count() # Force execution

t1_opt = time.time()
record_metric("Iterative", "pagerank_opt_latency_sec", t1_opt - t0_opt, "2 itérations optimisées")
print(f"Temps avec partitionnement optimisé (2 iters) : {t1_opt - t0_opt:.2f}s")
                                                                                
Temps avec partitionnement optimisé (2 iters) : 2.87s

Phase 5 : Préparation LLM (Data Readiness)¶

Préparation d'un dataset "curaté" pour un LLM (RAG ou Fine-Tuning). On applique des filtres de qualité (longueur minimale, déduplication par hash).

In [18]:
# 5.1 Extraction et curation du dataset LLM
t0_llm = time.time()

# Combiner plusieurs sources de texte pour le dataset LLM
pr_text = silver_df.filter(F.col("event_type") == "PullRequestEvent") \
    .select(
        F.col("event_id").alias("doc_id"), 
        F.col("pr_text").alias("text")
    )

llm_corpus = corpus.unionByName(pr_text) # Push messages + PR payloads

min_len = CFG.get("llm", {}).get("min_text_length", 50)

# Filtres qualité : longueur minimale, déduplication exacte (par hash)
df_llm = llm_corpus.filter(F.col("text").isNotNull()) \
    .filter(F.length("text") >= min_len) \
    .withColumn("content_hash", F.xxhash64("text")) \
    .dropDuplicates(["content_hash"]) \
    .withColumn("source", F.lit("github_archive")) \
    .withColumn("version", F.lit("v1.0")) \
    .withColumn("curated_at", F.current_timestamp())

# Sauvegarde
df_llm.write.mode("overwrite").parquet(CFG["paths"]["llm_ready"])

# Mesure de qualité
total_initial = llm_corpus.count()
total_curated = df_llm.count()
pass_rate = (total_curated / total_initial * 100) if total_initial > 0 else 0

record_metric("LLM", "quality_pass_rate_pct", pass_rate, f"Pass rate, min_len={min_len}")
print(f"Documents initiaux : {total_initial}")
print(f"Documents validés  : {total_curated}")
print(f"Taux de passage    : {pass_rate:.1f}% (SLO >= 80%)")
[Stage 375:===================================================>   (15 + 1) / 16]
Documents initiaux : 302570
Documents validés  : 36814
Taux de passage    : 12.2% (SLO >= 80%)
                                                                                

Phase 6 : Optimisation Physique¶

Examen des plans EXPLAIN FORMATTED et sauvegarde des preuves. Compaction et réduction du shuffle.

In [20]:
# 6.1 Plans EXPLAIN et optimisation
import os
os.makedirs(CFG["paths"]["proof"], exist_ok=True)

# Accès à la machine virtuelle Java (JVM) sous-jacente de Spark
jvm = spark.sparkContext._jvm
explain_mode = jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")

# ---------------------------------------------------------
In [21]:
# 6.1 Plans EXPLAIN et optimisation
import os
os.makedirs(CFG["paths"]["proof"], exist_ok=True)

# Accès à la machine virtuelle Java (JVM) sous-jacente de Spark
jvm = spark.sparkContext._jvm
explain_mode = jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")

# Capturer le plan d'une requête analytique Gold
q_gold = spark.read.parquet(CFG["paths"]["gold"] + "/repo_activity") \
    .groupBy("repo_name").agg(F.sum("event_count").alias("total")) \
    .orderBy(F.desc("total"))

plan_str = q_gold._jdf.queryExecution().explainString(explain_mode)

with open(os.path.join(CFG["paths"]["proof"], "plan_gold_query.txt"), "w") as f:
    f.write(plan_str)

# Capturer le plan d'une itération PageRank
contribs = edges.join(ranks, "src").join(out_degree, "src") \
    .select(F.col("dst").alias("src"), (F.col("rank") / F.col("out_deg")).alias("contrib"))

plan_pr = contribs._jdf.queryExecution().explainString(explain_mode)

with open(os.path.join(CFG["paths"]["proof"], "plan_iterative_pagerank.txt"), "w") as f:
    f.write(plan_pr)

print("Plans d'exécution sauvegardés avec succès dans le dossier proof/.")
Plans d'exécution sauvegardés avec succès dans le dossier proof/.

Phase 7 : Preuves et Arrêt¶

Affichage des métriques collectées et arrêt propre du cluster Spark.

In [23]:
# 7.1 Vérification des SLOs et résumé
print("=" * 60)
print("RESUME DU PIPELINE DE2 — GitHub Archive (Track B)")
print("=" * 60)

import pandas as pd
df_metrics = pd.read_csv(CFG["paths"]["metrics_log"])
print(df_metrics.tail(15)[["stage", "metric_name", "metric_value"]].to_string())

spark.stop()
print("\nSession Spark arrêtée avec succès. N'oubliez pas de capturer vos captures d'écran Spark UI !")
============================================================
RESUME DU PIPELINE DE2 — GitHub Archive (Track B)
============================================================
         stage               metric_name  metric_value
123  Iterative            pagerank_delta      0.287960
124  Iterative            pagerank_delta           NaN
125  Iterative            pagerank_delta      0.686658
126  Iterative            pagerank_delta      0.142394
127  Iterative            pagerank_delta      0.000000
128  Iterative            pagerank_delta      0.000000
129  Iterative            pagerank_delta      0.000000
130  Iterative            pagerank_delta      0.000000
131  Iterative            pagerank_delta      0.000000
132  Iterative            pagerank_delta      0.000000
133  Iterative            pagerank_delta      0.000000
134  Iterative            pagerank_delta      0.000000
135  Iterative      pagerank_latency_sec     25.802758
136  Iterative  pagerank_opt_latency_sec      2.867728
137        LLM     quality_pass_rate_pct     12.167102

Session Spark arrêtée avec succès. N'oubliez pas de capturer vos captures d'écran Spark UI !