PyIceberg
This feature is in alpha
Expect rapid changes, limited features, and possible breaking updates. Share feedback as we refine the experience and expand access.
PyIceberg is a Python client for Apache Iceberg that enables programmatic interaction with Iceberg tables. Use it to create, read, update, and delete data in your analytics buckets.
Installation
```bash
pip install pyiceberg pyarrow
```
Basic setup
Here's a complete example showing how to connect to your Supabase analytics bucket and perform operations:
```python
from pyiceberg.catalog import load_catalog
import pyarrow as pa

# Configuration - update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Load the Iceberg REST catalog
catalog = load_catalog(
    "supabase-analytics",
    type="rest",
    warehouse=WAREHOUSE,
    uri=CATALOG_URI,
    token=SERVICE_KEY,
    **{
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": S3_ENDPOINT,
        "s3.access-key-id": S3_ACCESS_KEY,
        "s3.secret-access-key": S3_SECRET_KEY,
        "s3.region": S3_REGION,
        "s3.force-virtual-addressing": False,
    },
)

print("✓ Successfully connected to Iceberg catalog")
```
Creating tables
```python
# Create a namespace for organization
catalog.create_namespace_if_not_exists("analytics")

# Define the schema for your Iceberg table
schema = pa.schema([
    pa.field("event_id", pa.int64()),
    pa.field("user_id", pa.int64()),
    pa.field("event_name", pa.string()),
    pa.field("event_timestamp", pa.timestamp("ms")),
    pa.field("properties", pa.string()),
])

# Create the table
table = catalog.create_table_if_not_exists(
    ("analytics", "events"),
    schema=schema,
)

print("✓ Created table: analytics.events")
```
Writing data
```python
import datetime

# Prepare your data
current_time = datetime.datetime.now()
data = pa.table({
    "event_id": [1, 2, 3, 4, 5],
    "user_id": [101, 102, 101, 103, 102],
    "event_name": ["login", "view_product", "logout", "purchase", "login"],
    "event_timestamp": [current_time] * 5,
    "properties": [
        '{"browser":"chrome"}',
        '{"product_id":"123"}',
        '{}',
        '{"amount":99.99}',
        '{"browser":"firefox"}',
    ],
})

# Append data to the table
table.append(data)
print("✓ Appended 5 rows to analytics.events")
```
Reading data
```python
# Scan the entire table
scan_result = table.scan().to_pandas()
print(f"Total rows: {len(scan_result)}")
print(scan_result.head())

# Query with filters (scan takes a row_filter expression)
filtered = table.scan(
    row_filter="event_name = 'login'",
).to_pandas()
print(f"Login events: {len(filtered)}")

# Select specific columns
selected = table.scan(
    selected_fields=("user_id", "event_name", "event_timestamp"),
).to_pandas()
print(selected.head())
```
Advanced operations
Listing tables and namespaces
```python
# List all namespaces
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

# List tables in a namespace
tables = catalog.list_tables("analytics")
print("Tables in analytics:", tables)

# Get table metadata
table_metadata = catalog.load_table(("analytics", "events"))
print("Schema:", table_metadata.schema())
print("Partition spec:", table_metadata.spec())
```
Handling errors
```python
from pyiceberg.exceptions import NoSuchTableError

try:
    # Attempt to load a table that may not exist
    table = catalog.load_table(("analytics", "nonexistent"))
except NoSuchTableError as e:
    print(f"Error loading table: {e}")

# Check whether a table exists before creating it
namespace = "analytics"
table_name = "events"

try:
    existing_table = catalog.load_table((namespace, table_name))
    print("Table already exists")
except NoSuchTableError:
    print("Table does not exist, creating...")
    table = catalog.create_table((namespace, table_name), schema=schema)
```
Performance tips
- Batch writes - Append data in batches rather than row by row; each `append` call commits a new table snapshot, so fewer, larger writes perform better (see the first sketch after this list)
- Partition strategies - Partition large tables so queries can prune data files (second sketch below)
- Schema evolution - PyIceberg can change a table's schema without rewriting existing data (third sketch below)
- Data format - Iceberg data files are written as Parquet by default, giving efficient columnar storage
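
To illustrate the batching tip, here is a minimal sketch that accumulates rows in memory and writes them with a single append. It assumes the `table` and `schema` objects from the sections above; the row values are made up for the example.

```python
import datetime
import pyarrow as pa

# Accumulate rows in memory, then append once. Every append commits
# one new snapshot and writes new data files, so a single 1,000-row
# append is far cheaper than 1,000 single-row appends.
now = datetime.datetime.now()
rows = [
    {
        "event_id": i,
        "user_id": 100 + (i % 3),
        "event_name": "login",
        "event_timestamp": now,
        "properties": "{}",
    }
    for i in range(1000)
]

table.append(pa.Table.from_pylist(rows, schema=schema))
```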
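
For the partitioning tip, PyIceberg exposes partition-spec evolution on an existing table. A minimal sketch, assuming the `analytics.events` table from above; note that a new spec only applies to data written after the change:

```python
from pyiceberg.transforms import DayTransform

# Partition future writes by the day of event_timestamp so that
# queries filtering on that column can skip whole partitions
with table.update_spec() as update:
    update.add_field("event_timestamp", DayTransform())
```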
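
And for schema evolution, a sketch that adds an optional column to the same table; existing data files are untouched, and old rows read back as nulls for the new column:

```python
from pyiceberg.types import StringType

# Add an optional "channel" column without rewriting any data
with table.update_schema() as update:
    update.add_column("channel", StringType())
```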
Complete example: ETL pipeline
```python
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pandas as pd

# Setup (see the Basic setup section above)
catalog = load_catalog(...)

# Step 1: Create the warehouse namespace
catalog.create_namespace_if_not_exists("warehouse")

# Step 2: Define table schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("created_at", pa.timestamp("ms")),
])

# Step 3: Create table
table = catalog.create_table_if_not_exists(
    ("warehouse", "products"),
    schema=schema,
)

# Step 4: Load data from CSV or a database
# (parse created_at as a timestamp so it matches the table schema)
df = pd.read_csv("products.csv", parse_dates=["created_at"])
data = pa.Table.from_pandas(df)

# Step 5: Write to the analytics bucket
table.append(data)
print(f"✓ Loaded {len(data)} products to warehouse.products")

# Step 6: Verify
result = table.scan().to_pandas()
print(result.describe())
```