Lab 1 : Vue d'ensemble du pipeline Structured Streaming¶
Ce lab construit un pipeline Spark Structured Streaming complet simulant un scénario d'ingestion de données Esports :
- Source : Fichiers JSON deposés dans un répertoire (simulant un flux d'événements en temps réel)
- Traitement : Fenêtres tumbling d'1 heure avec limite d'arrivée tardive (tolerance 15 min)
- Sink : Fichiers Parquet avec sémantique exactly-once via coordination de checkpoint
- Mesure : Capture détaillée des métriques et comparaison d'optimisation
Le pipeline traite les événements de fin de match d'esports, agrégant les kills/morts par équipe par heure.
DE2 — Lab 1 : Pipeline Structured Streaming (10%)¶
Auteur : Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026
Piste : (Indiquez votre piste : A/B/C/D)
Objectif : Construire un pipeline Structured Streaming avec agrégation fenêtrée, watermarks et sink Parquet. Surveiller via query.lastProgress et l'interface Streaming. Livrer un rapport d'optimisation avant/après.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
import time, pathlib, json
spark = SparkSession.builder \
.appName("DE2-Lab1-Streaming") \
.master("local[*]") \
.getOrCreate()
print("Version Spark :", spark.version)
print("Interface Spark :", spark.sparkContext.uiWebUrl)
WARNING: Using incubator modules: jdk.incubator.vector Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties 26/04/29 11:30:44 WARN Utils: Your hostname, sable-ThinkPad-X1-Yoga-3rd, resolves to a loopback address: 127.0.1.1; using 10.192.33.105 instead (on interface wlp2s0) 26/04/29 11:30:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 26/04/29 11:30:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Version Spark : 4.0.1 Interface Spark : http://10.192.33.105:4040
1. Définir le schéma et préparer le répertoire de réception¶
Un schéma est un StructType qui déclare la structure attendue de chaque enregistrement JSON avant la lecture. Ceci garantit la sécurité des types et la détection précoce des erreurs.
Pour la piste Esports, nous définissons :
match_end_time: Horodatage d'événement (TimestampType) - utilisé pour le windowingteam_id: Identifiant de chaîne pour l'équipe (dimension de regroupement)kills,deaths: Métriques entières (dimensions d'agrégation)match_duration_sec: Durée du match en secondes (métrique supplémentaire)
Après avoir défini le schéma, nous créons un répertoire de réception où les fichiers JSON seront déposés pour simuler un flux.
import pathlib
import os
# Définir le schéma explicite pour la piste Esports : événements de fin de match
# Colonnes : match_end_time (horodatage événement), team_id (clé de regroupement),
# kills et deaths (métriques numériques pour agrégation)
schema = StructType([
StructField("match_end_time", TimestampType(), True),
StructField("team_id", StringType(), True),
StructField("kills", IntegerType(), True),
StructField("deaths", IntegerType(), True),
StructField("match_duration_sec", IntegerType(), True),
])
# Créer le répertoire de réception pour la source de streaming
# Les fichiers déposés ici seront automatiquement découverts par readStream
landing_dir = "data/stream_input/"
pathlib.Path(landing_dir).mkdir(parents=True, exist_ok=True)
print(f"Répertoire de réception créé : {landing_dir}")
# Créer les répertoires de sortie pour le sink et le checkpoint
output_base = "outputs/lab1/"
pathlib.Path(f"{output_base}stream_sink").mkdir(parents=True, exist_ok=True)
pathlib.Path(f"{output_base}checkpoint").mkdir(parents=True, exist_ok=True)
pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
print(f"Répertoires de sortie créés sous {output_base}")
Répertoire de réception créé : data/stream_input/ Répertoires de sortie créés sous outputs/lab1/
2. Créer la source de streaming¶
L'API readStream de Spark ouvre une source fichier basée sur répertoire qui découvre continuellement de nouveaux fichiers :
spark.readStream.schema(...): Force notre schéma sur les enregistrements d'entrée.json(landing_dir): Lit le format JSON (alternative : .csv, .parquet)maxFilesPerTrigger=1: Traite un fichier par micro-batch (ordre déterministe)
Quand la requête s'exécute, elle interroge le répertoire de réception à chaque intervalle de déclenchement et traite les nouveaux fichiers automatiquement. La propriété isStreaming confirme que le DataFrame est en mode streaming (évaluation lazy).
# Créer la source de streaming à partir de fichiers JSON dans le répertoire de réception
# maxFilesPerTrigger=1 garantit un fichier par micro-batch pour un comportement prévisible
df_stream = (spark.readStream
.schema(schema)
.option("maxFilesPerTrigger", 1)
.json(landing_dir))
# Vérifier que le streaming est activé et inspecter le schéma
print("Est en streaming :", df_stream.isStreaming)
print("\nSchéma du DataFrame de streaming :")
df_stream.printSchema()
# Afficher les métadonnées des premières colonnes
print("\nSource de streaming prête à :", landing_dir)
Est en streaming : True Schéma du DataFrame de streaming : root |-- match_end_time: timestamp (nullable = true) |-- team_id: string (nullable = true) |-- kills: integer (nullable = true) |-- deaths: integer (nullable = true) |-- match_duration_sec: integer (nullable = true) Source de streaming prête à : data/stream_input/
3. Watermark + Agrégation fenêtrée¶
Le windowing et les watermarks sont critiques pour traiter les flux de données horodatés et désordonnés :
Watermark : Marque le seuil au-delà duquel les événements sont considérés comme trop tardifs et ignorés de l'état. Un watermark de 15 minutes signifie :
- Un événement arrivant 16 minutes après son horodatage sera ignoré
- L'état pour les fenêtres complétées est nettoyé automatiquement après le passage du watermark
Fenêtre tumbling : Fenêtres non-chevauchantes d'1 heure (ex : 00:00-01:00, 01:00-02:00, ...)
- Les événements sont assignés à exactement une fenêtre selon leur horodatage
- Chaque fenêtre produit une sortie après le passage de son watermark
Agrégation : On regroupe par fenêtre + team_id, puis on calcule :
count(*): Nombre de matchs dans la fenêtre de l'équipesum(kills),sum(deaths): Total des kills/deathsavg(kills),avg(match_duration_sec): Métriques moyennes par match
# Appliquer le watermark pour gérer les données tardives : les événements arrivant > 15 minutes après leur horodatage
# seront ignorés (peut être ajusté pour une politique d'arrivées tardives plus stricte ou plus lenient)
# Puis appliquer la fenêtre tumbling d'1 heure sur le temps d'événement
# Regrouper par la fenêtre et team_id pour calculer les métriques par équipe par heure
windowed = (df_stream
.withWatermark("match_end_time", "15 minutes")
.groupBy(
F.window("match_end_time", "1 hour"),
F.col("team_id")
)
.agg(
F.count("*").alias("num_matches"),
F.sum("kills").alias("total_kills"),
F.sum("deaths").alias("total_deaths"),
F.avg("kills").alias("avg_kills_per_match"),
F.avg("match_duration_sec").alias("avg_match_duration_sec")
)
.select(
F.col("window.start").alias("window_start"),
F.col("window.end").alias("window_end"),
"team_id",
"num_matches",
"total_kills",
"total_deaths",
"avg_kills_per_match",
"avg_match_duration_sec"
))
print("Schéma d'agrégation fenêtrée :")
windowed.printSchema()
Schéma d'agrégation fenêtrée : root |-- window_start: timestamp (nullable = true) |-- window_end: timestamp (nullable = true) |-- team_id: string (nullable = true) |-- num_matches: long (nullable = false) |-- total_kills: long (nullable = true) |-- total_deaths: long (nullable = true) |-- avg_kills_per_match: double (nullable = true) |-- avg_match_duration_sec: double (nullable = true)
4. Écrire vers le sink Parquet¶
Le sink est où les résultats du streaming sont persistés. Notre choix :
Format : Parquet (columaire, compressé, efficace pour les requêtes OLAP)
Mode de sortie : "append" (seuls les résultats nouveaux/mis à jour sont écrits)
- Alternative : "update" (toutes les lignes du résultat réécrites) ou "complete" (ensemble complet du résultat)
Sémantique Exactly-Once : Réalisée via checkpointLocation
- Spark écrit un fichier d'état pour chaque micro-batch
- En cas d'échec/redémarrage, Spark saute les fichiers déjà traités (idempotent)
- Prévient la duplication même en cas de crash système
Déclencheur : processingTime="10 seconds" = tenter un micro-batch toutes les 10s
- Si l'entrée est plus lente, le micro-batch sera plus petit
- Si plus rapide, Spark regroupe plusieurs fichiers en un batch (jusqu'à l'heure du déclencheur)
# Écrire les résultats fenêtrés au format Parquet en mode append
# - format("parquet") : Stockage columaire Parquet pour requêtes efficaces
# - outputMode("append") : Seuls les nouveaux résultats sont écrits (garantit exactly-once avec checkpoint)
# - checkpointLocation : Stocke les métadonnées pour récupération d'échecs et prévention de doublons
# - trigger(processingTime="10 seconds") : Traiter un micro-batch toutes les 10 secondes
sink_path = "outputs/lab1/stream_sink"
checkpoint_path = "outputs/lab1/checkpoint"
query = (windowed.writeStream
.format("parquet")
.outputMode("append")
.option("path", sink_path)
.option("checkpointLocation", checkpoint_path)
.trigger(processingTime="10 seconds")
.start())
print(f"Requête de streaming démarrée :")
print(f" ID de requête : {query.id}")
print(f" Nom de requête : {query.name}")
print(f" Chemin du sink : {sink_path}")
print(f" Chemin du checkpoint : {checkpoint_path}")
# Laisser le flux s'exécuter pendant 60 secondes pour accumuler certains micro-batches
# (Les données doivent être présentes dans landing_dir pour que le traitement se produise)
query.awaitTermination(timeout=60)
query.stop()
print("Requête de streaming arrêtée après 60 secondes.")
26/04/29 11:30:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Requête de streaming démarrée : ID de requête : 68628729-0ab5-4dda-a826-294b8c1bf980 Nom de requête : None Chemin du sink : outputs/lab1/stream_sink Chemin du checkpoint : outputs/lab1/checkpoint Requête de streaming arrêtée après 60 secondes.
26/04/29 11:31:57 WARN DAGScheduler: Failed to cancel job group 2449abc1-b30a-48bf-a874-5c2334dc529e. Cannot find active jobs for it. 26/04/29 11:31:57 WARN DAGScheduler: Failed to cancel job group 2449abc1-b30a-48bf-a874-5c2334dc529e. Cannot find active jobs for it.
5. Surveiller la progression de la requête et l'interface Streaming¶
Après qu'une requête de streaming se termine (via awaitTermination), on capture la télémétrie de performance :
query.lastProgress : Un objet JSON contenant :
inputRowsPerSecond: Taux d'arrivée des lignes d'entréeprocessedRowsPerSecond: Taux de production des lignes de sortiebatchDuration: Millisecondes passées dans le dernier micro-batchnumOutputRows: Lignes écrites dans le dernier micro-batchtotalDelay: Latence bout-à-bout de l'ingestion à la sortie
Interface Streaming (http://localhost:4040/StreamingQuery/) :
- Affiche les statistiques agrégées sur tous les micro-batches
- Trace les taux d'entrée, traitement, durée des batches au fil du temps
- Aide à identifier les goulots (gestion d'état, E/S, etc.)
On sauvegarde :
- query_progress_before.json : Télémétrie brute de la première exécution
- plan_streaming_before.txt : Plan logique + physique pour analyse de requête
# Capturer les métriques de streaming de query.lastProgress après exécution
# Ceci contient la télémétrie de performance détaillée pour le dernier micro-batch
import io
import sys
progress = query.lastProgress
print("Progression de la requête (dernier micro-batch) :")
print(json.dumps(progress, indent=2, default=str))
# Extraire et enregistrer les métriques clés pour comparaison d'optimisation
metrics_before = {
"run_id": "avant_optimisation",
"trigger_interval_sec": 10,
"watermark_duration_min": 15,
"num_output_rows": progress.get("numOutputRows", 0),
"input_rows_per_second": progress.get("inputRowsPerSecond", 0),
"processed_rows_per_second": progress.get("processedRowsPerSecond", 0),
"batch_duration_ms": progress.get("batchDuration", 0),
"total_delay_ms": progress.get("totalDelay", 0),
"timestamp": progress.get("timestamp")
}
print("\nMétriques clés (Avant optimisation) :")
for key, value in metrics_before.items():
print(f" {key} : {value}")
# Sauvegarder le JSON de progression comme preuve
with open("proof/query_progress_before.json", "w") as f:
json.dump(progress, f, indent=2, default=str)
print("\nJSON de progression sauvegardé à proof/query_progress_before.json")
# Sauvegarder le plan de streaming en capturant stdout de explain()
string_buffer = io.StringIO()
old_stdout = sys.stdout
sys.stdout = string_buffer
windowed.explain("formatted")
sys.stdout = old_stdout
plan_output = string_buffer.getvalue()
with open("proof/plan_streaming_before.txt", "w") as f:
f.write(plan_output)
print("Plan de streaming sauvegardé à proof/plan_streaming_before.txt")
Progression de la requête (dernier micro-batch) :
{
"id": "68628729-0ab5-4dda-a826-294b8c1bf980",
"runId": "2449abc1-b30a-48bf-a874-5c2334dc529e",
"name": null,
"timestamp": "2026-04-29T09:31:50.000Z",
"batchId": 0,
"batchDuration": 15,
"durationMs": {
"triggerExecution": 15,
"latestOffset": 14
},
"eventTime": {},
"stateOperators": [],
"sources": [
{
"description": "FileStreamSource[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab1 practice/data/stream_input]",
"startOffset": "None",
"endOffset": "None",
"latestOffset": "None",
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0,
"metrics": {}
}
],
"sink": {
"description": "FileSink[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab1 practice/outputs/lab1/stream_sink]",
"numOutputRows": -1,
"metrics": {}
},
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0,
"observedMetrics": {}
}
Métriques clés (Avant optimisation) :
run_id : avant_optimisation
trigger_interval_sec : 10
watermark_duration_min : 15
num_output_rows : 0
input_rows_per_second : 0.0
processed_rows_per_second : 0.0
batch_duration_ms : 15
total_delay_ms : 0
timestamp : 2026-04-29T09:31:50.000Z
JSON de progression sauvegardé à proof/query_progress_before.json
Plan de streaming sauvegardé à proof/plan_streaming_before.txt
6. Optimisation avant/après¶
On ré-exécute le pipeline avec des paramètres modifiés pour mesurer le compromis :
Avant (Référence) :
- Déclencheur : 10 secondes (micro-batching agressif)
- Watermark : 15 minutes (tolérance lenient aux données tardives)
- Coût : Plus de micro-batches = plus d'overhead d'ordonnancement, latence plus basse
Après (Optimisé) :
- Déclencheur : 30 secondes (consolidation de micro-batches)
- Watermark : 10 minutes (nettoyage d'état plus agressif, moins de mémoire)
- Objectif : Débit plus élevé (batches plus grands), augmentation de latence acceptable
On compare :
inputRowsPerSecondetprocessedRowsPerSecond(débit)batchDuration(coût de traitement par événement)- Overhead de mémoire d'état (de la réduction du watermark)
Les résultats sont enregistrés dans lab1_metrics_log.csv pour analyse et reporting.
# Optimisation : augmenter l'intervalle de déclenchement pour réduire l'overhead et améliorer le débit
# Stratégie : traiter les micro-batches toutes les 30 secondes au lieu de 10 secondes
# Compromis attendu : latence plus basse mais débit potentiellement plus élevé (moins de petits batches)
# Reconstruire la requête de streaming avec déclencheur optimisé
sink_path_opt = "outputs/lab1/stream_sink_optimized"
checkpoint_path_opt = "outputs/lab1/checkpoint_optimized"
pathlib.Path(sink_path_opt).mkdir(parents=True, exist_ok=True)
pathlib.Path(checkpoint_path_opt).mkdir(parents=True, exist_ok=True)
# Recréer l'agrégation fenêtrée avec watermark modifié pour contrainte plus stricte
# (watermark plus court = nettoyage d'état plus rapide, mais suppression de données tardives plus agressive)
windowed_opt = (df_stream
.withWatermark("match_end_time", "10 minutes") # Watermark plus strict
.groupBy(
F.window("match_end_time", "1 hour"),
F.col("team_id")
)
.agg(
F.count("*").alias("num_matches"),
F.sum("kills").alias("total_kills"),
F.sum("deaths").alias("total_deaths"),
F.avg("kills").alias("avg_kills_per_match"),
F.avg("match_duration_sec").alias("avg_match_duration_sec")
)
.select(
F.col("window.start").alias("window_start"),
F.col("window.end").alias("window_end"),
"team_id",
"num_matches",
"total_kills",
"total_deaths",
"avg_kills_per_match",
"avg_match_duration_sec"
))
# Exécuter la requête optimisée avec intervalle de déclenchement plus long
query_opt = (windowed_opt.writeStream
.format("parquet")
.outputMode("append")
.option("path", sink_path_opt)
.option("checkpointLocation", checkpoint_path_opt)
.trigger(processingTime="30 seconds")
.start())
print("Requête de streaming optimisée démarrée avec déclencheur=30s, watermark=10min")
query_opt.awaitTermination(timeout=60)
query_opt.stop()
print("Requête optimisée arrêtée après 60 secondes.")
# Capturer les métriques optimisées
progress_opt = query_opt.lastProgress
metrics_after = {
"run_id": "apres_optimisation",
"trigger_interval_sec": 30,
"watermark_duration_min": 10,
"num_output_rows": progress_opt.get("numOutputRows", 0),
"input_rows_per_second": progress_opt.get("inputRowsPerSecond", 0),
"processed_rows_per_second": progress_opt.get("processedRowsPerSecond", 0),
"batch_duration_ms": progress_opt.get("batchDuration", 0),
"total_delay_ms": progress_opt.get("totalDelay", 0),
"timestamp": progress_opt.get("timestamp")
}
print("\nMétriques clés (Après optimisation) :")
for key, value in metrics_after.items():
print(f" {key} : {value}")
# Sauvegarder le JSON de progression optimisé
with open("proof/query_progress_after.json", "w") as f:
json.dump(progress_opt, f, indent=2, default=str)
# Sauvegarder le plan optimisé en capturant stdout de explain()
string_buffer_opt = io.StringIO()
old_stdout_opt = sys.stdout
sys.stdout = string_buffer_opt
windowed_opt.explain("formatted")
sys.stdout = old_stdout_opt
plan_opt = string_buffer_opt.getvalue()
with open("proof/plan_streaming_after.txt", "w") as f:
f.write(plan_opt)
# Créer un journal des métriques comparant avant et après
import csv
import datetime
metrics_log = [
{
"run_id": "avant_optimisation",
"trigger_interval_sec": 10,
"watermark_duration_min": 15,
"num_output_rows": metrics_before["num_output_rows"],
"input_rows_per_second": metrics_before["input_rows_per_second"],
"processed_rows_per_second": metrics_before["processed_rows_per_second"],
"batch_duration_ms": metrics_before["batch_duration_ms"],
"total_delay_ms": metrics_before["total_delay_ms"],
"timestamp": datetime.datetime.now().isoformat()
},
{
"run_id": "apres_optimisation",
"trigger_interval_sec": 30,
"watermark_duration_min": 10,
"num_output_rows": metrics_after["num_output_rows"],
"input_rows_per_second": metrics_after["input_rows_per_second"],
"processed_rows_per_second": metrics_after["processed_rows_per_second"],
"batch_duration_ms": metrics_after["batch_duration_ms"],
"total_delay_ms": metrics_after["total_delay_ms"],
"timestamp": datetime.datetime.now().isoformat()
}
]
log_path = "lab1_metrics_log.csv"
with open(log_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=metrics_log[0].keys())
writer.writeheader()
writer.writerows(metrics_log)
print(f"\nJournal des métriques sauvegardé à {log_path}")
print("Comparaison d'optimisation complétée.")
spark.stop()
print("\nLab 1 terminé. Vérifier le dossier proof/ pour les preuves et outputs/ pour les données du sink Parquet.")
26/04/29 11:31:59 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Requête de streaming optimisée démarrée avec déclencheur=30s, watermark=10min
26/04/29 11:32:59 WARN DAGScheduler: Failed to cancel job group e0a29307-2dee-4afc-9868-c26268bf8c8f. Cannot find active jobs for it. 26/04/29 11:32:59 WARN DAGScheduler: Failed to cancel job group e0a29307-2dee-4afc-9868-c26268bf8c8f. Cannot find active jobs for it.
Requête optimisée arrêtée après 60 secondes. Métriques clés (Après optimisation) : run_id : apres_optimisation trigger_interval_sec : 30 watermark_duration_min : 10 num_output_rows : 0 input_rows_per_second : 0.0 processed_rows_per_second : 0.0 batch_duration_ms : 16 total_delay_ms : 0 timestamp : 2026-04-29T09:32:30.000Z Journal des métriques sauvegardé à lab1_metrics_log.csv Comparaison d'optimisation complétée. Lab 1 terminé. Vérifier le dossier proof/ pour les preuves et outputs/ pour les données du sink Parquet.