DE2 — Assignment 2 : Texte — Index Inversé¶
Auteur : Badr TAJINI - Data Engineering II (Workloads Intensifs en Données) - ESIEE 2025-2026
Piste : (Saisir votre piste : A/B/C/D)
Noms : (Étudiant 1 — Étudiant 2)
Complétez les cellules ci-dessous. Consultez DE2_Lab2_Overview_EN.md et helper_assignment2-de2_esiee.md pour plus de détails.
0. Configuration initiale et préparation¶
Cette première cellule initialise l'environnement Spark et crée les répertoires nécessaires pour stocker les résultats. Nous configurons la session Spark avec les paramètres réseau appropriés et affichons des informations sur l'environnement d'exécution.
# === Importations et Configuration ===
import os
import io
import sys
import shutil
from urllib.parse import urlparse
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType
import time
import pathlib
import json
import csv
DE2_SPARK_DRIVER_HOST = os.environ.get("DE2_SPARK_DRIVER_HOST", "127.0.0.1")
DE2_SPARK_BIND_ADDRESS = os.environ.get("DE2_SPARK_BIND_ADDRESS", "0.0.0.0")
os.environ.setdefault("SPARK_LOCAL_IP", DE2_SPARK_DRIVER_HOST)
# Fonction pour afficher l'interface Spark
def show_spark_ui(spark_session):
ui_url = spark_session.sparkContext.uiWebUrl
print("Version Spark :", spark_session.version)
if ui_url:
ui_port = urlparse(ui_url).port or 4040
print("Interface Spark :", ui_url)
print("Interface Spark (navigateur WSL/Windows) :", f"http://localhost:{ui_port}")
else:
print("Interface Spark : non disponible")
# Initialisation de la session Spark
spark = SparkSession.builder \
.appName("de2-assignment2") \
.master("local[*]") \
.config("spark.driver.host", DE2_SPARK_DRIVER_HOST) \
.config("spark.driver.bindAddress", DE2_SPARK_BIND_ADDRESS) \
.config("spark.ui.bindAddress", DE2_SPARK_BIND_ADDRESS) \
.getOrCreate()
show_spark_ui(spark)
# Création des répertoires de sortie
output_dir = pathlib.Path("outputs/lab2")
output_dir.mkdir(parents=True, exist_ok=True)
parquet_dir = output_dir / "inverted_index"
csv_dir = output_dir / "inverted_index_csv"
proof_dir = pathlib.Path("proof")
proof_dir.mkdir(parents=True, exist_ok=True)
# Nettoyer les répertoires s'ils existent
for d in [parquet_dir, csv_dir]:
if d.exists():
shutil.rmtree(d)
d.mkdir(parents=True, exist_ok=True)
print("Répertoires de sortie créés avec succès.")
Version Spark : 4.0.1 Interface Spark : http://127.0.0.1:4040 Interface Spark (navigateur WSL/Windows) : http://localhost:4040 Répertoires de sortie créés avec succès.
1. Ingestion du corpus de texte¶
Dans cette section, nous chargeons un corpus de texte avec un schéma explicite. Chaque document possède un identifiant unique (doc_id) et un contenu textuel. Nous affichons aussi le nombre total de documents et quelques exemples pour valider le chargement du corpus.
# === Définition du schéma et chargement du corpus ===
# Schéma : chaque document a un ID unique et un contenu textuel
schema = StructType([
StructField("doc_id", StringType(), False),
StructField("content", StringType(), False)
])
# Corpus exemple : articles courts sur Spark
corpus_data = [
("doc_1", "Apache Spark is a unified analytics engine for big data processing"),
("doc_2", "Spark provides high-level APIs in Java, Scala, Python, and R"),
("doc_3", "Spark runs on Hadoop, Mesos, Kubernetes, or standalone"),
("doc_4", "Structured Streaming enables fast fault-tolerant stream processing"),
("doc_5", "Spark SQL provides DataFrame and SQL query capabilities"),
("doc_6", "Machine learning library MLlib supports classification and clustering"),
("doc_7", "GraphX is Spark's API for graphs and graph-parallel computation"),
("doc_8", "Spark excels at iterative algorithms and interactive analysis"),
("doc_9", "RDD is the fundamental data structure of Apache Spark"),
("doc_10", "DataFrames are distributed collections of data organized into columns")
]
# Création du DataFrame corpus à partir des données
df_corpus = spark.createDataFrame(corpus_data, schema=schema)
print(f"Nombre total de documents : {df_corpus.count()}")
print("\nÉchantillon du corpus :")
df_corpus.show(truncate=False)
Nombre total de documents : 10 Échantillon du corpus : +------+---------------------------------------------------------------------+ |doc_id|content | +------+---------------------------------------------------------------------+ |doc_1 |Apache Spark is a unified analytics engine for big data processing | |doc_2 |Spark provides high-level APIs in Java, Scala, Python, and R | |doc_3 |Spark runs on Hadoop, Mesos, Kubernetes, or standalone | |doc_4 |Structured Streaming enables fast fault-tolerant stream processing | |doc_5 |Spark SQL provides DataFrame and SQL query capabilities | |doc_6 |Machine learning library MLlib supports classification and clustering| |doc_7 |GraphX is Spark's API for graphs and graph-parallel computation | |doc_8 |Spark excels at iterative algorithms and interactive analysis | |doc_9 |RDD is the fundamental data structure of Apache Spark | |doc_10|DataFrames are distributed collections of data organized into columns| +------+---------------------------------------------------------------------+
2. Normalisation du texte¶
Cette étape transforme le texte brut en tokens normalisés. Nous convertissons tout en minuscules, supprimons la ponctuation, tokenisons en mots individuels, et filtrons les mots vides (stop-words) courants qui n'apportent pas de sens sémantique. Nous affichons les comptages avant et après normalisation pour évaluer l'impact du filtrage.
# === Normalisation : minuscules, tokenisation, suppression des stop-words ===
# Liste des stop-words courants en anglais (mots peu significatifs)
stop_words = {
"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
"is", "are", "was", "were", "be", "been", "being", "have", "has", "had",
"do", "does", "did", "will", "would", "could", "should", "may", "might",
"can", "of", "with", "by", "from", "as", "it", "this", "that", "which",
"who", "what", "where", "when", "why", "how"
}
# Étape 1 : convertir en minuscules et supprimer la ponctuation et caractères spéciaux
df_normalized = df_corpus.withColumn(
"content_clean",
F.lower(F.regexp_replace(F.col("content"), r"[^a-zA-Z0-9\s]", ""))
)
# Étape 2 : tokeniser en mots individuels en scindant par espaces
df_tokens = df_normalized.withColumn(
"tokens",
F.split(F.col("content_clean"), r"\s+")
).drop("content_clean")
# Comptage des tokens avant filtrage
total_tokens_before = df_tokens.select(
F.size(F.col("tokens")).alias("token_count")
).agg(F.sum("token_count")).collect()[0][0]
print(f"Nombre total de tokens AVANT filtrage : {total_tokens_before}")
# Étape 3 : exploser les tokens (une ligne par token) et filtrer les stop-words
df_exploded = df_tokens.withColumn(
"token",
F.explode(F.col("tokens"))
).drop("tokens", "content")
df_filtered = df_exploded.filter(
(F.col("token") != "") & (~F.col("token").isin(stop_words))
)
# Comptage des tokens après filtrage
total_tokens_after = df_filtered.count()
print(f"Nombre total de tokens APRÈS filtrage : {total_tokens_after}")
print(f"Tokens supprimés (stop-words) : {total_tokens_before - total_tokens_after}")
print("\nÉchantillon de tokens normalisés :")
df_filtered.show(10)
Nombre total de tokens AVANT filtrage : 87
Nombre total de tokens APRÈS filtrage : 68 Tokens supprimés (stop-words) : 19 Échantillon de tokens normalisés :
+------+----------+ |doc_id| token| +------+----------+ | doc_1| apache| | doc_1| spark| | doc_1| unified| | doc_1| analytics| | doc_1| engine| | doc_1| big| | doc_1| data| | doc_1|processing| | doc_2| spark| | doc_2| provides| +------+----------+ only showing top 10 rows
3. Construction de l'index inversé¶
L'index inversé est la structure clé pour les recherches textuelles rapides. Pour chaque token (terme) unique, nous regroupons tous les IDs de documents où ce terme apparaît et comptabilisons sa fréquence. Cela crée une structure permettant une recherche O(1) au lieu de scanner tous les documents.
# === Construction de l'index inversé ===
# Regrouper par token et collecter tous les IDs de documents où le token apparaît
df_inverted_index = df_filtered.groupBy("token").agg(
F.collect_list("doc_id").alias("doc_ids"), # Tous les doc_id contenant ce token
F.count("*").alias("freq") # Fréquence : nombre d'occurrences du token
).orderBy(F.desc("freq")) # Tri décroissant par fréquence
# Nombre de termes uniques
unique_terms = df_inverted_index.count()
print(f"Nombre de termes uniques dans l'index : {unique_terms}")
print("\nÉchantillon de l'index inversé (premiers 15 termes par fréquence) :")
df_inverted_index.show(15, truncate=False)
Nombre de termes uniques dans l'index : 57 Échantillon de l'index inversé (premiers 15 termes par fréquence) :
[Stage 20:> (0 + 8) / 8]
+----------+------------------------------------------+----+ |token |doc_ids |freq| +----------+------------------------------------------+----+ |spark |[doc_1, doc_2, doc_3, doc_5, doc_8, doc_9]|6 | |data |[doc_1, doc_9, doc_10] |3 | |apache |[doc_1, doc_9] |2 | |processing|[doc_1, doc_4] |2 | |provides |[doc_2, doc_5] |2 | |sql |[doc_5, doc_5] |2 | |analytics |[doc_1] |1 | |unified |[doc_1] |1 | |engine |[doc_1] |1 | |big |[doc_1] |1 | |apis |[doc_2] |1 | |scala |[doc_2] |1 | |java |[doc_2] |1 | |r |[doc_2] |1 | |highlevel |[doc_2] |1 | +----------+------------------------------------------+----+ only showing top 15 rows
4. Écriture en Parquet et CSV¶
Nous persistons l'index inversé dans deux formats pour comparer leur efficacité. Le format Parquet est columnar et optimisé pour les requêtes analytiques (avec compression), tandis que CSV est un format texte universel mais moins compressé et moins rapide pour les requêtes.
# === Écriture de l'index inversé en Parquet et CSV ===
print("Écriture de l'index inversé en Parquet...")
df_inverted_index.write.mode("overwrite").parquet(str(parquet_dir))
print(f"Index Parquet écrit dans : {parquet_dir}")
print("\nEcriture de l'index inversé en CSV...")
# Convertir l'array doc_ids en string (séparé par des virgules) pour CSV
df_inverted_index_csv = df_inverted_index.withColumn(
"doc_ids",
F.concat_ws(",", F.col("doc_ids")) # Joindre les elements du tableau avec des virgules
)
df_inverted_index_csv.coalesce(1).write \
.mode("overwrite") \
.option("header", "true") \
.csv(str(csv_dir))
print(f"Index CSV écrit dans : {csv_dir}")
# Rechargement des index pour les requêtes suivantes
df_index_parquet = spark.read.parquet(str(parquet_dir))
df_index_csv = spark.read.option("header", "true").csv(str(csv_dir))
print("\nIndex rechargés avec succès pour les requêtes.")
Écriture de l'index inversé en Parquet...
Index Parquet écrit dans : outputs/lab2/inverted_index Ecriture de l'index inversé en CSV...
Index CSV écrit dans : outputs/lab2/inverted_index_csv Index rechargés avec succès pour les requêtes.
5. Mesure de la latence des requêtes¶
Nous mesurons le temps d'exécution (wall-clock time) pour rechercher des termes spécifiques dans l'index. Ceci simule un cas d'utilisation réel où les utilisateurs interrogent l'index inversé. Nous enregistrerons les plans Spark pour comprendre les optimisations appliquées par le moteur de requête.
# === Requêtes sur l'index et mesure de latence ===
# Termes à rechercher
terms_to_query = ["spark", "data", "processing"]
query_latencies = []
print("Mesure de latence des requêtes sur l'index Parquet :")
print("-" * 70)
for term in terms_to_query:
# Mesure du temps de requête sur Parquet
start_time = time.time()
result = df_index_parquet.filter(F.col("token") == term).collect()
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
query_latencies.append({
"term": term,
"format": "parquet",
"latency_ms": latency_ms,
"found": len(result) > 0,
"freq": result[0]["freq"] if len(result) > 0 else 0,
"doc_count": len(result[0]["doc_ids"]) if len(result) > 0 else 0
})
print(f"Requête : '{term}'")
print(f" Latence Parquet : {latency_ms:.2f} ms")
if len(result) > 0:
# doc_ids est un array dans Parquet
doc_ids_list = result[0]["doc_ids"]
doc_ids_str = ", ".join(doc_ids_list)
print(f" Fréquence : {result[0]['freq']}, Documents : [{doc_ids_str}]")
else:
print(f" Terme non trouvé")
print()
print("Mesure de latence des requêtes sur l'index CSV :")
print("-" * 70)
for term in terms_to_query:
# Mesure du temps de requête sur CSV
start_time = time.time()
result = df_index_csv.filter(F.col("token") == term).collect()
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
print(f"Requête : '{term}'")
print(f" Latence CSV : {latency_ms:.2f} ms")
if len(result) > 0:
# doc_ids est une string dans CSV (séparée par des virgules)
doc_ids_str = result[0]["doc_ids"]
doc_ids_list = doc_ids_str.split(",") if doc_ids_str else []
print(f" Fréquence : {result[0]['freq']}, Document count : {len(doc_ids_list)}")
else:
print(f" Terme non trouvé")
print()
print("Latences enregistrées pour analyse.")
Mesure de latence des requêtes sur l'index Parquet : ---------------------------------------------------------------------- Requête : 'spark' Latence Parquet : 614.93 ms Fréquence : 6, Documents : [doc_1, doc_2, doc_3, doc_5, doc_8, doc_9] Requête : 'data' Latence Parquet : 179.39 ms Fréquence : 3, Documents : [doc_1, doc_9, doc_10] Requête : 'processing' Latence Parquet : 169.84 ms Fréquence : 2, Documents : [doc_1, doc_4] Mesure de latence des requêtes sur l'index CSV : ---------------------------------------------------------------------- Requête : 'spark' Latence CSV : 237.88 ms Fréquence : 6, Document count : 6 Requête : 'data' Latence CSV : 137.92 ms Fréquence : 3, Document count : 3 Requête : 'processing' Latence CSV : 140.65 ms Fréquence : 2, Document count : 2 Latences enregistrées pour analyse.
6. Comparaison de l'empreinte disque¶
Nous comparons la taille disque utilisée par les deux formats de stockage. Parquet utilise la compression columnar, tandis que CSV est un format texte universel mais moins compressé et moins rapide pour les requêtes. Cette comparaison montre les économies de stockage réalisées avec des formats spécialisés.
# === Comparaison des empreintes disque (Parquet vs CSV) ===
def get_directory_size(path):
"""Calcule la taille totale d'un répertoire en bytes"""
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
if os.path.exists(filepath):
total_size += os.path.getsize(filepath)
return total_size
# Calcul des tailles
parquet_size = get_directory_size(str(parquet_dir))
csv_size = get_directory_size(str(csv_dir))
print("Comparaison des empreintes disque :")
print("-" * 70)
print(f"Taille Parquet : {parquet_size:,} bytes ({parquet_size / 1024 / 1024:.2f} MB)")
print(f"Taille CSV : {csv_size:,} bytes ({csv_size / 1024 / 1024:.2f} MB)")
if csv_size > 0:
compression_ratio = (csv_size - parquet_size) / csv_size * 100
print(f"Réduction de taille : {compression_ratio:.1f}% (Parquet vs CSV)")
else:
print("CSV vide, impossible de calculer le ratio.")
print("\nDétail des fichiers Parquet :")
for root, dirs, files in os.walk(str(parquet_dir)):
for file in files:
filepath = os.path.join(root, file)
size = os.path.getsize(filepath)
print(f" {file}: {size:,} bytes")
Comparaison des empreintes disque : ---------------------------------------------------------------------- Taille Parquet : 1,882 bytes (0.00 MB) Taille CSV : 1,068 bytes (0.00 MB) Réduction de taille : -76.2% (Parquet vs CSV) Détail des fichiers Parquet : ._SUCCESS.crc: 8 bytes part-00000-9e702105-9a24-4791-ae44-0e03c3b25da4-c000.snappy.parquet: 1,850 bytes .part-00000-9e702105-9a24-4791-ae44-0e03c3b25da4-c000.snappy.parquet.crc: 24 bytes _SUCCESS: 0 bytes
7. Preuves et métriques¶
Nous sauvegardons les plans d'exécution Spark (version textuelle et formatée) dans le répertoire proof/. Ceux-ci montrent comment Spark optimise les requêtes et où se situent les goulots d'étranglement. Nous créons également un journal de métriques CSV contenant les mesures clés : latences, tailles de stockage, et paramètres de requête.
# === Sauvegarde des plans d'exécution et création du journal de métriques ===
# Sauvegarde du plan pour la construction de l'index
print("Sauvegarde du plan d'exécution pour la construction de l'index...")
plan_file = proof_dir / "plan_index_build.txt"
with open(plan_file, "w") as f:
# Capture du plan texte formaté
old_stdout = sys.stdout
buffer = io.StringIO()
sys.stdout = buffer
df_inverted_index.explain("formatted")
sys.stdout = old_stdout
output = buffer.getvalue()
f.write(output)
print(f"Plan d'index écrit dans : {plan_file}")
# Sauvegarde du plan pour une requête
print("Sauvegarde du plan d'exécution pour une requête...")
query_plan_file = proof_dir / "plan_query.txt"
with open(query_plan_file, "w") as f:
old_stdout = sys.stdout
buffer = io.StringIO()
sys.stdout = buffer
df_index_parquet.filter(F.col("token") == "spark").explain("formatted")
sys.stdout = old_stdout
output = buffer.getvalue()
f.write(output)
print(f"Plan de requête écrit dans : {query_plan_file}")
# Création du journal de métriques CSV
print("\nCréation du journal de métriques...")
metrics_log_file = "lab2_metrics_log.csv"
with open(metrics_log_file, "w", newline="") as csvfile:
fieldnames = [
"timestamp",
"index_type",
"term",
"latency_ms",
"parquet_size_bytes",
"csv_size_bytes",
"unique_terms",
"doc_count",
"token_freq"
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
current_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
# Écriture des métriques de requête
for latency_entry in query_latencies:
writer.writerow({
"timestamp": current_timestamp,
"index_type": "Parquet",
"term": latency_entry["term"],
"latency_ms": f"{latency_entry['latency_ms']:.2f}",
"parquet_size_bytes": parquet_size,
"csv_size_bytes": csv_size,
"unique_terms": unique_terms,
"doc_count": latency_entry["doc_count"],
"token_freq": latency_entry["freq"]
})
print(f"Journal de métriques écrit dans : {metrics_log_file}")
print("\nPrêt pour capture des images de l'interface Spark UI (http://localhost:4040)")
Sauvegarde du plan d'exécution pour la construction de l'index... Plan d'index écrit dans : proof/plan_index_build.txt Sauvegarde du plan d'exécution pour une requête... Plan de requête écrit dans : proof/plan_query.txt Création du journal de métriques... Journal de métriques écrit dans : lab2_metrics_log.csv Prêt pour capture des images de l'interface Spark UI (http://localhost:4040)
8. Nettoyage¶
Cette section finalise l'exécution en arrêtant la session Spark et en affichant un résumé des fichiers générés.
# === Arrêt et nettoyage ===
spark.stop()
print("\nAssignment 2 terminé avec succès.")
print("Fichiers de sortie générés :")
print(f" - Index Parquet : {parquet_dir}")
print(f" - Index CSV : {csv_dir}")
print(f" - Plans : {proof_dir}")
print(f" - Métriques : {metrics_log_file}")
Assignment 2 terminé avec succès. Fichiers de sortie générés : - Index Parquet : outputs/lab2/inverted_index - Index CSV : outputs/lab2/inverted_index_csv - Plans : proof - Métriques : lab2_metrics_log.csv