Assignment 2: Data Engineering with PySpark¶

Students: DIALLO Samba & DIOP Mouhamed


Overview¶

This assignment builds a data engineering pipeline with PySpark to process and analyze e-commerce operational data stored in a SQLite database.

Objectives¶

  • Set up PySpark environment
  • Load and explore data from SQLite database
  • Perform data transformations
  • Analyze sales and customer behavior patterns
  • Export processed data

Part 1: Environment Setup¶

In [1]:
# Install required packages
!pip install pyspark pandas numpy matplotlib seaborn
Requirement already satisfied: pyspark in /home/sable/miniconda3/envs/de1-env/lib/python3.10/site-packages (4.0.1)
Requirement already satisfied: pandas in /home/sable/miniconda3/envs/de1-env/lib/python3.10/site-packages (2.3.3)
Requirement already satisfied: numpy in /home/sable/miniconda3/envs/de1-env/lib/python3.10/site-packages (2.2.6)
Requirement already satisfied: matplotlib in /home/sable/miniconda3/envs/de1-env/lib/python3.10/site-packages (3.10.8)
Requirement already satisfied: seaborn in /home/sable/miniconda3/envs/de1-env/lib/python3.10/site-packages (0.13.2)
In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession
# Note: wildcard imports shadow Python builtins such as sum, min, and max;
# convenient in a notebook, but prefer explicit imports in production code
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

print("Libraries imported successfully")
Libraries imported successfully
In [3]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Assignment2_DataEngineering") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("Spark session created successfully")
print(f"Spark version: {spark.version}")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/05 02:01:56 WARN Utils: Your hostname, sable-ThinkPad-X1-Yoga-3rd, resolves to a loopback address: 127.0.1.1; using 10.192.33.105 instead (on interface wlp2s0)
26/01/05 02:01:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/05 02:01:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark session created successfully
Spark version: 4.0.1

Part 2: Data Loading and Exploration¶

In [4]:
# Create SQLite database from SQL dump
import sqlite3
import os

# Path to SQL file
sql_file = "/home/sable/Badr TAJINI/lab2-practice/lab2_operational_dump.sql"
db_file = "/home/sable/Badr TAJINI/lab2-practice/operational.db"

# Remove old database if exists
if os.path.exists(db_file):
    os.remove(db_file)

# Create database and load data
conn = sqlite3.connect(db_file)
cursor = conn.cursor()

# Read and execute SQL file
with open(sql_file, 'r') as f:
    sql_script = f.read()
    cursor.executescript(sql_script)

conn.commit()
conn.close()

print("Database created and data loaded successfully")
Database created and data loaded successfully
In [6]:
# Load data from the SQLite database via pandas, then convert to Spark DataFrames
print("Loading data from database...")

# Connect to SQLite
db_path = "/home/sable/Badr TAJINI/lab2-practice/operational.db"
conn_read = sqlite3.connect(db_path)

# Load tables into pandas DataFrames first
customers_pd = pd.read_sql_query("SELECT * FROM customers", conn_read)
orders_pd = pd.read_sql_query("SELECT * FROM orders", conn_read)
order_items_pd = pd.read_sql_query("SELECT * FROM order_items", conn_read)
products_pd = pd.read_sql_query("SELECT * FROM products", conn_read)
brands_pd = pd.read_sql_query("SELECT * FROM brands", conn_read)
categories_pd = pd.read_sql_query("SELECT * FROM categories", conn_read)

conn_read.close()

# Convert pandas DataFrames to Spark DataFrames
customers_df = spark.createDataFrame(customers_pd)
orders_df = spark.createDataFrame(orders_pd)
order_items_df = spark.createDataFrame(order_items_pd)
products_df = spark.createDataFrame(products_pd)
brands_df = spark.createDataFrame(brands_pd)
categories_df = spark.createDataFrame(categories_pd)

print("Data loaded successfully")
print(f"Customers: {customers_df.count()}")
print(f"Orders: {orders_df.count()}")
print(f"Order items: {order_items_df.count()}")
Loading data from database...
Data loaded successfully
Customers: 24
Orders: 220
Order items: 638
In [7]:
# Display sample data from each table
print("Sample customers:")
customers_df.show(5)

print("\nSample orders:")
orders_df.show(5)

print("\nSample products:")
products_df.show(5)
Sample customers:
+-----------+----------+----------------+-------------------+
|customer_id|      name|           email|         created_at|
+-----------+----------+----------------+-------------------+
|          1|Customer 1|c001@example.com|2025-01-02T10:00:00|
|          2|Customer 2|c002@example.com|2025-01-03T10:00:00|
|          3|Customer 3|c003@example.com|2025-01-04T10:00:00|
|          4|Customer 4|c004@example.com|2025-01-05T10:00:00|
|          5|Customer 5|c005@example.com|2025-01-06T10:00:00|
+-----------+----------+----------------+-------------------+
only showing top 5 rows

Sample orders:
+--------+-----------+-------------------+
|order_id|customer_id|         order_date|
+--------+-----------+-------------------+
|       1|          5|2025-03-07T12:22:00|
|       2|         14|2025-02-16T17:00:00|
|       3|         21|2025-04-11T12:28:00|
|       4|          6|2025-06-15T17:13:00|
|       5|         17|2025-05-22T16:29:00|
+--------+-----------+-------------------+
only showing top 5 rows

Sample products:
+----------+------------+--------+-----------+------+
|product_id|product_name|brand_id|category_id| price|
+----------+------------+--------+-----------+------+
|         1|   Product 1|       5|          8| 97.32|
|         2|   Product 2|       3|          9|212.25|
|         3|   Product 3|       8|          7| 140.3|
|         4|   Product 4|       5|          1| 67.12|
|         5|   Product 5|       8|          3|160.07|
+----------+------------+--------+-----------+------+
only showing top 5 rows
In [8]:
# Display schemas
print("Customers schema:")
customers_df.printSchema()

print("\nOrders schema:")
orders_df.printSchema()

print("\nProducts schema:")
products_df.printSchema()
Customers schema:
root
 |-- customer_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- created_at: string (nullable = true)


Orders schema:
root
 |-- order_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)


Products schema:
root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand_id: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- price: double (nullable = true)

In [9]:
# Basic statistics
print("Customers statistics:")
customers_df.describe().show()

print("\nOrders statistics:")
orders_df.describe().show()
Customers statistics:
26/01/05 02:04:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+------------------+----------+----------------+-------------------+
|summary|       customer_id|      name|           email|         created_at|
+-------+------------------+----------+----------------+-------------------+
|  count|                24|        24|              24|                 24|
|   mean|              12.5|      NULL|            NULL|               NULL|
| stddev|7.0710678118654755|      NULL|            NULL|               NULL|
|    min|                 1|Customer 1|c001@example.com|2025-01-02T10:00:00|
|    max|                24|Customer 9|c024@example.com|2025-01-25T10:00:00|
+-------+------------------+----------+----------------+-------------------+


Orders statistics:
+-------+-----------------+------------------+-------------------+
|summary|         order_id|       customer_id|         order_date|
+-------+-----------------+------------------+-------------------+
|  count|              220|               220|                220|
|   mean|            110.5|12.522727272727273|               NULL|
| stddev|63.65270352991039| 6.612181357139973|               NULL|
|    min|                1|                 1|2025-02-01T12:04:00|
|    max|              220|                24|2025-06-28T19:23:00|
+-------+-----------------+------------------+-------------------+

Part 3: Data Quality Checks¶

In [11]:
# Check for null values in each table
print("Checking for null values in customers:")
customers_df.select([count(when(col(c).isNull(), c)).alias(c) for c in customers_df.columns]).show()

print("\nChecking for null values in orders:")
orders_df.select([count(when(col(c).isNull(), c)).alias(c) for c in orders_df.columns]).show()

print("\nChecking for null values in products:")
products_df.select([count(when(col(c).isNull(), c)).alias(c) for c in products_df.columns]).show()
Checking for null values in customers:
+-----------+----+-----+----------+
|customer_id|name|email|created_at|
+-----------+----+-----+----------+
|          0|   0|    0|         0|
+-----------+----+-----+----------+


Checking for null values in orders:
+--------+-----------+----------+
|order_id|customer_id|order_date|
+--------+-----------+----------+
|       0|          0|         0|
+--------+-----------+----------+


Checking for null values in products:
+----------+------------+--------+-----------+-----+
|product_id|product_name|brand_id|category_id|price|
+----------+------------+--------+-----------+-----+
|         0|           0|       0|          0|    0|
+----------+------------+--------+-----------+-----+

In [12]:
# Check for duplicate records
print("Checking for duplicates in customers...")
total = customers_df.count()
distinct = customers_df.distinct().count()
print(f"Customers - Total: {total}, Distinct: {distinct}, Duplicates: {total - distinct}")

print("\nChecking for duplicates in orders...")
total = orders_df.count()
distinct = orders_df.distinct().count()
print(f"Orders - Total: {total}, Distinct: {distinct}, Duplicates: {total - distinct}")
Checking for duplicates in customers...
Customers - Total: 24, Distinct: 24, Duplicates: 0

Checking for duplicates in orders...
Orders - Total: 220, Distinct: 220, Duplicates: 0

Part 4: Data Transformations¶

In [13]:
# Create a complete sales dataset by joining tables
print("Creating complete sales dataset...")
sales_df = order_items_df \
    .join(orders_df, "order_id") \
    .join(customers_df, "customer_id") \
    .join(products_df, "product_id") \
    .join(brands_df, "brand_id") \
    .join(categories_df, "category_id")

print("Sales dataset created")
print(f"Total sales records: {sales_df.count()}")
sales_df.show(5)
Creating complete sales dataset...
Sales dataset created
Total sales records: 638
+-----------+--------+----------+-----------+--------+-------------+--------+----------+-------------------+-----------+----------------+-------------------+------------+-----+----------+-------------+
|category_id|brand_id|product_id|customer_id|order_id|order_item_id|quantity|unit_price|         order_date|       name|           email|         created_at|product_name|price|brand_name|category_name|
+-----------+--------+----------+-----------+--------+-------------+--------+----------+-------------------+-----------+----------------+-------------------+------------+-----+----------+-------------+
|          1|       2|        28|          1|     116|          354|       4|     68.03|2025-04-23T15:51:00| Customer 1|c001@example.com|2025-01-02T10:00:00|  Product 28|68.03|   Brand_2|   Category_1|
|          1|       2|        28|          6|       4|           10|       1|     68.03|2025-06-15T17:13:00| Customer 6|c006@example.com|2025-01-07T10:00:00|  Product 28|68.03|   Brand_2|   Category_1|
|          1|       2|        28|          8|      36|          113|       4|     68.03|2025-03-23T08:08:00| Customer 8|c008@example.com|2025-01-09T10:00:00|  Product 28|68.03|   Brand_2|   Category_1|
|          1|       2|        28|         10|     152|          455|       3|     68.03|2025-02-12T18:25:00|Customer 10|c010@example.com|2025-01-11T10:00:00|  Product 28|68.03|   Brand_2|   Category_1|
|          1|       2|        28|         10|     164|          490|       1|     68.03|2025-03-01T16:27:00|Customer 10|c010@example.com|2025-01-11T10:00:00|  Product 28|68.03|   Brand_2|   Category_1|
+-----------+--------+----------+-----------+--------+-------------+--------+----------+-------------------+-----------+----------------+-------------------+------------+-----+----------+-------------+
only showing top 5 rows
In [14]:
# Add calculated columns
print("Adding calculated columns...")
sales_df = sales_df.withColumn("total_price", col("quantity") * col("unit_price")) \
    .withColumn("order_month", month(col("order_date"))) \
    .withColumn("order_year", year(col("order_date")))

print("Calculated columns added")
sales_df.select("order_id", "product_name", "quantity", "unit_price", "total_price").show(5)
Adding calculated columns...
Calculated columns added
+--------+------------+--------+----------+-----------+
|order_id|product_name|quantity|unit_price|total_price|
+--------+------------+--------+----------+-----------+
|      86|  Product 49|       4|     24.21|      96.84|
|      49|  Product 49|       4|     24.21|      96.84|
|      59|  Product 49|       1|     24.21|      24.21|
|      38|  Product 49|       1|     24.21|      24.21|
|     166|  Product 49|       3|     24.21|      72.63|
+--------+------------+--------+----------+-----------+
only showing top 5 rows

Part 5: Data Analysis¶

In [16]:
# Customer analysis
# Note: count("order_id") counts order-item rows, not distinct orders;
# use count_distinct("order_id") if true per-customer order counts are needed
print("Analyzing customer behavior...")
customer_stats = sales_df.groupBy("customer_id", "name").agg(
    count("order_id").alias("total_orders"),
    sum("total_price").alias("total_spent"),
    avg("total_price").alias("avg_order_value"),
    count_distinct("product_id").alias("unique_products")
).orderBy(col("total_spent").desc())

print("Top 10 customers by spending:")
customer_stats.show(10)
Analyzing customer behavior...
Top 10 customers by spending:
+-----------+-----------+------------+------------------+------------------+---------------+
|customer_id|       name|total_orders|       total_spent|   avg_order_value|unique_products|
+-----------+-----------+------------+------------------+------------------+---------------+
|         11|Customer 11|          43|          12231.63|284.45651162790693|             33|
|         16|Customer 16|          34|11364.690000000004| 334.2555882352942|             28|
|         13|Customer 13|          37|          11359.48|  307.012972972973|             29|
|         18|Customer 18|          30|          11242.21| 374.7403333333333|             27|
|         21|Customer 21|          28|10338.589999999998| 369.2353571428571|             21|
|         19|Customer 19|          34|10271.220000000003|  302.094705882353|             28|
|         23|Customer 23|          30|10215.859999999999|340.52866666666665|             25|
|          2| Customer 2|          32|           9991.58|        312.236875|             24|
|         15|Customer 15|          31|           9630.13|310.64935483870966|             23|
|          6| Customer 6|          27| 9359.039999999999|346.63111111111107|             24|
+-----------+-----------+------------+------------------+------------------+---------------+
only showing top 10 rows
In [17]:
# Product analysis
print("Analyzing product performance...")
product_stats = sales_df.groupBy("product_id", "product_name").agg(
    sum("quantity").alias("total_quantity_sold"),
    sum("total_price").alias("total_revenue"),
    count("order_id").alias("number_of_orders")
).orderBy(col("total_revenue").desc())

print("Top products by revenue:")
product_stats.show(10)
Analyzing product performance...
Top products by revenue:
+----------+------------+-------------------+-----------------+----------------+
|product_id|product_name|total_quantity_sold|    total_revenue|number_of_orders|
+----------+------------+-------------------+-----------------+----------------+
|        51|  Product 51|                 43|         10018.57|              18|
|        30|  Product 30|                 40|           8352.0|              16|
|        29|  Product 29|                 32|7828.160000000001|              14|
|        54|  Product 54|                 32|7295.039999999999|              12|
|         6|   Product 6|                 28|6824.439999999999|              12|
|        14|  Product 14|                 38|6658.739999999999|              15|
|        59|  Product 59|                 45|           6584.4|              17|
|        55|  Product 55|                 27|          6430.05|              11|
|        31|  Product 31|                 29|          6295.32|               9|
|        41|  Product 41|                 35|           5973.8|              14|
+----------+------------+-------------------+-----------------+----------------+
only showing top 10 rows
In [18]:
# Brand analysis
print("Analyzing brand performance...")
brand_stats = sales_df.groupBy("brand_id", "brand_name").agg(
    sum("total_price").alias("total_revenue"),
    count("order_id").alias("number_of_orders"),
    count_distinct("customer_id").alias("unique_customers")
).orderBy(col("total_revenue").desc())

print("Brand statistics:")
brand_stats.show()
Analyzing brand performance...
Brand statistics:
+--------+----------+------------------+----------------+----------------+
|brand_id|brand_name|     total_revenue|number_of_orders|unique_customers|
+--------+----------+------------------+----------------+----------------+
|       5|   Brand_5| 46077.27000000001|             128|              23|
|       4|   Brand_4|32344.849999999995|              82|              23|
|       1|   Brand_1|26013.300000000003|              84|              22|
|       3|   Brand_3|25833.960000000006|              99|              24|
|       7|   Brand_7|          18545.43|              61|              21|
|       8|   Brand_8|          17906.33|              66|              23|
|       2|   Brand_2|          15213.68|              69|              22|
|       6|   Brand_6|          14668.06|              49|              20|
+--------+----------+------------------+----------------+----------------+

In [19]:
# Category analysis
print("Analyzing category performance...")
category_stats = sales_df.groupBy("category_id", "category_name").agg(
    sum("total_price").alias("total_revenue"),
    count("order_id").alias("number_of_orders"),
    avg("total_price").alias("avg_order_value")
).orderBy(col("total_revenue").desc())

print("Category statistics:")
category_stats.show()
Analyzing category performance...
Category statistics:
+-----------+-------------+------------------+----------------+------------------+
|category_id|category_name|     total_revenue|number_of_orders|   avg_order_value|
+-----------+-------------+------------------+----------------+------------------+
|          9|   Category_9|36032.189999999966|             102|  353.256764705882|
|          3|   Category_3|           33467.4|              78| 429.0692307692308|
|          7|   Category_7| 30059.80000000001|              81| 371.1086419753088|
|          2|   Category_2|30016.330000000013|             105|285.86980952380964|
|          4|   Category_4|17664.019999999997|              71|  248.789014084507|
|          6|   Category_6|15339.340000000007|              68| 225.5785294117648|
|          5|   Category_5|          12637.34|              40|          315.9335|
|          1|   Category_1|12501.630000000003|              50|250.03260000000006|
|          8|   Category_8| 8884.829999999994|              43|206.62395348837197|
+-----------+-------------+------------------+----------------+------------------+

In [20]:
# Monthly sales trend
print("Analyzing monthly sales trend...")
monthly_stats = sales_df.groupBy("order_year", "order_month").agg(
    sum("total_price").alias("total_revenue"),
    count("order_id").alias("number_of_orders")
).orderBy("order_year", "order_month")

print("Monthly sales:")
monthly_stats.show()
Analyzing monthly sales trend...
Monthly sales:
+----------+-----------+------------------+----------------+
|order_year|order_month|     total_revenue|number_of_orders|
+----------+-----------+------------------+----------------+
|      2025|          2|38662.149999999994|             126|
|      2025|          3| 40668.49999999999|             127|
|      2025|          4| 39058.24000000003|             133|
|      2025|          5| 42940.88000000004|             143|
|      2025|          6|          35273.11|             109|
+----------+-----------+------------------+----------------+

Part 6: Advanced Analytics¶

In [21]:
# Customer segmentation by spending
# Note: every customer in this dataset spends well over $1,000, so these
# fixed thresholds place all 24 customers in the "High Value" segment
print("Customer segmentation by spending...")
customer_segments = customer_stats.withColumn(
    "segment",
    when(col("total_spent") >= 1000, "High Value")
    .when(col("total_spent") >= 500, "Medium Value")
    .otherwise("Low Value")
)

print("Customer segments:")
customer_segments.groupBy("segment").count().orderBy("segment").show()
Customer segmentation by spending...
Customer segments:
+----------+-----+
|   segment|count|
+----------+-----+
|High Value|   24|
+----------+-----+

In [22]:
# Average basket size per customer
print("Calculating average basket size...")
basket_analysis = sales_df.groupBy("order_id").agg(
    sum("quantity").alias("items_per_order"),
    sum("total_price").alias("order_value")
)

basket_stats = basket_analysis.agg(
    avg("items_per_order").alias("avg_items_per_order"),
    avg("order_value").alias("avg_order_value")
).collect()[0]

print(f"Average items per order: {basket_stats['avg_items_per_order']:.2f}")
print(f"Average order value: {basket_stats['avg_order_value']:.2f}")
Calculating average basket size...
Average items per order: 7.24
Average order value: 893.65

Part 7: Data Visualization¶

In [23]:
# Convert to Pandas for visualization
print("Preparing data for visualization...")
monthly_pd = monthly_stats.toPandas()
product_pd = product_stats.limit(10).toPandas()
category_pd = category_stats.toPandas()

print("Data converted to Pandas")
Preparing data for visualization...
Data converted to Pandas
In [24]:
# Monthly revenue trend
plt.figure(figsize=(12, 6))
monthly_pd['month_label'] = monthly_pd['order_year'].astype(str) + '-' + monthly_pd['order_month'].astype(str).str.zfill(2)
plt.plot(range(len(monthly_pd)), monthly_pd['total_revenue'], marker='o', linewidth=2)
plt.title('Monthly Revenue Trend')
plt.xlabel('Month')
plt.ylabel('Total Revenue')
plt.grid(True, alpha=0.3)
plt.xticks(range(len(monthly_pd)), monthly_pd['month_label'], rotation=45)
plt.tight_layout()
plt.show()

print("Monthly revenue plot created")
[Figure: Monthly Revenue Trend, line plot of total revenue per month]
Monthly revenue plot created
In [25]:
# Top products by revenue
plt.figure(figsize=(12, 6))
plt.barh(product_pd['product_name'], product_pd['total_revenue'])
plt.title('Top 10 Products by Revenue')
plt.xlabel('Total Revenue')
plt.ylabel('Product')
plt.tight_layout()
plt.show()

print("Product revenue plot created")
[Figure: Top 10 Products by Revenue, horizontal bar chart]
Product revenue plot created
In [26]:
# Category revenue distribution
plt.figure(figsize=(10, 10))
plt.pie(category_pd['total_revenue'], labels=category_pd['category_name'], autopct='%1.1f%%', startangle=90)
plt.title('Category Revenue Distribution')
plt.axis('equal')
plt.tight_layout()
plt.show()

print("Category distribution plot created")
[Figure: Category Revenue Distribution, pie chart]
Category distribution plot created

Part 8: Data Export¶

In [27]:
# Export processed data
print("Exporting processed data...")

# Export customer statistics
customer_stats.write.mode("overwrite").parquet("output/customer_statistics")
print("Customer statistics exported")

# Export product statistics
product_stats.write.mode("overwrite").csv("output/product_statistics", header=True)
print("Product statistics exported")

# Export sales data
sales_df.write.mode("overwrite").parquet("output/sales_data")
print("Sales data exported")
Exporting processed data...
Customer statistics exported
Product statistics exported
Sales data exported

Part 9: Summary and Conclusions¶

In [28]:
# Generate summary report
print("=" * 50)
print("ASSIGNMENT 2 SUMMARY REPORT")
print("=" * 50)

total_customers = customers_df.count()
total_orders = orders_df.count()
total_revenue = sales_df.agg(sum("total_price")).collect()[0][0]

print(f"\nData Overview:")
print(f"Total Customers: {total_customers}")
print(f"Total Orders: {total_orders}")
print(f"Total Revenue: ${total_revenue:.2f}")

print(f"\nKey Metrics:")
print(f"Average orders per customer: {total_orders / total_customers:.2f}")
print(f"Average revenue per order: ${total_revenue / total_orders:.2f}")

top_customer = customer_stats.first()
print(f"\nTop customer: {top_customer['name']} (${top_customer['total_spent']:.2f})")

top_product = product_stats.first()
print(f"Top product: {top_product['product_name']} (${top_product['total_revenue']:.2f})")

print("\nProcessing completed successfully!")
print("=" * 50)
==================================================
ASSIGNMENT 2 SUMMARY REPORT
==================================================

Data Overview:
Total Customers: 24
Total Orders: 220
Total Revenue: $196602.88

Key Metrics:
Average orders per customer: 9.17
Average revenue per order: $893.65
Top customer: Customer 11 ($12231.63)
Top product: Product 51 ($10018.57)

Processing completed successfully!
==================================================
In [29]:
# Stop Spark session
spark.stop()
print("Spark session stopped")
Spark session stopped