OpenLakehouse

The OpenLakehouse engine is QuantFlow's production data backend. It unifies four open-format components — S3-compatible object storage, Apache Parquet, Apache Iceberg, and Trino — into a single engine that handles the full data lifecycle: ingest writes, Iceberg commits, federated SQL queries, and operational maintenance.

All three registry names resolve to the same engine:

from quantflow.io import DBRegistry

engine = DBRegistry.create("openlakehouse", config) # canonical name
engine = DBRegistry.create("s3_iceberg", config) # backward compat alias
engine = DBRegistry.create("trino", config) # backward compat alias
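
How the aliases resolve internally is not shown here. One plausible arrangement, sketched below with a hypothetical `AliasRegistry` class (an illustration, not QuantFlow's actual `DBRegistry`), maps each alias to the canonical name before looking up the engine factory:

```python
from typing import Any, Callable, Dict, Iterable


class AliasRegistry:
    """Hypothetical stand-in for DBRegistry; illustrates alias resolution only."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[dict], Any]] = {}
        self._aliases: Dict[str, str] = {}

    def register(
        self,
        name: str,
        factory: Callable[[dict], Any],
        aliases: Iterable[str] = (),
    ) -> None:
        self._factories[name] = factory
        for alias in aliases:
            self._aliases[alias] = name

    def canonical(self, name: str) -> str:
        # Aliases map to the canonical name; other names pass through unchanged.
        return self._aliases.get(name, name)

    def create(self, name: str, config: dict) -> Any:
        key = self.canonical(name)
        if key not in self._factories:
            raise KeyError(f"unknown engine: {name!r}")
        return self._factories[key](config)


registry = AliasRegistry()
# The factory here just echoes its config; a real one would build the engine.
registry.register(
    "openlakehouse",
    lambda config: {"engine": "openlakehouse", **config},
    aliases=("s3_iceberg", "trino"),
)
```

With this arrangement, `registry.create("trino", config)` and `registry.create("openlakehouse", config)` go through the same factory, so older configuration files keep working.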

Why OpenLakehouse

Market data at HFT scale produces billions of events per day. Storing that volume in a traditional data warehouse would be expensive, and queries over it would be slow. A files-only approach (raw Parquet on S3) lacks ACID guarantees, schema management, and concurrent write safety. The OpenLakehouse bridges this gap:

  • Storage is cheap — S3-compatible object storage costs a fraction of warehouse storage
  • Table format provides guarantees — Iceberg brings ACID transactions, time-travel, and schema evolution to files on S3
  • Compute is separate — Trino provides SQL access without locking you into a proprietary query engine
  • Open formats, no lock-in — Parquet and Iceberg are open standards; you can read the data with any compatible tool

Components

| Component | Role |
| --- | --- |
| S3 / Object Store | Persistent storage layer. Backblaze B2, AWS S3, or any S3-compatible endpoint. Cost-effective at HFT scale. |
| Apache Parquet | Columnar file format. Compressed, supports predicate pushdown and partition-aware scans. The on-disk format for all QuantFlow data. |
| Apache Iceberg | Table format layer. Provides ACID transactions, time-travel queries, schema evolution, partition evolution, and hidden partitioning, all on top of Parquet files. |
| Trino | Federated SQL query engine. Joins across Iceberg catalogs, external databases, and file-based sources. Provides the SQL interface for both ad-hoc queries and pipeline reads. |
| Ingest Ledger | SQLite-based audit trail. Tracks every file ingested (source, timestamp, row count) for replay, reconciliation, and debugging. |
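
The Ingest Ledger's schema is not documented here. As a minimal sketch, assuming one row per ingested file with the three tracked fields (source, timestamp, row count), a SQLite ledger could look like the following. The table name, column names, and helper functions are all illustrative assumptions:

```python
import sqlite3
from datetime import datetime, timezone

# Assumed schema: the text only says the ledger tracks source, timestamp, and row count.
SCHEMA = """
CREATE TABLE IF NOT EXISTS ingest_ledger (
    file_path   TEXT PRIMARY KEY,
    source      TEXT NOT NULL,
    ingested_at TEXT NOT NULL,
    row_count   INTEGER NOT NULL
)
"""


def open_ledger(db_path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the ledger database and make sure the table exists."""
    conn = sqlite3.connect(db_path)
    conn.execute(SCHEMA)
    return conn


def record_ingest(conn: sqlite3.Connection, file_path: str, source: str, row_count: int) -> bool:
    """Record one ingested file; return False if it was already recorded."""
    ingested_at = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO ingest_ledger VALUES (?, ?, ?, ?)",
            (file_path, source, ingested_at, row_count),
        )
    return cur.rowcount == 1


def total_rows(conn: sqlite3.Connection, source: str) -> int:
    """Sum of recorded row counts for one source, e.g. for reconciliation."""
    (total,) = conn.execute(
        "SELECT COALESCE(SUM(row_count), 0) FROM ingest_ledger WHERE source = ?",
        (source,),
    ).fetchone()
    return total
```

Keying the table on the file path makes a replayed ingest a no-op in the ledger, which is one simple way to support replay without double counting.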

Write Path

Ray workers (parallel)
    │
    ├─► write Parquet to staging/ area (no write contention: each shard is independent)
    │
    ▼
engine.commit_files()
    │
    ├─► register new Parquet files in Iceberg catalog
    ├─► add partition entries
    ├─► update table snapshot
    │
    ▼
Trino queries the committed table
    │
    ├─► resolves current Iceberg snapshot
    ├─► applies partition pruning (reads only relevant files)
    └─► scans Parquet with predicate pushdown

Raw provider files (DBN, CSV) are archived to raw/ for provenance. Processed Parquet files live under warehouse/{catalog}/ as Iceberg-managed tables.
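
To illustrate why the write path separates staging from commit, here is a toy in-memory model of snapshot-based commits. It is not Iceberg's implementation and `ToyTable` is a hypothetical name. It only shows the idea that one commit makes a batch of staged files visible at once, while a reader that already resolved a snapshot keeps a consistent view:

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    files: Tuple[str, ...]  # every data file visible at this snapshot


@dataclass
class ToyTable:
    """Toy model of snapshot commits; real Iceberg tracks manifests and metadata files."""

    snapshots: List[Snapshot] = field(default_factory=lambda: [Snapshot(0, ())])

    def current(self) -> Snapshot:
        return self.snapshots[-1]

    def commit_files(self, staged: List[str]) -> Snapshot:
        # One commit registers all staged files together by appending a new snapshot.
        base = self.current()
        new = Snapshot(base.snapshot_id + 1, base.files + tuple(staged))
        self.snapshots.append(new)
        return new


table = ToyTable()
reader_view = table.current()  # a reader pins the snapshot it resolved
table.commit_files(["part-0.parquet", "part-1.parquet"])
```

After the commit, `reader_view.files` is still empty while `table.current().files` lists both new files, which mirrors how queries resolve a single snapshot before scanning.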


Storage Folder Structure

How the OpenLakehouse engine organizes data on S3:

s3://{bucket}/
├── raw/                  # Original source files (archive)
├── staging/              # Ray temporary Parquet before Iceberg commit
├── warehouse/            # Iceberg managed tables (metadata + data)
│   └── {catalog}/
│       ├── raw/          # Raw ingested tables
│       ├── cdm/          # dbt-transformed CDM tables
│       └── monitoring/   # Ingest run logs, manifests
└── ops/                  # Manifests, reconciliation, audit (planned)

Raw Archive (raw/)

Original files downloaded from data providers, preserved byte-for-byte.

raw/
└── {provider_type}/                      # "databento", "binance"
    └── {venue}.{dataset}/                # "xnas.itch", "futures_um"
        └── {schema}/                     # "mbp1", "trades", "aggTrades"
            └── request_id={job_id}/      # Hive partition: Databento batch job ID
                └── date={trading_date}/  # Hive partition: YYYY-MM-DD
                    ├── original/         # Original .dbn.zst / .csv.gz files
                    └── manifest/         # Download manifests (future)

Example — Databento MBP-1:

raw/databento/xnas.itch/mbp1/
  request_id=XNAS-20260520-RJV8PYSHC4/
    date=2026-02-17/original/xnas-itch-20260217.mbp-1.dbn.zst
    date=2026-02-18/original/xnas-itch-20260218.mbp-1.dbn.zst
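
As a sketch of how such a key could be assembled from its parts, the hypothetical helper below (`raw_archive_key` is not a QuantFlow function) follows the layout described above and reproduces the first example path:

```python
from datetime import date


def raw_archive_key(
    provider_type: str,
    venue_dataset: str,
    schema: str,
    job_id: str,
    trading_date: date,
    filename: str,
) -> str:
    """Build the object key for an original provider file under raw/."""
    return "/".join(
        [
            "raw",
            provider_type,
            venue_dataset,
            schema,
            f"request_id={job_id}",                # Hive-style partition segment
            f"date={trading_date.isoformat()}",    # YYYY-MM-DD
            "original",
            filename,
        ]
    )


key = raw_archive_key(
    "databento",
    "xnas.itch",
    "mbp1",
    "XNAS-20260520-RJV8PYSHC4",
    date(2026, 2, 17),
    "xnas-itch-20260217.mbp-1.dbn.zst",
)
```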

Staging (staging/)

Parquet files written by Ray workers before Iceberg commit. Each shard writes independently — no write contention.

staging/ray/{table_short}/
  batch_id={batch_id}/
    event_date={date}/
      symbol={symbol}/
        part-{i}-{uuid}.parquet

Example:

staging/ray/cdm_trades/
  batch_id=databento_xnas_itch_trades_20260501_001/
    event_date=2026-05-01/
      symbol=TSLA/
        part-worker-00001-a8f3c4d2.parquet
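
Because the staging layout uses `key=value` path segments, the batch, date, and symbol of a staged file can be recovered from its key alone. The helper below is an illustrative sketch (the function name is an assumption, not part of the engine):

```python
from typing import Dict


def parse_hive_partitions(key: str) -> Dict[str, str]:
    """Extract key=value path segments from an object key."""
    parts: Dict[str, str] = {}
    for segment in key.split("/"):
        name, sep, value = segment.partition("=")
        # Keep only well-formed segments such as "symbol=TSLA".
        if sep and name and value:
            parts[name] = value
    return parts


staged = (
    "staging/ray/cdm_trades/"
    "batch_id=databento_xnas_itch_trades_20260501_001/"
    "event_date=2026-05-01/symbol=TSLA/"
    "part-worker-00001-a8f3c4d2.parquet"
)
partitions = parse_hive_partitions(staged)
```

For the example key, `partitions` holds the `batch_id`, `event_date`, and `symbol` values, which is enough to group staged files by batch before a commit.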

Warehouse (warehouse/)

Iceberg-managed tables. Each table has metadata/ (Iceberg metadata files, manifest lists, manifests) and data/ (Parquet data files).

Raw tables:

warehouse/{catalog}/raw/
  {provider}_{stream}/
    metadata/              # Iceberg metadata
    data/
      {trading_date}/      # YYYY-MM-DD
        {job_id}/          # Batch job / request ID
          data.parquet

CDM tables (dbt + State Engine output):

warehouse/{catalog}/cdm/
  cdm_trades/            # Union of all raw trades
  cdm_trade_enriched/    # State engine output
  cdm_lob_l2/            # LOB snapshots
  cdm_time_bars/         # Time bars
  cdm_tick_bars/         # Tick bars
  cdm_dollar_bars/       # Dollar bars
  ...                    # Other bar types
  ft_features/           # Feature engine output
  labels/                # Label engine output

Each CDM table follows the standard Iceberg layout. Partitioning is determined by the CDM entity YAML:

| Entity Group | Partition Column | Granularity |
| --- | --- | --- |
| Event tables (trades, lob) | event_time | hour |
| Bar tables | start_time | day |
| Feature/label tables | timestamp | day |
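
How the entity YAML is translated into an Iceberg partition spec is not shown here. A minimal sketch, assuming each granularity maps directly onto the Iceberg `hour` or `day` partition transform and that tables are created through Trino's Iceberg connector (which accepts a `partitioning` table property), might look like this. The dictionary keys and function names are illustrative:

```python
# Column and granularity values come from the partitioning table above;
# the dictionary keys are illustrative labels, not QuantFlow identifiers.
PARTITIONING = {
    "event": ("event_time", "hour"),
    "bar": ("start_time", "day"),
    "feature_label": ("timestamp", "day"),
}


def partition_transform(entity_group: str) -> str:
    """Return an Iceberg-style transform expression such as 'hour(event_time)'."""
    column, granularity = PARTITIONING[entity_group]
    return f"{granularity}({column})"


def trino_partitioning_property(entity_group: str) -> str:
    """Render the transform as a Trino Iceberg `partitioning` table property."""
    return f"partitioning = ARRAY['{partition_transform(entity_group)}']"
```

For example, `trino_partitioning_property("event")` yields a property string that could go in the `WITH (...)` clause of a `CREATE TABLE` statement.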

Monitoring tables:

warehouse/{catalog}/monitoring/
  ingest_runs/           # PipelineLogger: run start/summary events
  ingestion_manifest/    # Per-file download records

Path Resolution

_table_path(table) → warehouse location. Takes a fully qualified table name like raw.equities_databento_historical_mbp-1 and resolves to:

{bucket}/warehouse/{catalog}/{namespace}/{name}/

Here {namespace} is the part before the first . (e.g. raw), and {name} is everything after it (e.g. equities_databento_historical_mbp-1).

expected_data_path(table, trading_date, job_id) → expected write location. Used by skip_if_exists:

s3://{bucket}/warehouse/{catalog}/{namespace}/{name}/data/{trading_date}/{job_id}
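
The two resolution rules above can be sketched as pure functions. These are illustrative reimplementations that take `bucket` and `catalog` as explicit arguments (an assumption; the engine presumably reads them from its config), not the engine's own methods:

```python
from typing import Tuple


def split_table_name(table: str) -> Tuple[str, str]:
    """Split 'namespace.name' at the first dot only."""
    namespace, sep, name = table.partition(".")
    if not sep or not namespace or not name:
        raise ValueError(f"expected 'namespace.name', got {table!r}")
    return namespace, name


def table_path(bucket: str, catalog: str, table: str) -> str:
    """Warehouse location of a table, following the first template above."""
    namespace, name = split_table_name(table)
    return f"{bucket}/warehouse/{catalog}/{namespace}/{name}/"


def expected_data_path(
    bucket: str, catalog: str, table: str, trading_date: str, job_id: str
) -> str:
    """Expected write location for one trading date and job, per the second template."""
    namespace, name = split_table_name(table)
    return (
        f"s3://{bucket}/warehouse/{catalog}/{namespace}/{name}"
        f"/data/{trading_date}/{job_id}"
    )
```

Splitting at the first dot only means a table name that itself contains dots stays intact in `{name}`. A `skip_if_exists` check could then test whether any object already exists under the path returned by `expected_data_path`.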

Write Flow Summary

Databento API
    │
    ▼  download .dbn.zst
Raw DBN (temp)
    ├──► convert to Parquet ──► S3 warehouse/data/{date}/{job_id}/data.parquet
    └──► copy raw .dbn.zst  ──► S3 raw/{provider}/{venue}.{dataset}/{schema}/
                                   request_id={job_id}/date={date}/original/
    │
    ▼
engine.write_parquet()
    │
    ▼
engine.commit_files()
    │
    ▼
Iceberg catalog (JDBC)

Cleanup and Maintenance

  • Expire snapshots: ALTER TABLE {table} EXECUTE expire_snapshots(retention_threshold => '7d') (Trino)
  • Remove orphans: ALTER TABLE {table} EXECUTE remove_orphan_files (Trino)
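  • Abort stuck uploads: list in-progress uploads with the boto3 S3 client's list_multipart_uploads(), then cancel each with abort_multipart_upload() (Backblaze B2)
  • Hard delete with versioning: list all versions with the boto3 S3 client's list_object_versions(), then delete each one by passing its VersionId to delete_object()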
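
The two boto3 items above can be sketched as functions that take an S3 client as an argument. The function names and the `prefix` parameter are illustrative assumptions. The client methods and paginators used (`list_multipart_uploads`, `abort_multipart_upload`, `list_object_versions`, `delete_object`) are standard boto3 S3 client operations:

```python
def abort_stuck_uploads(s3_client, bucket: str, prefix: str = "") -> int:
    """Abort every in-progress multipart upload under prefix; return the count."""
    aborted = 0
    paginator = s3_client.get_paginator("list_multipart_uploads")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages with no uploads omit the "Uploads" key entirely.
        for upload in page.get("Uploads", []):
            s3_client.abort_multipart_upload(
                Bucket=bucket, Key=upload["Key"], UploadId=upload["UploadId"]
            )
            aborted += 1
    return aborted


def hard_delete_prefix(s3_client, bucket: str, prefix: str) -> int:
    """Delete every object version and delete marker under prefix; return the count."""
    deleted = 0
    paginator = s3_client.get_paginator("list_object_versions")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Delete markers are versions too and must be removed for a true hard delete.
        for entry in page.get("Versions", []) + page.get("DeleteMarkers", []):
            s3_client.delete_object(
                Bucket=bucket, Key=entry["Key"], VersionId=entry["VersionId"]
            )
            deleted += 1
    return deleted
```

A client for an S3-compatible store can be created with `boto3.client("s3", endpoint_url=...)`, using the endpoint of your provider. Both functions are destructive, so it is worth running them against a narrow prefix such as `staging/` first.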