Storage

Apache Spark


Apache Spark enables distributed analytical processing of large datasets stored in your analytics buckets. Use it for complex transformations, aggregations, and machine learning workflows.

Installation

First, ensure you have Spark installed. For Python-based workflows:

1
pip install pyspark

For detailed Spark setup instructions, see the Apache Spark documentation.

Basic setup

Here's a complete example showing how to configure Spark with your Supabase analytics bucket:

1
from pyspark.sql import SparkSession
2
3
# Configuration - Update with your Supabase credentials
4
PROJECT_REF = "your-project-ref"
5
WAREHOUSE = "your-analytics-bucket-name"
6
SERVICE_KEY = "your-service-key"
7
8
# S3 credentials from Project Settings > Storage
9
S3_ACCESS_KEY = "your-access-key"
10
S3_SECRET_KEY = "your-secret-key"
11
S3_REGION = "us-east-1"
12
13
# Construct Supabase endpoints
14
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
15
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"
16
17
# Initialize Spark session with Iceberg configuration
18
spark = SparkSession.builder \
19
.master("local[*]") \
20
.appName("SupabaseIceberg") \
21
.config("spark.driver.host", "127.0.0.1") \
22
.config("spark.driver.bindAddress", "127.0.0.1") \
23
.config(
24
'spark.jars.packages',
25
'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1'
26
) \
27
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
28
.config("spark.sql.catalog.supabase", "org.apache.iceberg.spark.SparkCatalog") \
29
.config("spark.sql.catalog.supabase.type", "rest") \
30
.config("spark.sql.catalog.supabase.uri", CATALOG_URI) \
31
.config("spark.sql.catalog.supabase.warehouse", WAREHOUSE) \
32
.config("spark.sql.catalog.supabase.token", SERVICE_KEY) \
33
.config("spark.sql.catalog.supabase.s3.endpoint", S3_ENDPOINT) \
34
.config("spark.sql.catalog.supabase.s3.path-style-access", "true") \
35
.config("spark.sql.catalog.supabase.s3.access-key-id", S3_ACCESS_KEY) \
36
.config("spark.sql.catalog.supabase.s3.secret-access-key", S3_SECRET_KEY) \
37
.config("spark.sql.catalog.supabase.s3.remote-signing-enabled", "false") \
38
.config("spark.sql.catalog.supabase.client.region", S3_REGION) \
39
.config("spark.sql.defaultCatalog", "supabase") \
40
.getOrCreate()
41
42
print("✓ Spark session initialized with Iceberg")

Creating tables

1
# Create a namespace for organization
2
spark.sql("CREATE NAMESPACE IF NOT EXISTS analytics")
3
4
# Create a new Iceberg table
5
spark.sql("""
6
CREATE TABLE IF NOT EXISTS analytics.events (
7
event_id BIGINT,
8
user_id BIGINT,
9
event_name STRING,
10
event_timestamp TIMESTAMP,
11
properties STRING
12
)
13
USING iceberg
14
""")
15
16
print("✓ Created table: analytics.events")

Writing data

1
# Insert data into the table
2
spark.sql("""
3
INSERT INTO analytics.events (event_id, user_id, event_name, event_timestamp, properties)
4
VALUES
5
(1, 101, 'login', TIMESTAMP '2024-01-15 10:30:00', '{"browser":"chrome"}'),
6
(2, 102, 'view_product', TIMESTAMP '2024-01-15 10:35:00', '{"product_id":"123"}'),
7
(3, 101, 'logout', TIMESTAMP '2024-01-15 10:40:00', '{}'),
8
(4, 103, 'purchase', TIMESTAMP '2024-01-15 10:45:00', '{"amount":99.99}')
9
""")
10
11
print("✓ Inserted 4 rows into analytics.events")

Reading data

1
# Read the entire table
2
result_df = spark.sql("SELECT * FROM analytics.events")
3
result_df.show(truncate=False)
4
5
# Apply filters
6
filtered_df = spark.sql("""
7
SELECT event_id, user_id, event_name
8
FROM analytics.events
9
WHERE event_name = 'login'
10
""")
11
filtered_df.show()
12
13
# Aggregations
14
summary_df = spark.sql("""
15
SELECT
16
event_name,
17
COUNT(*) as event_count,
18
COUNT(DISTINCT user_id) as unique_users
19
FROM analytics.events
20
GROUP BY event_name
21
ORDER BY event_count DESC
22
""")
23
summary_df.show()

Advanced operations

Working with dataframes

1
# Read as DataFrame
2
events_df = spark.read.format("iceberg").load("analytics.events")
3
4
# Apply Spark transformations
5
from pyspark.sql.functions import count, col, year, month
6
7
# Monthly event counts
8
monthly_events = events_df \
9
.withColumn("month", month(col("event_timestamp"))) \
10
.withColumn("year", year(col("event_timestamp"))) \
11
.groupBy("year", "month", "event_name") \
12
.agg(count("event_id").alias("count")) \
13
.orderBy("year", "month")
14
15
monthly_events.show()

Joining tables

1
# Create another table
2
spark.sql("""
3
CREATE TABLE IF NOT EXISTS analytics.users (
4
user_id BIGINT,
5
username STRING,
6
email STRING
7
)
8
USING iceberg
9
""")
10
11
spark.sql("""
12
INSERT INTO analytics.users VALUES
13
(101, 'alice', 'alice@example.com'),
14
(102, 'bob', 'bob@example.com'),
15
(103, 'charlie', 'charlie@example.com')
16
""")
17
18
# Join events with users
19
joined_df = spark.sql("""
20
SELECT
21
e.event_id,
22
e.event_name,
23
u.username,
24
u.email,
25
e.event_timestamp
26
FROM analytics.events e
27
JOIN analytics.users u ON e.user_id = u.user_id
28
ORDER BY e.event_timestamp
29
""")
30
31
joined_df.show(truncate=False)

Exporting results

1
# Export to Parquet
2
spark.sql("""
3
SELECT event_name, COUNT(*) as count
4
FROM analytics.events
5
GROUP BY event_name
6
""").write \
7
.mode("overwrite") \
8
.parquet("/tmp/event_summary.parquet")
9
10
# Export to CSV
11
spark.sql("""
12
SELECT *
13
FROM analytics.events
14
WHERE event_timestamp > TIMESTAMP '2024-01-15 10:30:00'
15
""").write \
16
.mode("overwrite") \
17
.option("header", "true") \
18
.csv("/tmp/recent_events.csv")
19
20
print("✓ Results exported")

Performance best practices

  • Partition tables - Partition large tables by date or region for faster queries
  • Select columns - Only select columns you need to reduce I/O
  • Use filters early - Apply WHERE clauses to reduce data processed
  • Cache frequently accessed tables - Use spark.catalog.cacheTable() for tables accessed multiple times
  • Cluster mode - Use cluster mode for production workloads instead of local mode

Complete example: Data processing pipeline

1
from pyspark.sql import SparkSession
2
from pyspark.sql.functions import col, year, month, count
3
4
# Setup (see Basic Setup section above)
5
# NOTE: add every .config(...) call from Basic Setup to this chain, before .getOrCreate()
6
spark = SparkSession.builder \
7
.master("local[*]") \
8
.appName("SupabaseAnalytics") \
9
.config("spark.sql.defaultCatalog", "supabase") \
10
.getOrCreate()
11
12
# Step 1: Read raw events
13
raw_events = spark.sql("SELECT * FROM analytics.events")
14
15
# Step 2: Transform and aggregate
16
monthly_summary = raw_events \
17
.withColumn("month", month(col("event_timestamp"))) \
18
.withColumn("year", year(col("event_timestamp"))) \
19
.groupBy("year", "month", "event_name") \
20
.agg(count("event_id").alias("total_events"))
21
22
# Step 3: Save results
23
monthly_summary.write \
24
.mode("overwrite") \
25
.format("iceberg") \
26
.saveAsTable("analytics.monthly_summary")
27
28
print("✓ Pipeline completed")
29
monthly_summary.show()

Next steps