PyIceberg

PyIceberg is a Python client for Apache Iceberg that enables programmatic interaction with Iceberg tables. Use it to create, read, update, and delete data in your analytics buckets.

Installation

pip install pyiceberg pyarrow

Basic setup

Here's a complete example showing how to connect to your Supabase analytics bucket; the sections that follow reuse the catalog object it creates:

from pyiceberg.catalog import load_catalog
import pyarrow as pa

# Configuration - update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Load the Iceberg REST catalog
catalog = load_catalog(
    "supabase-analytics",
    type="rest",
    warehouse=WAREHOUSE,
    uri=CATALOG_URI,
    token=SERVICE_KEY,
    **{
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": S3_ENDPOINT,
        "s3.access-key-id": S3_ACCESS_KEY,
        "s3.secret-access-key": S3_SECRET_KEY,
        "s3.region": S3_REGION,
        "s3.force-virtual-addressing": False,
    },
)

print("✓ Successfully connected to Iceberg catalog")

Creating tables

# Create a namespace for organization
catalog.create_namespace_if_not_exists("analytics")

# Define the schema for your Iceberg table
schema = pa.schema([
    pa.field("event_id", pa.int64()),
    pa.field("user_id", pa.int64()),
    pa.field("event_name", pa.string()),
    pa.field("event_timestamp", pa.timestamp("ms")),
    pa.field("properties", pa.string()),
])

# Create the table (no-op if it already exists)
table = catalog.create_table_if_not_exists(
    ("analytics", "events"),
    schema=schema,
)

print("✓ Created table: analytics.events")

Writing data

import datetime

# Prepare your data
current_time = datetime.datetime.now()
data = pa.table({
    "event_id": [1, 2, 3, 4, 5],
    "user_id": [101, 102, 101, 103, 102],
    "event_name": ["login", "view_product", "logout", "purchase", "login"],
    "event_timestamp": [current_time] * 5,
    "properties": [
        '{"browser":"chrome"}',
        '{"product_id":"123"}',
        '{}',
        '{"amount":99.99}',
        '{"browser":"firefox"}',
    ],
})

# Append data to the table
table.append(data)
print("✓ Appended 5 rows to analytics.events")

Reading data

# Scan the entire table
scan_result = table.scan().to_pandas()
print(f"Total rows: {len(scan_result)}")
print(scan_result.head())

# Query with a row filter
filtered = table.scan(
    row_filter="event_name = 'login'"
).to_pandas()
print(f"Login events: {len(filtered)}")

# Select specific columns
selected = table.scan(
    selected_fields=("user_id", "event_name", "event_timestamp"),
).to_pandas()
print(selected.head())
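
String predicates like the one above are parsed by PyIceberg; filters can also be built from its expression classes, which are easier to compose programmatically. A brief sketch against the same events table:

from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual

# Equivalent to the string filter "event_name = 'login'"
logins = table.scan(row_filter=EqualTo("event_name", "login")).to_pandas()

# Combine predicates
recent = table.scan(
    row_filter=And(
        EqualTo("event_name", "login"),
        GreaterThanOrEqual("user_id", 102),
    ),
).to_pandas()
print(f"Filtered logins: {len(recent)}")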

Advanced operations

Listing tables and namespaces

# List all namespaces
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

# List tables in a namespace
tables = catalog.list_tables("analytics")
print("Tables in analytics:", tables)

# Get table metadata
table_metadata = catalog.load_table(("analytics", "events"))
print("Schema:", table_metadata.schema())
print("Partition spec:", table_metadata.spec())

Handling errors

from pyiceberg.exceptions import NoSuchTableError

try:
    # Attempt to load a table
    table = catalog.load_table(("analytics", "nonexistent"))
except NoSuchTableError as e:
    print(f"Error loading table: {e}")

# Check whether the table exists before creating it
namespace = "analytics"
table_name = "events"

try:
    existing_table = catalog.load_table((namespace, table_name))
    print("Table already exists")
except NoSuchTableError:
    print("Table does not exist, creating...")
    table = catalog.create_table((namespace, table_name), schema=schema)
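
If your PyIceberg version provides it, catalog.table_exists is a more direct check than catching exceptions. A sketch assuming the catalog and schema from above:

if not catalog.table_exists(("analytics", "events")):
    table = catalog.create_table(("analytics", "events"), schema=schema)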

Performance tips

  • Batch writes - Append data in batches rather than row-by-row; each append creates a new table snapshot
  • Partition strategies - Partition large tables so queries can skip irrelevant files (see the sketch below)
  • Schema evolution - PyIceberg supports schema changes without rewriting existing data (also shown below)
  • Data format - Iceberg tables store data as Parquet files, giving you efficient columnar storage by default
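
A sketch of the partitioning and schema-evolution tips above. Defining a partition spec requires an explicit Iceberg schema, since the spec references fields by ID; the events_partitioned table name is just for illustration:

from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType, TimestampType
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.transforms import DayTransform

# Partition strategy: one partition per day of event_timestamp
iceberg_schema = Schema(
    NestedField(field_id=1, name="event_id", field_type=LongType(), required=False),
    NestedField(field_id=2, name="event_name", field_type=StringType(), required=False),
    NestedField(field_id=3, name="event_timestamp", field_type=TimestampType(), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=3, field_id=1000, transform=DayTransform(), name="event_day"),
)
partitioned = catalog.create_table_if_not_exists(
    ("analytics", "events_partitioned"),
    schema=iceberg_schema,
    partition_spec=spec,
)

# Schema evolution: add a column without rewriting existing data files
with partitioned.update_schema() as update:
    update.add_column("channel", StringType())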

Complete example: ETL pipeline

from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pandas as pd

# Setup (see Basic setup section above)
catalog = load_catalog(...)

# Step 1: Create a namespace
catalog.create_namespace_if_not_exists("warehouse")

# Step 2: Define the table schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("created_at", pa.timestamp("ms")),
])

# Step 3: Create the table
table = catalog.create_table_if_not_exists(
    ("warehouse", "products"),
    schema=schema,
)

# Step 4: Load data from CSV, parsing timestamp columns as datetimes
df = pd.read_csv("products.csv", parse_dates=["created_at"])
data = pa.Table.from_pandas(df, preserve_index=False).cast(schema)

# Step 5: Write to the analytics bucket
table.append(data)
print(f"✓ Loaded {len(data)} products to warehouse.products")

# Step 6: Verify
result = table.scan().to_pandas()
print(result.describe())

Next steps