lab1 practice - Evidence

Screenshots and execution plans generated during the lab.

Screenshots

metric1

metric2

Plans and logs

plan_streaming_after.txt

== Physical Plan ==
* HashAggregate (11)
+- StateStoreSave (10)
   +- * HashAggregate (9)
      +- StateStoreRestore (8)
         +- * HashAggregate (7)
            +- Exchange (6)
               +- * HashAggregate (5)
                  +- * Project (4)
                     +- * Filter (3)
                        +- EventTimeWatermark (2)
                           +- StreamingRelation (1)
 
 
(1) StreamingRelation
Output [5]: [match_end_time#0, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Arguments: FileSource[data/stream_input/], [match_end_time#0, team_id#1, kills#2, deaths#3, match_duration_sec#4]
 
(2) EventTimeWatermark
Input [5]: [match_end_time#0, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Arguments: 13f9c107-0499-4111-9e95-3b545d8f95cf, match_end_time#0: timestamp, 10 minutes
 
(3) Filter [codegen id : 1]
Input [5]: [match_end_time#0-T600000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Condition : isnotnull(match_end_time#0-T600000ms)
 
(4) Project [codegen id : 1]
Output [5]: [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - 0) % 3600000000) < 0) THEN (((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - 0) % 3600000000) + 3600000000) ELSE ((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - 0) % 3600000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - 0) % 3600000000) < 0) THEN (((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - 0) % 3600000000) + 3600000000) ELSE ((precisetimestampconversion(match_end_time#0-T600000ms, TimestampType, LongType) - 0) % 3600000000) END) - 0) + 3600000000), LongType, TimestampType))) AS window#71-T600000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Input [5]: [match_end_time#0-T600000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
 
(5) HashAggregate [codegen id : 1]
Input [5]: [window#71-T600000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Keys [2]: [window#71-T600000ms, team_id#1]
Functions [5]: [partial_count(1), partial_sum(kills#2), partial_sum(deaths#3), partial_avg(kills#2), partial_avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#66L, sum(kills#2)#67L, sum(deaths#3)#68L, avg(kills#2)#69, avg(match_duration_sec#4)#70]
Results [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
 
(6) Exchange
Input [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
Arguments: hashpartitioning(window#71-T600000ms, team_id#1, 200), REQUIRED_BY_STATEFUL_OPERATOR, [plan_id=143]
 
(7) HashAggregate [codegen id : 2]
Input [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
Keys [2]: [window#71-T600000ms, team_id#1]
Functions [5]: [merge_count(1), merge_sum(kills#2), merge_sum(deaths#3), merge_avg(kills#2), merge_avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#66L, sum(kills#2)#67L, sum(deaths#3)#68L, avg(kills#2)#69, avg(match_duration_sec#4)#70]
Results [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
 
(8) StateStoreRestore
Input [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
Arguments: [window#71-T600000ms, team_id#1], state info [ checkpoint = <unknown>, runId = c2b678a6-8bd3-452f-b3fc-0ef71efda9a0, opId = 0, ver = 0, numPartitions = 200] stateStoreCkptIds = None, 2
 
(9) HashAggregate [codegen id : 3]
Input [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
Keys [2]: [window#71-T600000ms, team_id#1]
Functions [5]: [merge_count(1), merge_sum(kills#2), merge_sum(deaths#3), merge_avg(kills#2), merge_avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#66L, sum(kills#2)#67L, sum(deaths#3)#68L, avg(kills#2)#69, avg(match_duration_sec#4)#70]
Results [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
 
(10) StateStoreSave
Input [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
Arguments: [window#71-T600000ms, team_id#1], state info [ checkpoint = <unknown>, runId = c2b678a6-8bd3-452f-b3fc-0ef71efda9a0, opId = 0, ver = 0, numPartitions = 200] stateStoreCkptIds = None, Append, -9223372036854775808, -9223372036854775808, 2
 
(11) HashAggregate [codegen id : 4]
Input [9]: [window#71-T600000ms, team_id#1, count#87L, sum#89L, sum#91L, sum#94, count#95L, sum#98, count#99L]
Keys [2]: [window#71-T600000ms, team_id#1]
Functions [5]: [count(1), sum(kills#2), sum(deaths#3), avg(kills#2), avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#66L, sum(kills#2)#67L, sum(deaths#3)#68L, avg(kills#2)#69, avg(match_duration_sec#4)#70]
Results [8]: [window#71-T600000ms.start AS window_start#72, window#71-T600000ms.end AS window_end#73, team_id#1, count(1)#66L AS num_matches#56L, sum(kills#2)#67L AS total_kills#57L, sum(deaths#3)#68L AS total_deaths#58L, avg(kills#2)#69 AS avg_kills_per_match#59, avg(match_duration_sec#4)#70 AS avg_match_duration_sec#60]
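In step (4), Spark expands the window() call into raw arithmetic on the timestamp's microsecond value: 3600000000 µs is one hour, the `- 0` inside the remainder is a zero start offset, and the CASE WHEN shifts a negative remainder (possible for pre-1970 timestamps, since the JVM `%` keeps the sign of the dividend) back into range. The sketch below reproduces that computation in plain Python for a 1-hour tumbling window. It is an illustration under those assumptions, not Spark's code; the helper names are hypothetical and the input timestamps are sample values, not lab data.

```python
from datetime import datetime, timedelta, timezone

WINDOW_US = 3_600_000_000  # window length from the plan: 1 hour in microseconds
START_US = 0               # start offset (the "- 0" inside the remainder)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_micros(ts: datetime) -> int:
    """Microseconds since the Unix epoch (Spark's TimestampType representation)."""
    delta = ts - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds

def from_micros(us: int) -> datetime:
    return EPOCH + timedelta(microseconds=us)

def jvm_rem(a: int, b: int) -> int:
    """Remainder with the sign of the dividend, like the JVM % operator (Python's % differs)."""
    r = abs(a) % b
    return -r if a < 0 else r

def tumbling_window(ts: datetime) -> tuple[datetime, datetime]:
    t = to_micros(ts)
    rem = jvm_rem(t - START_US, WINDOW_US)
    if rem < 0:  # the CASE WHEN branch: move negative remainders into [0, WINDOW_US)
        rem += WINDOW_US
    start = t - rem
    return from_micros(start), from_micros(start + WINDOW_US)

start, end = tumbling_window(datetime(2026, 4, 29, 9, 31, 50, tzinfo=timezone.utc))
print(start.isoformat(), end.isoformat())  # 2026-04-29T09:00:00+00:00 2026-04-29T10:00:00+00:00
```

Every event is assigned to exactly one window whose start is a multiple of one hour since the epoch, which is why step (5) can group on the window struct together with team_id.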
 
 
 

plan_streaming_before.txt

== Physical Plan ==
* HashAggregate (11)
+- StateStoreSave (10)
   +- * HashAggregate (9)
      +- StateStoreRestore (8)
         +- * HashAggregate (7)
            +- Exchange (6)
               +- * HashAggregate (5)
                  +- * Project (4)
                     +- * Filter (3)
                        +- EventTimeWatermark (2)
                           +- StreamingRelation (1)
 
 
(1) StreamingRelation
Output [5]: [match_end_time#0, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Arguments: FileSource[data/stream_input/], [match_end_time#0, team_id#1, kills#2, deaths#3, match_duration_sec#4]
 
(2) EventTimeWatermark
Input [5]: [match_end_time#0, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Arguments: ce89a23b-6ec4-485b-9416-7a0a390999a1, match_end_time#0: timestamp, 15 minutes
 
(3) Filter [codegen id : 1]
Input [5]: [match_end_time#0-T900000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Condition : isnotnull(match_end_time#0-T900000ms)
 
(4) Project [codegen id : 1]
Output [5]: [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - 0) % 3600000000) < 0) THEN (((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - 0) % 3600000000) + 3600000000) ELSE ((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - 0) % 3600000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - 0) % 3600000000) < 0) THEN (((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - 0) % 3600000000) + 3600000000) ELSE ((precisetimestampconversion(match_end_time#0-T900000ms, TimestampType, LongType) - 0) % 3600000000) END) - 0) + 3600000000), LongType, TimestampType))) AS window#21-T900000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Input [5]: [match_end_time#0-T900000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
 
(5) HashAggregate [codegen id : 1]
Input [5]: [window#21-T900000ms, team_id#1, kills#2, deaths#3, match_duration_sec#4]
Keys [2]: [window#21-T900000ms, team_id#1]
Functions [5]: [partial_count(1), partial_sum(kills#2), partial_sum(deaths#3), partial_avg(kills#2), partial_avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#16L, sum(kills#2)#17L, sum(deaths#3)#18L, avg(kills#2)#19, avg(match_duration_sec#4)#20]
Results [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
 
(6) Exchange
Input [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
Arguments: hashpartitioning(window#21-T900000ms, team_id#1, 200), REQUIRED_BY_STATEFUL_OPERATOR, [plan_id=59]
 
(7) HashAggregate [codegen id : 2]
Input [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
Keys [2]: [window#21-T900000ms, team_id#1]
Functions [5]: [merge_count(1), merge_sum(kills#2), merge_sum(deaths#3), merge_avg(kills#2), merge_avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#16L, sum(kills#2)#17L, sum(deaths#3)#18L, avg(kills#2)#19, avg(match_duration_sec#4)#20]
Results [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
 
(8) StateStoreRestore
Input [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
Arguments: [window#21-T900000ms, team_id#1], state info [ checkpoint = <unknown>, runId = 2ba24a41-d346-4f0a-9087-b5318b5cbf7e, opId = 0, ver = 0, numPartitions = 200] stateStoreCkptIds = None, 2
 
(9) HashAggregate [codegen id : 3]
Input [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
Keys [2]: [window#21-T900000ms, team_id#1]
Functions [5]: [merge_count(1), merge_sum(kills#2), merge_sum(deaths#3), merge_avg(kills#2), merge_avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#16L, sum(kills#2)#17L, sum(deaths#3)#18L, avg(kills#2)#19, avg(match_duration_sec#4)#20]
Results [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
 
(10) StateStoreSave
Input [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
Arguments: [window#21-T900000ms, team_id#1], state info [ checkpoint = <unknown>, runId = 2ba24a41-d346-4f0a-9087-b5318b5cbf7e, opId = 0, ver = 0, numPartitions = 200] stateStoreCkptIds = None, Append, -9223372036854775808, -9223372036854775808, 2
 
(11) HashAggregate [codegen id : 4]
Input [9]: [window#21-T900000ms, team_id#1, count#37L, sum#39L, sum#41L, sum#44, count#45L, sum#48, count#49L]
Keys [2]: [window#21-T900000ms, team_id#1]
Functions [5]: [count(1), sum(kills#2), sum(deaths#3), avg(kills#2), avg(match_duration_sec#4)]
Aggregate Attributes [5]: [count(1)#16L, sum(kills#2)#17L, sum(deaths#3)#18L, avg(kills#2)#19, avg(match_duration_sec#4)#20]
Results [8]: [window#21-T900000ms.start AS window_start#22, window#21-T900000ms.end AS window_end#23, team_id#1, count(1)#16L AS num_matches#6L, sum(kills#2)#17L AS total_kills#7L, sum(deaths#3)#18L AS total_deaths#8L, avg(kills#2)#19 AS avg_kills_per_match#9, avg(match_duration_sec#4)#20 AS avg_match_duration_sec#10]
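Apart from expression IDs and run IDs, the two plans differ only in the watermark delay of step (2): 15 minutes (T900000ms) in plan_streaming_before.txt versus 10 minutes (T600000ms) in plan_streaming_after.txt. Because StateStoreSave runs in Append mode, a window's result is written only once the watermark (latest event time seen minus the delay) reaches the window end. The shorter delay should therefore close each hourly window 5 minutes of event time earlier, at the cost of dropping rows that arrive more than roughly 10 minutes late instead of 15. A minimal sketch of that rule, assuming the window-end comparison Spark uses for windowed aggregations and ignoring the one-batch lag with which a new watermark takes effect; the helper names are hypothetical:

```python
from datetime import datetime, timedelta

# Watermark delays read from step (2) of each plan
DELAY_BEFORE = timedelta(milliseconds=900_000)  # 15 minutes (T900000ms)
DELAY_AFTER = timedelta(milliseconds=600_000)   # 10 minutes (T600000ms)

def watermark(max_event_time: datetime, delay: timedelta) -> datetime:
    """Watermark = latest event time observed so far minus the allowed delay."""
    return max_event_time - delay

def window_finalized(window_end: datetime, max_event_time: datetime, delay: timedelta) -> bool:
    """Append mode: a window is emitted (and its state evicted) once the watermark reaches its end."""
    return window_end <= watermark(max_event_time, delay)

def earliest_close(window_end: datetime, delay: timedelta) -> datetime:
    """Smallest max event time that lets the watermark reach window_end."""
    return window_end + delay

end = datetime(2026, 4, 29, 10, 0)  # sample end of a 1-hour window
print(earliest_close(end, DELAY_BEFORE))  # 2026-04-29 10:15:00
print(earliest_close(end, DELAY_AFTER))   # 2026-04-29 10:10:00
```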
 
 
 

Raw data

query_progress_after.json

{
  "id": "1923b44f-6668-432a-a9b0-e3c84f61c6d2",
  "runId": "e0a29307-2dee-4afc-9868-c26268bf8c8f",
  "name": null,
  "timestamp": "2026-04-29T09:32:30.000Z",
  "batchId": 0,
  "batchDuration": 16,
  "durationMs": {
    "triggerExecution": 15,
    "latestOffset": 15
  },
  "eventTime": {},
  "stateOperators": [],
  "sources": [
    {
      "description": "FileStreamSource[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab1 practice/data/stream_input]",
      "startOffset": "None",
      "endOffset": "None",
      "latestOffset": "None",
      "numInputRows": 0,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 0.0,
      "metrics": {}
    }
  ],
  "sink": {
    "description": "FileSink[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab1 practice/outputs/lab1/stream_sink_optimized]",
    "numOutputRows": -1,
    "metrics": {}
  },
  "numInputRows": 0,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 0.0,
  "observedMetrics": {}
}
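A progress report like this one can be checked programmatically. The sketch below uses only the standard json module and parses an inline, trimmed copy of query_progress_after.json (only the fields it reads); the summarize helper is hypothetical. In Spark's progress format, a sink numOutputRows of -1 means the row count is not available for that sink, as is the case for this file sink.

```python
import json

# Trimmed copy of query_progress_after.json: only the fields read below
RAW = """
{
  "batchId": 0,
  "batchDuration": 16,
  "durationMs": {"triggerExecution": 15, "latestOffset": 15},
  "stateOperators": [],
  "numInputRows": 0,
  "sink": {"numOutputRows": -1}
}
"""

def summarize(progress: dict) -> dict:
    durations = progress.get("durationMs", {})
    out_rows = progress["sink"]["numOutputRows"]
    return {
        "batch_id": progress["batchId"],
        "input_rows": progress["numInputRows"],
        "trigger_ms": durations.get("triggerExecution"),
        # Per-operator state-store metrics; empty in this report
        "has_state_metrics": bool(progress.get("stateOperators")),
        # -1 means the sink did not report how many rows it wrote
        "output_rows": None if out_rows < 0 else out_rows,
    }

summary = summarize(json.loads(RAW))
print(summary)
```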

query_progress_before.json

{
  "id": "68628729-0ab5-4dda-a826-294b8c1bf980",
  "runId": "2449abc1-b30a-48bf-a874-5c2334dc529e",
  "name": null,
  "timestamp": "2026-04-29T09:31:50.000Z",
  "batchId": 0,
  "batchDuration": 15,
  "durationMs": {
    "triggerExecution": 15,
    "latestOffset": 14
  },
  "eventTime": {},
  "stateOperators": [],
  "sources": [
    {
      "description": "FileStreamSource[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab1 practice/data/stream_input]",
      "startOffset": "None",
      "endOffset": "None",
      "latestOffset": "None",
      "numInputRows": 0,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 0.0,
      "metrics": {}
    }
  ],
  "sink": {
    "description": "FileSink[file:/home/sable/Documents/E4FD/S4/Data Engineering/Data Engineering 2/lab1 practice/outputs/lab1/stream_sink]",
    "numOutputRows": -1,
    "metrics": {}
  },
  "numInputRows": 0,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 0.0,
  "observedMetrics": {}
}
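Both reports describe batch 0 with numInputRows = 0 and an empty stateOperators list, so their timings mostly reflect offset discovery on the input directory rather than any effect of the watermark change. The sketch below (hypothetical helpers, values copied from the two reports) computes the duration deltas and flags whether the timings are comparable at all:

```python
import json

def load(path: str) -> dict:
    """Read a saved progress report, e.g. load("query_progress_before.json")."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)

# Fields copied from query_progress_before.json and query_progress_after.json
BEFORE = {"batchDuration": 15, "numInputRows": 0,
          "durationMs": {"triggerExecution": 15, "latestOffset": 14}}
AFTER = {"batchDuration": 16, "numInputRows": 0,
         "durationMs": {"triggerExecution": 15, "latestOffset": 15}}

def compare(before: dict, after: dict) -> tuple[bool, dict]:
    # Treat timings as comparable only when both batches actually processed rows
    comparable = before["numInputRows"] > 0 and after["numInputRows"] > 0
    shared = before["durationMs"].keys() & after["durationMs"].keys()
    deltas = {k: after["durationMs"][k] - before["durationMs"][k] for k in sorted(shared)}
    deltas["batchDuration"] = after["batchDuration"] - before["batchDuration"]
    return comparable, deltas

comparable, deltas = compare(BEFORE, AFTER)
print("comparable:", comparable)
print("deltas (ms):", deltas)
```

A meaningful before/after comparison would need reports from batches that read data, where durationMs also includes the batch execution phases.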
