Ingestion & Feed Providers
DataInfra ingests raw market data from external sources, converts it to Parquet, and writes it to the target engine. Feed providers are defined declaratively in YAML — field mappings, QFSQL transformations, quality tests, and partitioning rules are all configuration, not code.
How Ingestion Works
The ingestion pipeline follows a reader-centric architecture with two execution paths:
IngestPipeline.run()
│
├─► Load metadata (project config + feed provider YAMLs)
├─► Resolve credentials from local config
├─► Create reader (DatabentoReader or HTTPReader)
├─► reader.plan() → List[WorkUnit]
│
└─► Execute:
    ├─ DuckDB → LocalExecutor (sequential, single connection)
    └─ Everything else → RayExecutor (parallel, bounded concurrency)
         │
         ▼
    engine.write_parquet() or engine.write_arrow()
         │
         ▼
    engine.commit_files() → Iceberg catalog
Each WorkUnit represents one atomic piece of work — a single file download, parse, and write. The reader's plan() method expands a feed provider config into a list of work units based on date range, symbols, and data types.
Readers
Two reader types handle all ingestion. The factory get_reader(provider_type) dispatches by type string.
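The dispatch described above can be sketched as a small registry lookup. This is an illustration only: the reader classes are empty stand-ins, and both the registry dictionary and the error raised for unknown types are assumptions, not the actual `get_reader` implementation.

```python
from typing import Dict, Type


class BaseReader:
    """Stand-in base class; the real readers carry far more logic."""


class DatabentoReader(BaseReader):
    pass


class HTTPReader(BaseReader):
    pass


# Registry keyed by the `type` string from the feed provider YAML.
_READERS: Dict[str, Type[BaseReader]] = {
    "databento": DatabentoReader,
    "http": HTTPReader,
}


def get_reader(provider_type: str) -> BaseReader:
    """Return a reader instance for the given provider type string."""
    try:
        reader_cls = _READERS[provider_type]
    except KeyError:
        # Assumed behavior: fail loudly on unknown provider types.
        known = ", ".join(sorted(_READERS))
        raise ValueError(
            f"Unknown provider type {provider_type!r}; expected one of: {known}"
        ) from None
    return reader_cls()
```

With a registry like this, supporting a third provider type would mean adding one dictionary entry rather than another branch.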
DatabentoReader ("databento")
Downloads historical market data from the Databento API. Download-only — read() is not supported.
- plan() fetches a manifest for each batch job, lists DBN files, and filters by date range
- download() downloads a single DBN file, converts it to Parquet via the Databento SDK, fixes unsigned integer types for Trino compatibility, and adds a trading_date column for Iceberg partition pruning
- Raw DBN files are archived to the raw/ S3 prefix for provenance
Provider type: "databento". Used for equities market data (NASDAQ ITCH MBP-1).
HTTPReader ("http")
Handles REST API data sources. Supports two load modes:
- Download mode (load_mode="download"): GETs a compressed file (Parquet.zst, CSV.gz), decompresses it via the Decompressor processor, and returns the local Parquet path. Used for services like CryptoHFTData.
- Read mode (load_mode="read"): Fetches paginated JSON from REST APIs (Binance-style), handles rate limiting (429/418) with exponential backoff, paginates via fromId, and returns an in-memory Arrow table.
Provider type: "http". Used for crypto historical data (Binance, CryptoHFTData).
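Read mode combines two loops: retry with exponential backoff on 429/418, and cursor pagination via fromId. The sketch below shows one way to arrange them. It is not HTTPReader's code: the `fetch` callable, the `limit` parameter, the `id` row field, and the retry defaults are all hypothetical, and it returns a plain list of rows where the real reader returns an Arrow table.

```python
import time
from typing import Callable, Dict, List, Tuple

Row = Dict[str, int]
# Hypothetical transport: takes query params, returns (status_code, rows).
Fetch = Callable[[Dict[str, int]], Tuple[int, List[Row]]]

RETRYABLE = {429, 418}  # rate-limit statuses named in the text above


def fetch_all(
    fetch: Fetch,
    start_id: int = 0,
    page_size: int = 1000,
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Row]:
    """Collect every page, backing off exponentially when rate limited."""
    rows: List[Row] = []
    next_id = start_id
    while True:
        attempt = 0
        while True:
            status, page = fetch({"fromId": next_id, "limit": page_size})
            if status not in RETRYABLE:
                break
            if attempt >= max_retries:
                raise RuntimeError(f"rate limited after {max_retries} retries")
            # Delay doubles on each retry: base, 2*base, 4*base, ...
            sleep(base_delay * 2 ** attempt)
            attempt += 1
        if status != 200:
            raise RuntimeError(f"unexpected HTTP status {status}")
        rows.extend(page)
        if len(page) < page_size:
            # A short page means the source has no more rows.
            return rows
        # Assumed cursor rule: resume just past the last id seen.
        next_id = page[-1]["id"] + 1
```

Passing `sleep` as a parameter keeps the backoff testable without real waiting.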
| Capability | DatabentoReader | HTTPReader |
|---|---|---|
| Mode | Download only | Download or read |
| Plan | Manifest-driven (job_id → DBN list) | Date/symbol expansion |
| Output | Parquet files | Parquet files or Arrow tables |
| Rate limiting | Built-in SDK retry | Exponential backoff |
| Raw archival | DBN → S3 raw/ prefix | Parquet → S3 warehouse |
Processors
The processor layer is minimal by design — reader-specific logic is preferred over a general-purpose pipeline.
| Processor | Role |
|---|---|
| Decompressor | Handles zstd, snappy, and gzip decompression. Strips compression extension to derive output path. Used by HTTPReader.download(). |
The BaseProcessor abstract class defines the processor interface (process(), validate(), cleanup()), but only Decompressor is currently implemented.
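A rough sketch of how the three-method interface and the extension-stripping rule could fit together. The method signatures are assumptions (the text names only the methods), and the concrete class handles gzip only so that the example stays within the standard library; it is a stand-in for Decompressor, not a copy of it.

```python
import gzip
import shutil
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List


class BaseProcessor(ABC):
    """Assumed shape of the processor interface."""

    @abstractmethod
    def validate(self, path: Path) -> bool:
        """Return True if this processor can handle the file."""

    @abstractmethod
    def process(self, path: Path) -> Path:
        """Transform the file and return the output path."""

    @abstractmethod
    def cleanup(self) -> None:
        """Remove intermediate files created or consumed by process()."""


class GzipDecompressor(BaseProcessor):
    """Gzip-only stand-in for Decompressor."""

    def __init__(self) -> None:
        self._consumed: List[Path] = []

    def validate(self, path: Path) -> bool:
        return path.suffix == ".gz" and path.is_file()

    def process(self, path: Path) -> Path:
        if not self.validate(path):
            raise ValueError(f"not a gzip file: {path}")
        # Strip the compression extension: trades.csv.gz -> trades.csv
        out_path = path.with_suffix("")
        with gzip.open(path, "rb") as src, open(out_path, "wb") as dst:
            shutil.copyfileobj(src, dst)
        self._consumed.append(path)
        return out_path

    def cleanup(self) -> None:
        # Assumed policy: delete compressed inputs once decompressed.
        for path in self._consumed:
            path.unlink(missing_ok=True)
        self._consumed.clear()
```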
Work Units
A WorkUnit is the atomic unit of ingest work. The reader's plan() method produces a list of them from a feed provider configuration.
| Field | Type | Purpose |
|---|---|---|
| source_ref | str | Display name (filename or date-hour string) |
| provider_name | str | Feed provider YAML name |
| data_type | str | CDM type: "cdm_trades", "cdm_lob_l1", etc. |
| download_key | str | Provider-specific locator (URL path, DBN job_id/filename) |
| symbols | List[str] | Symbols in this unit |
| trading_date | Optional[date] | Trading date, if known |
| extra | dict | Provider metadata for Ray serialization |
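The table maps naturally onto a dataclass. The sketch below also shows one possible date × data type × symbol expansion of the kind plan() performs for date-driven providers. The `plan_daily` helper, the `source_ref` format, and the `download_key` layout are hypothetical; only the field names and types come from the table.

```python
from dataclasses import dataclass, field
from datetime import date, timedelta
from typing import Any, Dict, List, Optional


@dataclass
class WorkUnit:
    source_ref: str
    provider_name: str
    data_type: str
    download_key: str
    symbols: List[str]
    trading_date: Optional[date] = None
    extra: Dict[str, Any] = field(default_factory=dict)


def plan_daily(
    provider_name: str,
    data_types: List[str],
    symbols: List[str],
    start: date,
    end: date,
) -> List[WorkUnit]:
    """Emit one unit per (day, data type, symbol), inclusive of both ends."""
    units: List[WorkUnit] = []
    day = start
    while day <= end:
        for data_type in data_types:
            for symbol in symbols:
                units.append(
                    WorkUnit(
                        source_ref=f"{symbol}-{data_type}-{day.isoformat()}",
                        provider_name=provider_name,
                        data_type=data_type,
                        # Hypothetical key layout, for illustration only.
                        download_key=f"{data_type}/{symbol}/{day.isoformat()}",
                        symbols=[symbol],
                        trading_date=day,
                    )
                )
        day += timedelta(days=1)
    return units
```

Keeping every field a plain value (strings, lists, dates, dicts) is what makes a unit cheap to serialize when it is shipped to a remote worker.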
Executors
The executor is chosen automatically based on the target engine:
| Executor | Engine | Concurrency | Use Case |
|---|---|---|---|
| LocalExecutor | DuckDB | Sequential | Local dev, single connection |
| RayExecutor | OpenLakehouse, BigQuery, Snowflake, Databricks | Parallel (Ray, max 4) | Production, S3/Iceberg |
Both executors share the same interface: run(feed, units, table_name, mode, load_mode, reconcile). The RayExecutor submits each work unit as a Ray remote task with bounded concurrency and supports skip_if_exists by checking for existing data files before downloading.
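To illustrate executor selection, bounded concurrency, and skip_if_exists together, here is a minimal sketch. It deliberately swaps Ray for the standard library's `ThreadPoolExecutor`, so it shows the control flow rather than the real RayExecutor. The function names, the engine-name comparison, and the use of plain strings as work units are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Set

MAX_CONCURRENCY = 4  # mirrors the "max 4" bound in the table above


def choose_executor(engine: str) -> str:
    """DuckDB runs locally; every other engine goes through Ray."""
    return "LocalExecutor" if engine.lower() == "duckdb" else "RayExecutor"


def run_bounded(
    units: List[str],
    ingest_one: Callable[[str], str],
    existing: Set[str],
    skip_if_exists: bool = True,
    max_workers: int = MAX_CONCURRENCY,
) -> List[str]:
    """Run ingest_one over units with at most max_workers in flight."""
    # Filter before submitting so skipped units never occupy a worker.
    pending = [u for u in units if not (skip_if_exists and u in existing)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order in its results.
        return list(pool.map(ingest_one, pending))
```

Checking for existing data before submission, rather than inside the task, avoids paying scheduling cost for work that will be skipped anyway.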
Write Flow
The ingest module delegates all writes to the engine layer (quantflow.io). The engine used depends on the project config:
| Engine | Resolution |
|---|---|
| Ingest writer | project.data_processing.ingest_writer (default: s3_iceberg) |
| Historical engine | project.data_processing.historical_data_engine (default: trino) |
For download mode, the flow is:
Reader.download() → local Parquet file
  → engine.write_parquet(table, path, mode, trading_date, job_id) → S3
  → engine.upload_raw_file(local_dbn, archive_key) → S3 raw/
  → engine.write_manifest(...)
  → engine.commit_files(table, paths, mode, symbols, time_range) → Iceberg
  → engine.reconcile(table) (optional)
For read mode, the flow is:
Reader.read() → pa.Table (in-memory Arrow)
  → engine.write_arrow(table, data, mode)
  → engine.commit_files(...) → Iceberg
All writes are engine-agnostic — the same pipeline code works across DuckDB, OpenLakehouse, BigQuery, Snowflake, and Databricks.
Project Configuration
Feed providers are referenced in quantflow_project.yml under the ingest section:
ingest:
  enabled: true
  feeds:
    - name: equities_databento
      historical_data_provider: equities_databento_historical
      streaming_data_provider: equities_ig_paper_streaming
      symbols:
        - QQQ
      data_types:
        - cdm_trades
    - name: crypto_binance
      historical_data_provider: cryptohftdata_historical
      streaming_data_provider: binance_streaming
      symbols:
        - BTCUSDT
        - ETHUSDT
      data_types:
        - cdm_trades
        - cdm_lob_l1
| Field | Description |
|---|---|
| name | Feed identifier within the project |
| historical_data_provider | References a feed provider YAML in .definitions/feed_providers/ |
| streaming_data_provider | Provider for real-time WebSocket feeds |
| symbols | Instruments to ingest for this feed |
| data_types | CDM types to download (e.g. cdm_trades, cdm_lob_l1) |
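The link between a feed and its provider file can be sketched as a simple lookup. The example takes the already-parsed project config as a dictionary, so no YAML library is needed. The `provider_paths` helper and the `.yml` file extension are assumptions; the text only states that provider files live in .definitions/feed_providers/.

```python
from pathlib import Path
from typing import Any, Dict

PROVIDER_DIR = Path(".definitions/feed_providers")


def provider_paths(project: Dict[str, Any]) -> Dict[str, Path]:
    """Map each feed name to the file of its historical data provider."""
    ingest = project.get("ingest", {})
    if not ingest.get("enabled", False):
        # Ingestion switched off: nothing to resolve.
        return {}
    return {
        # The ".yml" extension is assumed, not confirmed by the docs.
        feed["name"]: PROVIDER_DIR / f"{feed['historical_data_provider']}.yml"
        for feed in ingest.get("feeds", [])
    }
```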
Feed Provider YAML
Each feed provider is a separate YAML file in .definitions/feed_providers/. Here is a complete example — a Databento historical MBP-1 feed for US equities:
name: "equities_databento_historical"
type: "databento"
auth: api_key
format: parquet
update_frequency: 1d
data_types:
  cdm_trades:
    name: mbp_1
    stream: "mbp-1"
    row_filter: "action = 'T'"
    description: "Raw MBP-1; dbt extracts trades via WHERE action = 'T'"
    schema:
      attributes:
        ts_event: { dtype: long }
        rtype: { dtype: integer }
        action: { dtype: string }
        side: { dtype: string }
        price: { dtype: double }
        size: { dtype: integer }
        symbol: { dtype: string }
        ts_recv: { dtype: long }
    field_mappings:
      - target: event_time
        source: ts_event
        transformation: "from_unixtime(cast_safe(ts_event, double) / 1000000000.0) AT TIME ZONE 'UTC'"
        is_time_filter_field: true
      - target: price
        source: price
        transformation: "cast_safe(price, double)"
      - target: size
        source: size
        transformation: "cast_safe(size, double)"
      - target: venue
        transformation: "'databento'"
      - target: processed_time
        transformation: "current_timestamp()"
| Field | Description |
|---|---|
| name | Provider identifier, referenced by historical_data_provider in project config |
| type | "databento" or "http"; determines which reader is used |
| location | URL template with {exchange}, {date}, {hour}, {symbol}, {data_type} placeholders (HTTP only) |
| auth | api_key or null |
| format | parquet, csv, or null |
| compression | zstd, snappy, gzip, or null |
| update_frequency | 1h, 1d, daily, or hour:N; controls date/hour expansion in plan() |
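As an illustration of how a location template might be expanded, the sketch below fills the five placeholders with Python's `str.format`. The `expand_location` helper, the ISO date format, the zero-padded hour, and the example URLs are all assumptions; the placeholder names are the only part taken from the table.

```python
from datetime import date
from typing import List, Optional, Sequence


def expand_location(
    template: str,
    exchange: str,
    symbols: Sequence[str],
    data_type: str,
    day: date,
    hourly: bool,
) -> List[str]:
    """Return one URL per symbol, or per symbol and hour when hourly."""
    hours: Sequence[Optional[int]] = range(24) if hourly else [None]
    urls: List[str] = []
    for symbol in symbols:
        for hour in hours:
            urls.append(
                # str.format ignores keyword arguments the template omits,
                # so daily templates without {hour} work unchanged.
                template.format(
                    exchange=exchange,
                    date=day.isoformat(),
                    hour="" if hour is None else f"{hour:02d}",
                    symbol=symbol,
                    data_type=data_type,
                )
            )
    return urls
```

For example, a daily template such as `https://example.invalid/{date}/{symbol}.csv.gz` yields one URL per symbol, while the same call with `hourly=True` and an `{hour}` placeholder yields 24 per symbol.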
Each entry under data_types defines one CDM entity type produced by this provider:
| Field | Description |
|---|---|
| name | Short name (e.g. trades, mbp_1) |
| stream | Dataset/stream name in the provider's system |
| enabled | Set false to skip this data type |
| row_filter | Optional SQL WHERE filter applied during dbt transformation |
| schema.attributes | Raw source columns with types |
| field_mappings | Raw → CDM column mappings with QFSQL transformations |
| tests | dbt tests (uniqueness, recency, range checks) |
Field mappings define how raw source fields map to CDM columns:
| Field | Description |
|---|---|
| target | CDM column name |
| source | Raw field name (omitted for constants/computed columns) |
| transformation | QFSQL expression applied to the source value |
| is_time_filter_field | Marks the partition filter column for incremental loading |
Mappings without a source field inject constants (e.g. venue: "'databento'") or computed values (e.g. processed_time: "current_timestamp()").
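One way to picture how mappings turn into a query is to render them as a SELECT list. This is a simplified sketch, not the dbt generator: the `render_select` helper and the table name are hypothetical, and the QFSQL expressions are emitted verbatim, whereas a real generator would presumably translate them to the target engine's dialect.

```python
from typing import Dict, List, Optional


def render_select(
    mappings: List[Dict[str, str]],
    source_table: str,
    row_filter: Optional[str] = None,
) -> str:
    """Render field mappings as a SELECT over the raw source table."""
    columns = []
    for mapping in mappings:
        # Prefer the transformation; fall back to the bare source column.
        # Mappings with no source must therefore carry a transformation.
        expr = mapping.get("transformation") or mapping["source"]
        columns.append(f"    {expr} AS {mapping['target']}")
    sql = "SELECT\n" + ",\n".join(columns) + f"\nFROM {source_table}"
    if row_filter:
        sql += f"\nWHERE {row_filter}"
    return sql
```

Called with a constant mapping such as `{"target": "venue", "transformation": "'databento'"}`, this emits `'databento' AS venue`, which is how a source-less mapping injects the same value into every row.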
→ dbt Generator: how field mappings become dbt staging models
→ Engine Layer: engine registry and QFSQL reference
→ Data Quality: how tests flow through the validation layers