DE2 — Lab 2 : Traitement de texte — Pipeline d'index inversé (15%)¶

Auteur : Badr TAJINI - Data Engineering II (Workloads Intensifs en Données) - ESIEE 2025-2026

Piste : (Saisir votre piste : A/B/C/D)

Objectif : Ingérer un corpus de texte, tokeniser et normaliser, construire un index inversé, mesurer la latence des requêtes, comparer le stockage en Parquet vs CSV.

In [4]:
# === Configuration initiale et importations ===
import os
import io
import sys
import csv
import shutil
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType
import time
import pathlib
import pdfplumber

# Configuration pour l'accès à l'interface Spark
DE2_SPARK_DRIVER_HOST = os.environ.get("DE2_SPARK_DRIVER_HOST", "127.0.0.1")
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)

# Fonction pour afficher les informations d'accès à l'interface Spark
def show_spark_ui(spark_session):
    ui_url = spark_session.sparkContext.uiWebUrl
    print("Version Spark :", spark_session.version)
    if ui_url:
        ui_port = urlparse(ui_url).port or 4040
        print("Interface Spark :", ui_url)
        print("Interface Spark (navigateur WSL/Windows) :", f"http://localhost:{ui_port}")
    else:
        print("Interface Spark : non disponible")

# Initialisation de la session Spark
spark = SparkSession.builder \
    .appName("DE2-Lab2-TraitementTexte") \
    .master("local[*]") \
    .config("spark.driver.host", DE2_SPARK_DRIVER_HOST) \
    .config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS) \
    .config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS) \
    .getOrCreate()

show_spark_ui(spark)

# Création des répertoires de sortie
output_dir = pathlib.Path("outputs/lab2")
output_dir.mkdir(parents=True, exist_ok=True)
parquet_dir = output_dir / "inverted_index"
csv_dir = output_dir / "inverted_index_csv"
proof_dir = pathlib.Path("proof")
proof_dir.mkdir(parents=True, exist_ok=True)

# Nettoyer les répertoires s'ils existent
for d in [parquet_dir, csv_dir]:
    if d.exists():
        shutil.rmtree(d)
    d.mkdir(parents=True, exist_ok=True)

print("Répertoires de sortie créés avec succès.")
Version Spark : 4.0.0
Interface Spark : http://127.0.0.1:4040
Interface Spark (navigateur WSL/Windows) : http://localhost:4040
Répertoires de sortie créés avec succès.

1. Ingestion du corpus de texte¶

Cette section charge un corpus de texte avec un schéma explicite. Chaque document possède un identifiant unique (doc_id) et un contenu textuel. Nous affichons le nombre total de documents, des statistiques sur la longueur moyenne et quelques échantillons pour valider le chargement.

In [6]:
# === Définition du schéma et ingestion du corpus ===
# Schéma : chaque document a un ID unique et du contenu textuel
schema = StructType([
    StructField("doc_id", StringType(), False),
    StructField("text", StringType(), False)
])

# Fonction pour extraire le texte des PDFs
def extract_text_from_pdfs(pdf_folder):
    """Extrait le texte de tous les PDFs dans un dossier"""
    corpus_data = []
    pdf_folder_path = pathlib.Path(pdf_folder)
    
    if not pdf_folder_path.exists():
        print(f"⚠️  Dossier {pdf_folder} non trouvé.")
        return None
    
    pdf_files = sorted([f for f in pdf_folder_path.iterdir() if f.suffix.lower() == '.pdf'])
    
    if not pdf_files:
        print(f"⚠️  Aucun PDF trouvé dans {pdf_folder}.")
        return None
    
    print(f"📄 {len(pdf_files)} PDF(s) détecté(s). Extraction en cours...\n")
    for idx, pdf_path in enumerate(pdf_files, 1):
        try:
            with pdfplumber.open(pdf_path) as pdf:
                text = "\n".join([page.extract_text() or "" for page in pdf.pages])
                if text.strip():
                    doc_id = f"doc_{idx}"
                    corpus_data.append((doc_id, text.strip()))
                    print(f"✓ Chargé : {pdf_path.name} ({len(text)} caractères)")
                else:
                    print(f"⚠️  {pdf_path.name} est vide")
        except Exception as e:
            print(f"✗ Erreur lors de la lecture {pdf_path.name}: {e}")
    
    return corpus_data if corpus_data else None

# Chargement des PDFs
pdf_folder = "data/pdfs"
print("Extraction du texte depuis les PDFs...")
corpus_data = extract_text_from_pdfs(pdf_folder)

# Création du DataFrame corpus à partir des données
df_corpus = spark.createDataFrame(corpus_data, schema=schema)

# Affichage des statistiques du corpus
num_docs = df_corpus.count()
avg_length = df_corpus.select(F.avg(F.length(F.col("text")))).collect()[0][0]
print(f"\nNombre de documents : {num_docs}")
print(f"Longueur moyenne des documents : {avg_length:.0f} caractères")
print("\nÉchantillon du corpus (premiers 5 documents) :")
df_corpus.show(5, truncate=80)
Extraction du texte depuis les PDFs...
📄 3 PDF(s) détecté(s). Extraction en cours...

✓ Chargé : doc1.pdf (8660 caractères)
✓ Chargé : doc2.pdf (169810 caractères)
✓ Chargé : doc3.pdf (149951 caractères)
                                                                                
Nombre de documents : 3
Longueur moyenne des documents : 109473 caractères

Échantillon du corpus (premiers 5 documents) :
+------+--------------------------------------------------------------------------------+
|doc_id|                                                                            text|
+------+--------------------------------------------------------------------------------+
| doc_1|12 livres pour CHANGER DE VIE,\nles meilleurs livres de Développement personn...|
| doc_2|Documentation de l’AMGE sur le Leadership\nDéveloppement personnel\nDocumenta...|
| doc_3|CHAQUE JOURNÉE\nEST UN CHAPITRE\nDE TA VIE\nCHAQUE JOURNÉE\nEST UN CHAPITRE D...|
+------+--------------------------------------------------------------------------------+

2. Normalisation du texte (Français) — Minuscules, tokenisation, suppression des stop-words¶

Cette étape transforme le texte brut en tokens normalisés. Nous convertissons tout en minuscules, supprimons la ponctuation, tokenisons en mots individuels, et filtrons les mots vides (stop-words) courants en français. Nous affichons les comptages avant et après pour évaluer l'impact du filtrage.

In [8]:
# === Pipeline de normalisation du texte ===

# Définition des stop-words courants en français
STOP_WORDS = {
    "le", "la", "les", "de", "du", "des", "un", "une", "et", "ou", "mais",
    "est", "sont", "être", "avoir", "je", "tu", "il", "elle", "nous", "vous",
    "ils", "elles", "ce", "cette", "ces", "celui", "ceux", "on", "c", "d",
    "ma", "mon", "mes", "ta", "ton", "tes", "sa", "son", "ses", "notre",
    "votre", "leur", "qui", "que", "quoi", "dont", "où", "quand", "comment",
    "pourquoi", "quel", "quelle", "quels", "quelles", "aucun", "aucune", "nul",
    "pas", "ne", "oui", "non", "plus", "moins", "très", "trop", "assez",
    "beaucoup", "peu", "bien", "mal", "autre", "même", "tel", "telle", "soi",
    "lui", "moi", "toi", "eux", "à", "au", "avec", "sans", "pour", "par",
    "dans", "entre", "sous", "sur", "devant", "derrière", "près", "loin"
}

# Étape 1 : convertir en minuscules et supprimer la ponctuation
df_normalized = df_corpus.withColumn(
    "text_clean",
    F.lower(F.regexp_replace(F.col("text"), r"[^a-zàâäæçéèêëïîôöùûüœ0-9\s]", ""))
)

# Étape 2 : tokeniser en mots individuels en scindant par espaces
df_tokens = df_normalized.withColumn(
    "tokens",
    F.split(F.col("text_clean"), r"\s+")
).drop("text_clean")

# Calcul du nombre de tokens avant filtrage
total_tokens_before = df_tokens.select(
    F.size(F.col("tokens")).alias("token_count")
).agg(F.sum("token_count")).collect()[0][0]
print(f"Nombre total de tokens AVANT filtrage : {total_tokens_before}")

# Étape 3 : exploser les tokens (une ligne par token) et filtrer les stop-words
df_exploded = df_tokens.withColumn(
    "token",
    F.explode(F.col("tokens"))
).drop("tokens", "text")

# Filtrer les stop-words et les tokens vides
df_filtered = df_exploded.filter(
    (F.col("token") != "") & (~F.col("token").isin(STOP_WORDS))
)

# Calcul du nombre de tokens après filtrage
total_tokens_after = df_filtered.count()
tokens_removed = total_tokens_before - total_tokens_after
print(f"Nombre total de tokens APRÈS filtrage : {total_tokens_after}")
print(f"Tokens supprimés (stop-words) : {tokens_removed}")
print(f"Réduction : {(tokens_removed / total_tokens_before * 100):.1f}%")
print("\nÉchantillon de tokens normalisés :")
df_filtered.show(10)
Nombre total de tokens AVANT filtrage : 49183
Nombre total de tokens APRÈS filtrage : 30656
Tokens supprimés (stop-words) : 18527
Réduction : 37.7%

Échantillon de tokens normalisés :
+------+------------+
|doc_id|       token|
+------+------------+
| doc_1|          12|
| doc_1|      livres|
| doc_1|   meilleurs|
| doc_1|      livres|
| doc_1|éveloppement|
| doc_1|   personnel|
| doc_1|           l|
| doc_1|           y|
| doc_1|           a|
| doc_1|      livres|
+------+------------+
only showing top 10 rows

3. Construction de l'index inversé¶

L'index inversé est la structure clé pour les recherches textuelles efficaces. Pour chaque token (terme) unique, nous regroupons tous les IDs de documents où ce terme apparaît et comptabilisons sa fréquence. Cette structure permet une recherche O(1) au lieu de scanner tous les documents.

In [9]:
# === Construction de l'index inversé ===
# Regrouper par token et collecter tous les IDs de documents où le token apparaît
inverted_index = (df_filtered
    .groupBy("token")
    .agg(
        F.collect_list("doc_id").alias("doc_ids"),  # Liste des doc_id contenant ce token
        F.count("*").alias("freq")  # Fréquence : nombre d'occurrences du token
    )
    .orderBy(F.desc("freq")))  # Tri par fréquence décroissante

# Statistiques de l'index
unique_terms = inverted_index.count()
print(f"Nombre de termes uniques dans l'index : {unique_terms}")
print("\nÉchantillon de l'index inversé (termes les plus fréquents) :")
inverted_index.show(15, truncate=60)
                                                                                
Nombre de termes uniques dans l'index : 7474

Échantillon de l'index inversé (termes les plus fréquents) :
[Stage 40:===========================================>              (6 + 2) / 8]
+---------+------------------------------------------------------------+----+
|    token|                                                     doc_ids|freq|
+---------+------------------------------------------------------------+----+
|       en|[doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, ...| 545|
|      vie|[doc_1, doc_1, doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, ...| 258|
|        a|[doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, ...| 254|
|        e|[doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, ...| 235|
|        l|[doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, ...| 221|
|       es|[doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, doc_1, doc_2, ...| 213|
|       te|[doc_2, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, ...| 196|
|       se|[doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, ...| 179|
|       me|[doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, ...| 154|
|personnel|[doc_1, doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, doc_2, ...| 141|
|     tout|[doc_1, doc_1, doc_1, doc_1, doc_1, doc_2, doc_2, doc_2, ...| 138|
|       si|[doc_1, doc_1, doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, ...| 136|
|  routine|[doc_2, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, ...| 131|
|    faire|[doc_1, doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, doc_2, ...| 122|
|        1|[doc_1, doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, doc_2, ...| 121|
+---------+------------------------------------------------------------+----+
only showing top 15 rows
                                                                                

4. Écriture de l'index inversé — Parquet et CSV¶

Nous persistons l'index inversé dans deux formats pour comparer leur efficacité. Le format Parquet est columnar et optimisé pour les requêtes analytiques (avec compression), tandis que CSV est un format texte universel mais moins compressé. Cela permet d'évaluer le compromis entre performance et universalité des formats.

In [10]:
# === Écriture de l'index inversé en Parquet et CSV ===
print("Écriture de l'index inversé en Parquet...")
inverted_index.write.mode("overwrite").parquet(str(parquet_dir))
print(f"Index Parquet écrit dans : {parquet_dir}")

print("\nEcriture de l'index inversé en CSV...")
# Convertir l'array doc_ids en string pour la compatibilité CSV
inverted_index_csv = inverted_index.withColumn(
    "doc_ids",
    F.concat_ws(",", F.col("doc_ids"))  # Joindre les doc_id avec des virgules
)

inverted_index_csv.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(str(csv_dir))
print(f"Index CSV écrit dans : {csv_dir}")

# Rechargement des index pour les étapes suivantes
df_index_parquet = spark.read.parquet(str(parquet_dir))
df_index_csv = spark.read.option("header", "true").csv(str(csv_dir))

print("\nIndex rechargés avec succès.")
Écriture de l'index inversé en Parquet...
                                                                                
Index Parquet écrit dans : outputs/lab2/inverted_index

Ecriture de l'index inversé en CSV...
                                                                                
Index CSV écrit dans : outputs/lab2/inverted_index_csv

Index rechargés avec succès.

5. Mesure de la latence des requêtes¶

Nous interrogeons l'index inversé pour rechercher des termes spécifiques et mesurons le temps d'exécution réel (wall-clock time) pour chaque requête. Cela simule un scénario utilisateur réel et nous aide à comprendre les performances pratiques de notre pipeline.

In [12]:
# === Requêtes sur l'index et mesure de latence ===
# Cache l'index pour une mesure plus juste
df_index_parquet.cache()
df_index_parquet.count()
print("Index Parquet mis en cache pour les mesures de latence.\n")

# Termes à rechercher (thème : développement personnel)
query_terms = [
    "développement", "personnel", "croissance", "apprentissage", "transformation",
    "potentiel", "confiance", "objectif", "succès", "motivation",
    "habitude", "changement", "progrès", "amélioration", "réussite",
    "compétence", "effort", "discipline"
]
query_latencies = []

print("Mesure de latence des requêtes sur l'index Parquet :")
print("-" * 80)

for term in query_terms:
    # Mesure du temps réel (wall-clock time)
    start_time = time.time()
    result = df_index_parquet.filter(F.col("token") == term).collect()
    end_time = time.time()
    latency_ms = (end_time - start_time) * 1000
    
    if result:
        doc_ids_list = result[0]["doc_ids"]
        freq = result[0]["freq"]
        postings_count = len(doc_ids_list)
        doc_ids_str = ", ".join(doc_ids_list)
        print(f"Requête : '{term}'")
        print(f"  Latence Parquet : {latency_ms:.2f} ms")
        print(f"  Fréquence (occurrences) : {freq}")
        print(f"  Nombre de documents : {postings_count}")
        print(f"  Documents : [{doc_ids_str}]")
        
        query_latencies.append({
            "term": term,
            "format": "parquet",
            "latency_ms": latency_ms,
            "found": True,
            "freq": freq,
            "doc_count": postings_count
        })
    else:
        print(f"Requête : '{term}'")
        print(f"  Latence Parquet : {latency_ms:.2f} ms")
        print(f"  Terme non trouvé")
        
        query_latencies.append({
            "term": term,
            "format": "parquet",
            "latency_ms": latency_ms,
            "found": False,
            "freq": 0,
            "doc_count": 0
        })
    print()

print("Mesure de latence des requêtes sur l'index CSV :")
print("-" * 80)

for term in query_terms:
    start_time = time.time()
    result = df_index_csv.filter(F.col("token") == term).collect()
    end_time = time.time()
    latency_ms = (end_time - start_time) * 1000
    
    print(f"Requête : '{term}'")
    print(f"  Latence CSV : {latency_ms:.2f} ms")
    if result:
        doc_ids_str_raw = result[0]["doc_ids"]
        doc_ids_list = doc_ids_str_raw.split(",") if doc_ids_str_raw else []
        print(f"  Nombre de documents : {len(doc_ids_list)}")
    else:
        print(f"  Terme non trouvé")
    print()

# Sauvegarde du plan d'exécution pour une requête
print("Sauvegarde du plan d'exécution pour une requête...")
query_plan_file = proof_dir / "plan_query.txt"
with open(query_plan_file, "w") as f:
    old_stdout = sys.stdout
    buffer = io.StringIO()
    sys.stdout = buffer
    df_index_parquet.filter(F.col("token") == "spark").explain("formatted")
    sys.stdout = old_stdout
    output = buffer.getvalue()
    f.write(output)
print(f"Plan de requête écrit dans : {query_plan_file}")
26/04/29 19:50:51 WARN CacheManager: Asked to cache already cached data.
Index Parquet mis en cache pour les mesures de latence.

Mesure de latence des requêtes sur l'index Parquet :
--------------------------------------------------------------------------------
Requête : 'développement'
  Latence Parquet : 142.06 ms
  Fréquence (occurrences) : 68
  Nombre de documents : 68
  Documents : [doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3]

Requête : 'personnel'
  Latence Parquet : 169.23 ms
  Fréquence (occurrences) : 141
  Nombre de documents : 141
  Documents : [doc_1, doc_1, doc_1, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3]

Requête : 'croissance'
  Latence Parquet : 112.55 ms
  Fréquence (occurrences) : 7
  Nombre de documents : 7
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_3, doc_3]

Requête : 'apprentissage'
  Latence Parquet : 76.01 ms
  Fréquence (occurrences) : 5
  Nombre de documents : 5
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_3]

Requête : 'transformation'
  Latence Parquet : 71.22 ms
  Fréquence (occurrences) : 1
  Nombre de documents : 1
  Documents : [doc_1]

Requête : 'potentiel'
  Latence Parquet : 76.85 ms
  Fréquence (occurrences) : 36
  Nombre de documents : 36
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3, doc_3, doc_3]

Requête : 'confiance'
  Latence Parquet : 164.29 ms
  Fréquence (occurrences) : 27
  Nombre de documents : 27
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2]

Requête : 'objectif'
  Latence Parquet : 131.90 ms
  Fréquence (occurrences) : 33
  Nombre de documents : 33
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3]

Requête : 'succès'
  Latence Parquet : 130.16 ms
  Fréquence (occurrences) : 7
  Nombre de documents : 7
  Documents : [doc_1, doc_1, doc_2, doc_2, doc_2, doc_3, doc_3]

Requête : 'motivation'
  Latence Parquet : 133.35 ms
  Fréquence (occurrences) : 12
  Nombre de documents : 12
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3, doc_3]

Requête : 'habitude'
  Latence Parquet : 94.24 ms
  Fréquence (occurrences) : 12
  Nombre de documents : 12
  Documents : [doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3]

Requête : 'changement'
  Latence Parquet : 74.28 ms
  Fréquence (occurrences) : 13
  Nombre de documents : 13
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3]

Requête : 'progrès'
  Latence Parquet : 73.62 ms
  Fréquence (occurrences) : 6
  Nombre de documents : 6
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_3, doc_3]

Requête : 'amélioration'
  Latence Parquet : 74.90 ms
  Terme non trouvé

Requête : 'réussite'
  Latence Parquet : 94.23 ms
  Fréquence (occurrences) : 8
  Nombre de documents : 8
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3, doc_3]

Requête : 'compétence'
  Latence Parquet : 131.43 ms
  Fréquence (occurrences) : 38
  Nombre de documents : 38
  Documents : [doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_2, doc_3]

Requête : 'effort'
  Latence Parquet : 104.93 ms
  Fréquence (occurrences) : 4
  Nombre de documents : 4
  Documents : [doc_2, doc_2, doc_3, doc_3]

Requête : 'discipline'
  Latence Parquet : 112.60 ms
  Fréquence (occurrences) : 9
  Nombre de documents : 9
  Documents : [doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3, doc_3]

Mesure de latence des requêtes sur l'index CSV :
--------------------------------------------------------------------------------
Requête : 'développement'
  Latence CSV : 105.89 ms
  Nombre de documents : 68

Requête : 'personnel'
  Latence CSV : 101.37 ms
  Nombre de documents : 141

Requête : 'croissance'
  Latence CSV : 111.50 ms
  Nombre de documents : 7

Requête : 'apprentissage'
  Latence CSV : 118.36 ms
  Nombre de documents : 5

Requête : 'transformation'
  Latence CSV : 191.09 ms
  Nombre de documents : 1

Requête : 'potentiel'
  Latence CSV : 146.86 ms
  Nombre de documents : 36

Requête : 'confiance'
  Latence CSV : 99.96 ms
  Nombre de documents : 27

Requête : 'objectif'
  Latence CSV : 93.83 ms
  Nombre de documents : 33

Requête : 'succès'
  Latence CSV : 106.21 ms
  Nombre de documents : 7

Requête : 'motivation'
  Latence CSV : 96.21 ms
  Nombre de documents : 12

Requête : 'habitude'
  Latence CSV : 89.69 ms
  Nombre de documents : 12

Requête : 'changement'
  Latence CSV : 111.37 ms
  Nombre de documents : 13

Requête : 'progrès'
  Latence CSV : 130.35 ms
  Nombre de documents : 6

Requête : 'amélioration'
  Latence CSV : 130.28 ms
  Terme non trouvé

Requête : 'réussite'
  Latence CSV : 100.81 ms
  Nombre de documents : 8

Requête : 'compétence'
  Latence CSV : 106.79 ms
  Nombre de documents : 38

Requête : 'effort'
  Latence CSV : 88.39 ms
  Nombre de documents : 4

Requête : 'discipline'
  Latence CSV : 79.07 ms
  Nombre de documents : 9

Sauvegarde du plan d'exécution pour une requête...
Plan de requête écrit dans : proof/plan_query.txt

6. Comparaison de l'empreinte disque — Parquet vs CSV¶

Nous comparons la taille disque utilisée par les deux formats de stockage. Parquet utilise la compression columnar, tandis que CSV est un format texte universel mais moins optimisé. Cette comparaison quantifie les économies de stockage réalisées avec des formats spécialisés pour les requêtes analytiques.

In [13]:
# === Comparaison des empreintes disque (Parquet vs CSV) ===
def get_directory_size(path):
    """Calcule la taille totale d'un répertoire en bytes"""
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            if os.path.exists(filepath):
                total_size += os.path.getsize(filepath)
    return total_size

# Calcul des tailles
parquet_size = get_directory_size(str(parquet_dir))
csv_size = get_directory_size(str(csv_dir))

print("Comparaison des empreintes disque :")
print("-" * 80)
print(f"Taille Parquet : {parquet_size:,} bytes ({parquet_size / 1024 / 1024:.4f} MB)")
print(f"Taille CSV     : {csv_size:,} bytes ({csv_size / 1024 / 1024:.4f} MB)")

if csv_size > 0:
    compression_ratio = (csv_size - parquet_size) / csv_size * 100
    savings_bytes = csv_size - parquet_size
    print(f"Économies de taille : {savings_bytes:,} bytes ({compression_ratio:.1f}%)")
else:
    print("CSV vide, impossible de calculer le ratio.")

print("\nDétail des fichiers Parquet :")
for root, dirs, files in os.walk(str(parquet_dir)):
    for file in files:
        filepath = os.path.join(root, file)
        size = os.path.getsize(filepath)
        print(f"  {file}: {size:,} bytes")

print("\nDétail des fichiers CSV :")
for root, dirs, files in os.walk(str(csv_dir)):
    for file in files:
        filepath = os.path.join(root, file)
        size = os.path.getsize(filepath)
        print(f"  {file}: {size:,} bytes")

# === Sauvegarde du plan d'exécution pour la construction de l'index ===
print("\nSauvegarde du plan d'exécution pour la construction de l'index...")
plan_build_file = proof_dir / "plan_index_build.txt"
with open(plan_build_file, "w") as f:
    old_stdout = sys.stdout
    buffer = io.StringIO()
    sys.stdout = buffer
    inverted_index.explain("formatted")
    sys.stdout = old_stdout
    output = buffer.getvalue()
    f.write(output)
print(f"Plan d'index écrit dans : {plan_build_file}")

# === Création du journal de métriques CSV ===
print("\nCréation du journal de métriques...")
metrics_log_file = "lab2_metrics_log.csv"

with open(metrics_log_file, "w", newline="") as csvfile:
    fieldnames = [
        "timestamp",
        "format",
        "term",
        "latency_ms",
        "parquet_size_bytes",
        "csv_size_bytes",
        "unique_terms",
        "total_docs",
        "doc_count",
        "term_freq"
    ]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    
    current_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    
    # Écriture des métriques de requête
    for latency_entry in query_latencies:
        writer.writerow({
            "timestamp": current_timestamp,
            "format": "Parquet",
            "term": latency_entry["term"],
            "latency_ms": f"{latency_entry['latency_ms']:.2f}",
            "parquet_size_bytes": parquet_size,
            "csv_size_bytes": csv_size,
            "unique_terms": unique_terms,
            "total_docs": num_docs,
            "doc_count": latency_entry["doc_count"] if latency_entry["found"] else 0,
            "term_freq": latency_entry["freq"] if latency_entry["found"] else 0
        })

print(f"Journal de métriques écrit : {metrics_log_file}")
print("\nPrêt pour capture des images d'interface Spark UI (http://localhost:4040)")
print("Consultez les onglets Jobs et SQL pour visualiser la construction et les requêtes")
Comparaison des empreintes disque :
--------------------------------------------------------------------------------
Taille Parquet : 60,909 bytes (0.0581 MB)
Taille CSV     : 275,577 bytes (0.2628 MB)
Économies de taille : 214,668 bytes (77.9%)

Détail des fichiers Parquet :
  ._SUCCESS.crc: 8 bytes
  part-00000-bd3b3501-e735-4d28-aa1f-2e4cc3cce23b-c000.snappy.parquet: 60,417 bytes
  .part-00000-bd3b3501-e735-4d28-aa1f-2e4cc3cce23b-c000.snappy.parquet.crc: 484 bytes
  _SUCCESS: 0 bytes

Détail des fichiers CSV :
  ._SUCCESS.crc: 8 bytes
  part-00000-d84a8001-19d8-4c8d-a4fc-1776b674903e-c000.csv: 273,421 bytes
  .part-00000-d84a8001-19d8-4c8d-a4fc-1776b674903e-c000.csv.crc: 2,148 bytes
  _SUCCESS: 0 bytes

Sauvegarde du plan d'exécution pour la construction de l'index...
Plan d'index écrit dans : proof/plan_index_build.txt

Création du journal de métriques...
Journal de métriques écrit : lab2_metrics_log.csv

Prêt pour capture des images d'interface Spark UI (http://localhost:4040)
Consultez les onglets Jobs et SQL pour visualiser la construction et les requêtes
In [14]:
# === Arrêt et nettoyage ===
spark.stop()
print("\nLab 2 - Pipeline de traitement de texte terminé avec succès.")
print("\nFichiers de sortie générés :")
print(f"  - Index Parquet : {parquet_dir}")
print(f"  - Index CSV : {csv_dir}")
print(f"  - Plans d'exécution : {proof_dir}")
print(f"  - Journal de métriques : {metrics_log_file}")
print("\nProchaines étapes :")
print("  1. Capturer les images d'interface Spark UI (tabs Jobs et SQL)")
print("  2. Rédiger une note d'ingénierie (1 page) analysant les résultats")
print("  3. Télécharger le notebook, outputs/, proof/, et la note sur GitHub")
Lab 2 - Pipeline de traitement de texte terminé avec succès.

Fichiers de sortie générés :
  - Index Parquet : outputs/lab2/inverted_index
  - Index CSV : outputs/lab2/inverted_index_csv
  - Plans d'exécution : proof
  - Journal de métriques : lab2_metrics_log.csv

Prochaines étapes :
  1. Capturer les images d'interface Spark UI (tabs Jobs et SQL)
  2. Rédiger une note d'ingénierie (1 page) analysant les résultats
  3. Télécharger le notebook, outputs/, proof/, et la note sur GitHub