Apache Spark
This feature is in alpha
Expect rapid changes, limited features, and possible breaking updates. Share feedback as we refine the experience and expand access.
Apache Spark enables distributed analytical processing of large datasets stored in your analytics buckets. Use it for complex transformations, aggregations, and machine learning workflows.
Installation
First, ensure you have Spark installed. For Python-based workflows:
```bash
pip install pyspark
```

For detailed Spark setup instructions, see the Apache Spark documentation.
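PySpark also needs a Java runtime available to the driver; Spark 3.5 (the version the Iceberg runtime used below targets) runs on Java 8, 11, or 17. As a minimal sanity check that the package is importable and to see which version you have:

```python
# Minimal check: confirm PySpark is installed and print its version
import pyspark

print(pyspark.__version__)
```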
Basic setup
Here's a complete example showing how to configure Spark with your Supabase analytics bucket:
```python
from pyspark.sql import SparkSession

# Configuration - Update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Initialize Spark session with Iceberg configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SupabaseIceberg") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1"
    ) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.supabase", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.supabase.type", "rest") \
    .config("spark.sql.catalog.supabase.uri", CATALOG_URI) \
    .config("spark.sql.catalog.supabase.warehouse", WAREHOUSE) \
    .config("spark.sql.catalog.supabase.token", SERVICE_KEY) \
    .config("spark.sql.catalog.supabase.s3.endpoint", S3_ENDPOINT) \
    .config("spark.sql.catalog.supabase.s3.path-style-access", "true") \
    .config("spark.sql.catalog.supabase.s3.access-key-id", S3_ACCESS_KEY) \
    .config("spark.sql.catalog.supabase.s3.secret-access-key", S3_SECRET_KEY) \
    .config("spark.sql.catalog.supabase.s3.remote-signing-enabled", "false") \
    .config("spark.sql.defaultCatalog", "supabase") \
    .getOrCreate()

print("✓ Spark session initialized with Iceberg")
```

Creating tables
```python
# Create a namespace for organization
spark.sql("CREATE NAMESPACE IF NOT EXISTS analytics")

# Create a new Iceberg table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.events (
        event_id BIGINT,
        user_id BIGINT,
        event_name STRING,
        event_timestamp TIMESTAMP,
        properties STRING
    )
    USING iceberg
""")

print("✓ Created table: analytics.events")
```

Writing data
```python
# Insert data into the table
spark.sql("""
    INSERT INTO analytics.events (event_id, user_id, event_name, event_timestamp, properties)
    VALUES
        (1, 101, 'login', TIMESTAMP '2024-01-15 10:30:00', '{"browser":"chrome"}'),
        (2, 102, 'view_product', TIMESTAMP '2024-01-15 10:35:00', '{"product_id":"123"}'),
        (3, 101, 'logout', TIMESTAMP '2024-01-15 10:40:00', '{}'),
        (4, 103, 'purchase', TIMESTAMP '2024-01-15 10:45:00', '{"amount":99.99}')
""")

print("✓ Inserted 4 rows into analytics.events")
```

Reading data
```python
# Read the entire table
result_df = spark.sql("SELECT * FROM analytics.events")
result_df.show(truncate=False)

# Apply filters
filtered_df = spark.sql("""
    SELECT event_id, user_id, event_name
    FROM analytics.events
    WHERE event_name = 'login'
""")
filtered_df.show()

# Aggregations
summary_df = spark.sql("""
    SELECT
        event_name,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users
    FROM analytics.events
    GROUP BY event_name
    ORDER BY event_count DESC
""")
summary_df.show()
```

Advanced operations
Working with dataframes
```python
# Read as DataFrame
events_df = spark.read.format("iceberg").load("analytics.events")

# Apply Spark transformations
from pyspark.sql.functions import count, col, year, month

# Monthly event counts
monthly_events = events_df \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("count")) \
    .orderBy("year", "month")

monthly_events.show()
```

Joining tables
```python
# Create another table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.users (
        user_id BIGINT,
        username STRING,
        email STRING
    )
    USING iceberg
""")

spark.sql("""
    INSERT INTO analytics.users VALUES
        (101, 'alice', 'alice@example.com'),
        (102, 'bob', 'bob@example.com'),
        (103, 'charlie', 'charlie@example.com')
""")

# Join events with users
joined_df = spark.sql("""
    SELECT
        e.event_id,
        e.event_name,
        u.username,
        u.email,
        e.event_timestamp
    FROM analytics.events e
    JOIN analytics.users u ON e.user_id = u.user_id
    ORDER BY e.event_timestamp
""")

joined_df.show(truncate=False)
```

Exporting results
```python
# Export to Parquet
spark.sql("""
    SELECT event_name, COUNT(*) as count
    FROM analytics.events
    GROUP BY event_name
""").write \
    .mode("overwrite") \
    .parquet("/tmp/event_summary.parquet")

# Export to CSV
spark.sql("""
    SELECT *
    FROM analytics.events
    WHERE event_timestamp > TIMESTAMP '2024-01-15 10:30:00'
""").write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/tmp/recent_events.csv")

print("✓ Results exported")
```

Performance best practices
- Partition tables - Partition large tables by date or region for faster queries
- Select columns - Only select columns you need to reduce I/O
- Use filters early - Apply WHERE clauses to reduce data processed
- Cache frequently accessed tables - Use `spark.catalog.cacheTable()` for tables queried multiple times (partitioning and caching are both shown in the sketch after this list)
- Cluster mode - Use cluster mode for production workloads instead of local mode
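As a rough sketch of the partitioning and caching tips above, the example below creates a date-partitioned copy of the events table and caches it for repeated queries. The table name `analytics.events_partitioned` is illustrative and not part of the setup above.

```python
# Hypothetical example: a date-partitioned variant of analytics.events.
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.events_partitioned (
        event_id BIGINT,
        user_id BIGINT,
        event_name STRING,
        event_timestamp TIMESTAMP,
        properties STRING
    )
    USING iceberg
    PARTITIONED BY (days(event_timestamp))
""")

# Backfill from the existing table; queries that filter on event_timestamp
# can now prune whole partitions instead of scanning everything.
spark.sql("""
    INSERT INTO analytics.events_partitioned
    SELECT * FROM analytics.events
""")

# Cache a table that several downstream queries will reuse
spark.catalog.cacheTable("analytics.events_partitioned")

spark.sql("""
    SELECT event_name, COUNT(*) as event_count
    FROM analytics.events_partitioned
    WHERE event_timestamp >= TIMESTAMP '2024-01-01 00:00:00'
    GROUP BY event_name
""").show()

# Release the cached data when you're done
spark.catalog.uncacheTable("analytics.events_partitioned")
```

Iceberg's `days()` transform provides hidden partitioning, so filters on `event_timestamp` prune partitions without a separate partition column in your queries.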
Complete example: Data processing pipeline
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, count

# Setup (see the Basic setup section above)
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SupabaseAnalytics")
    .config("spark.sql.defaultCatalog", "supabase")
    # ... (add all config from Basic setup)
    .getOrCreate()
)

# Step 1: Read raw events
raw_events = spark.sql("SELECT * FROM analytics.events")

# Step 2: Transform and aggregate
monthly_summary = raw_events \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("total_events"))

# Step 3: Save results as a new Iceberg table in the catalog
monthly_summary.write \
    .mode("overwrite") \
    .saveAsTable("analytics.monthly_summary")

print("✓ Pipeline completed")
monthly_summary.show()
```