Apache Spark

Apache Spark enables distributed analytical processing of large datasets stored in your analytics buckets. Use it for complex transformations, aggregations, and machine learning workflows.

Installation

First, ensure you have Spark installed. For Python-based workflows:

pip install pyspark

For detailed Spark setup instructions, see the Apache Spark documentation.
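Note that Spark itself also needs a Java runtime (JDK 8, 11, or 17 for Spark 3.5). The Iceberg runtime JAR used in the next section is built for Spark 3.5 / Scala 2.12, so installing a matching PySpark version avoids classpath mismatches. A quick sanity check:

# Confirm PySpark is importable and on a matching version
import pyspark

print(pyspark.__version__)  # ideally 3.5.x to match iceberg-spark-runtime-3.5_2.12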

Basic setup

Here's a complete example showing how to configure Spark with your Supabase analytics bucket:

from pyspark.sql import SparkSession

# Configuration - Update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Initialize Spark session with Iceberg configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SupabaseIceberg") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1"
    ) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.supabase", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.supabase.type", "rest") \
    .config("spark.sql.catalog.supabase.uri", CATALOG_URI) \
    .config("spark.sql.catalog.supabase.warehouse", WAREHOUSE) \
    .config("spark.sql.catalog.supabase.token", SERVICE_KEY) \
    .config("spark.sql.catalog.supabase.s3.endpoint", S3_ENDPOINT) \
    .config("spark.sql.catalog.supabase.s3.path-style-access", "true") \
    .config("spark.sql.catalog.supabase.s3.access-key-id", S3_ACCESS_KEY) \
    .config("spark.sql.catalog.supabase.s3.secret-access-key", S3_SECRET_KEY) \
    .config("spark.sql.catalog.supabase.s3.remote-signing-enabled", "false") \
    .config("spark.sql.defaultCatalog", "supabase") \
    .getOrCreate()

print("✓ Spark session initialized with Iceberg")

Creating tables

# Create a namespace for organization
spark.sql("CREATE NAMESPACE IF NOT EXISTS analytics")

# Create a new Iceberg table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.events (
        event_id BIGINT,
        user_id BIGINT,
        event_name STRING,
        event_timestamp TIMESTAMP,
        properties STRING
    )
    USING iceberg
""")

print("✓ Created table: analytics.events")

Writing data

# Insert data into the table
spark.sql("""
    INSERT INTO analytics.events (event_id, user_id, event_name, event_timestamp, properties)
    VALUES
        (1, 101, 'login', TIMESTAMP '2024-01-15 10:30:00', '{"browser":"chrome"}'),
        (2, 102, 'view_product', TIMESTAMP '2024-01-15 10:35:00', '{"product_id":"123"}'),
        (3, 101, 'logout', TIMESTAMP '2024-01-15 10:40:00', '{}'),
        (4, 103, 'purchase', TIMESTAMP '2024-01-15 10:45:00', '{"amount":99.99}')
""")

print("✓ Inserted 4 rows into analytics.events")

Reading data

# Read the entire table
result_df = spark.sql("SELECT * FROM analytics.events")
result_df.show(truncate=False)

# Apply filters
filtered_df = spark.sql("""
    SELECT event_id, user_id, event_name
    FROM analytics.events
    WHERE event_name = 'login'
""")
filtered_df.show()

# Aggregations
summary_df = spark.sql("""
    SELECT
        event_name,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users
    FROM analytics.events
    GROUP BY event_name
    ORDER BY event_count DESC
""")
summary_df.show()
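Because the table is Iceberg-backed, every committed write also created a snapshot. You can inspect that history through Iceberg's metadata tables, for example:

# Each committed write appears as a row in the snapshots metadata table
spark.sql("""
    SELECT snapshot_id, committed_at, operation
    FROM analytics.events.snapshots
    ORDER BY committed_at
""").show(truncate=False)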

Advanced operations

Working with DataFrames

from pyspark.sql.functions import count, col, year, month

# Read as DataFrame
events_df = spark.read.format("iceberg").load("analytics.events")

# Apply Spark transformations: monthly event counts
monthly_events = events_df \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("count")) \
    .orderBy("year", "month")

monthly_events.show()

Joining tables

# Create another table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.users (
        user_id BIGINT,
        username STRING,
        email STRING
    )
    USING iceberg
""")

spark.sql("""
    INSERT INTO analytics.users VALUES
        (101, 'alice', 'alice@example.com'),
        (102, 'bob', 'bob@example.com'),
        (103, 'charlie', 'charlie@example.com')
""")

# Join events with users
joined_df = spark.sql("""
    SELECT
        e.event_id,
        e.event_name,
        u.username,
        u.email,
        e.event_timestamp
    FROM analytics.events e
    JOIN analytics.users u ON e.user_id = u.user_id
    ORDER BY e.event_timestamp
""")

joined_df.show(truncate=False)
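The same join can be expressed with the DataFrame API; a sketch equivalent to the SQL above:

# DataFrame-API equivalent of the SQL join
events_df = spark.read.table("analytics.events")
users_df = spark.read.table("analytics.users")

joined_df = events_df.join(users_df, on="user_id") \
    .select("event_id", "event_name", "username", "email", "event_timestamp") \
    .orderBy("event_timestamp")
joined_df.show(truncate=False)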

Exporting results

# Export to Parquet
spark.sql("""
    SELECT event_name, COUNT(*) as count
    FROM analytics.events
    GROUP BY event_name
""").write \
    .mode("overwrite") \
    .parquet("/tmp/event_summary.parquet")

# Export to CSV
spark.sql("""
    SELECT *
    FROM analytics.events
    WHERE event_timestamp > TIMESTAMP '2024-01-15 10:30:00'
""").write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/tmp/recent_events.csv")

print("✓ Results exported")

Performance best practices

  • Partition tables - Partition large tables by date or region so queries scan fewer files (see the sketch after this list)
  • Select columns - Read only the columns you need to reduce I/O
  • Filter early - Apply WHERE clauses as early as possible to reduce the data processed
  • Cache hot tables - Use spark.catalog.cacheTable() for tables accessed multiple times
  • Use cluster mode - Run production workloads on a cluster instead of local mode
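For example, the partitioning and caching points might look like this against the events schema (a sketch; analytics.events_partitioned is an illustrative name, not a table created earlier):

# Partition by day so date-bounded queries prune files via Iceberg's hidden partitioning
# (analytics.events_partitioned is an illustrative name)
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.events_partitioned (
        event_id BIGINT,
        user_id BIGINT,
        event_name STRING,
        event_timestamp TIMESTAMP,
        properties STRING
    )
    USING iceberg
    PARTITIONED BY (days(event_timestamp))
""")

# Keep a frequently queried table in memory across multiple queries
spark.catalog.cacheTable("analytics.events")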

Complete example: Data processing pipeline

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, count

# Setup (see Basic setup section above)
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SupabaseAnalytics")
    .config("spark.sql.defaultCatalog", "supabase")
    # ... (add all config from Basic setup)
    .getOrCreate()
)

# Step 1: Read raw events
raw_events = spark.sql("SELECT * FROM analytics.events")

# Step 2: Transform and aggregate
monthly_summary = raw_events \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("total_events"))

# Step 3: Save results as a new table in the catalog
monthly_summary.write \
    .mode("overwrite") \
    .saveAsTable("analytics.monthly_summary")

print("✓ Pipeline completed")
monthly_summary.show()
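The summary is now itself a catalog table, so downstream jobs can query it directly:

# The pipeline output is a regular table in the supabase catalog
spark.sql("SELECT * FROM analytics.monthly_summary ORDER BY year, month").show()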

Next steps