
Ingestion & Feed Providers

DataInfra ingests raw market data from external sources, converts it to Parquet, and writes it to the target engine. Feed providers are defined declaratively in YAML — field mappings, QFSQL transformations, quality tests, and partitioning rules are all configuration, not code.


How Ingestion Works

The ingestion pipeline follows a reader-centric architecture with two execution paths:

```
IngestPipeline.run()
│
├─► Load metadata (project config + feed provider YAMLs)
├─► Resolve credentials from local config
├─► Create reader (DatabentoReader or HTTPReader)
├─► reader.plan() → List[WorkUnit]
│
└─► Execute:
    ├─ DuckDB → LocalExecutor (sequential, single connection)
    └─ Everything else → RayExecutor (parallel, bounded concurrency)
          │
          ▼
    engine.write_parquet() or engine.write_arrow()
          │
          ▼
    engine.commit_files() → Iceberg catalog
```

Each WorkUnit represents one atomic piece of work — a single file download, parse, and write. The reader's plan() method expands a feed provider config into a list of work units based on date range, symbols, and data types.
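As a rough illustration of that expansion, the sketch below takes the cross product of trading dates, symbols, and data types and emits one unit per combination. `SimpleWorkUnit` and `expand_units` are hypothetical names, and "one unit per (date, symbol, data type)" is an assumption made for illustration; the real readers differ (DatabentoReader, for instance, derives its units from a job manifest).

```python
from dataclasses import dataclass
from datetime import date, timedelta
from itertools import product
from typing import List


@dataclass(frozen=True)
class SimpleWorkUnit:
    """Hypothetical, trimmed-down stand-in for WorkUnit."""
    source_ref: str
    data_type: str
    symbol: str
    trading_date: date


def date_range(start: date, end: date) -> List[date]:
    """All calendar days from start to end, inclusive."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


def expand_units(start: date, end: date, symbols: List[str],
                 data_types: List[str]) -> List[SimpleWorkUnit]:
    """Emit one unit per (trading date, symbol, data type) combination."""
    units = []
    for day, symbol, data_type in product(date_range(start, end), symbols, data_types):
        ref = f"{symbol}_{data_type}_{day.isoformat()}"
        units.append(SimpleWorkUnit(ref, data_type, symbol, day))
    return units


units = expand_units(date(2024, 1, 2), date(2024, 1, 3),
                     ["BTCUSDT", "ETHUSDT"], ["cdm_trades", "cdm_lob_l1"])
print(len(units))  # 2 days x 2 symbols x 2 data types = 8
```

Keeping each unit this small is what lets an executor retry or skip one file without redoing the rest of the date range.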


Readers

Two reader types handle all ingestion. The factory get_reader(provider_type) dispatches by type string.
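A minimal sketch of type-string dispatch, assuming a dictionary registry. Only the name `get_reader(provider_type)` and the two type strings come from this page; the toy class bodies, the `_READERS` registry, and returning a class (rather than an instance) are assumptions for illustration.

```python
from typing import Dict, Type


class BaseReader:
    """Hypothetical minimal base; real readers also implement plan(), download(), read()."""
    provider_type = ""

    def __init__(self, config: dict) -> None:
        self.config = config


class DatabentoReader(BaseReader):
    provider_type = "databento"


class HTTPReader(BaseReader):
    provider_type = "http"


# Registry keyed by the `type` string from the feed provider YAML.
_READERS: Dict[str, Type[BaseReader]] = {
    cls.provider_type: cls for cls in (DatabentoReader, HTTPReader)
}


def get_reader(provider_type: str) -> Type[BaseReader]:
    """Return the reader class registered for provider_type."""
    try:
        return _READERS[provider_type]
    except KeyError:
        known = ", ".join(sorted(_READERS))
        raise ValueError(f"unknown provider type {provider_type!r} (known: {known})") from None


reader = get_reader("http")({"name": "cryptohftdata_historical"})
print(type(reader).__name__)  # HTTPReader
```

Failing loudly on an unknown type string surfaces a typo in a feed provider YAML at planning time rather than mid-run.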

DatabentoReader ("databento")

Downloads historical market data from the Databento API. This reader is download-only: read() is not supported.

  • plan() fetches a manifest for each batch job, lists DBN files, and filters by date range
  • download() downloads a single DBN file, converts it to Parquet via the Databento SDK, fixes unsigned integer types for Trino compatibility, and adds a trading_date column for Iceberg partition pruning
  • Raw DBN files are archived to the raw/ S3 prefix for provenance

Provider type: "databento". Used for equities market data (NASDAQ ITCH MBP-1).
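The two post-processing steps in download() can be sketched with the standard library alone. This is not the reader's code: the widening table (`UNSIGNED_TO_SIGNED`), the uint64 range check, and deriving trading_date as the UTC calendar date of ts_event are all assumptions; a production rule might use the exchange's local date instead.

```python
from datetime import date, datetime, timezone
from typing import Iterable

# Assumed widening rules: each unsigned Arrow type maps to the smallest signed
# type that holds its full range. uint64 has no wider signed type in Trino, so
# it maps to int64 and must be range-checked.
UNSIGNED_TO_SIGNED = {
    "uint8": "int16",
    "uint16": "int32",
    "uint32": "int64",
    "uint64": "int64",
}
INT64_MAX = 2**63 - 1


def trino_safe_type(arrow_type: str) -> str:
    """Return a signed replacement for unsigned integer type names; pass others through."""
    return UNSIGNED_TO_SIGNED.get(arrow_type, arrow_type)


def check_uint64_fits(values: Iterable[int]) -> None:
    """uint64 -> int64 is lossless only if every value is <= INT64_MAX."""
    overflow = sum(1 for v in values if v > INT64_MAX)
    if overflow:
        raise OverflowError(f"{overflow} value(s) exceed the int64 range")


def trading_date_from_ts(ts_event_ns: int) -> date:
    """UTC calendar date of a nanosecond Unix timestamp (assumed partition rule)."""
    seconds = ts_event_ns // 1_000_000_000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()


print(trino_safe_type("uint32"))                        # int64
print(trading_date_from_ts(1_704_205_800_000_000_000))  # 2024-01-02
```

Materializing trading_date as its own column is what lets Iceberg prune partitions on a plain date predicate instead of evaluating a timestamp expression per row.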

HTTPReader ("http")

Handles REST API data sources. Supports two load modes:

  • Download mode (load_mode="download"): GETs a compressed file (Parquet.zst, CSV.gz), decompresses via the Decompressor processor, and returns the local Parquet path. Used for services like CryptoHFTData.
  • Read mode (load_mode="read"): Fetches paginated JSON from REST APIs (Binance-style), paginating via fromId and retrying rate-limit responses (HTTP 429/418) with exponential backoff, and returns an in-memory Arrow table.

Provider type: "http". Used for crypto historical data (Binance, CryptoHFTData).
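The read-mode loop can be sketched as below, assuming Binance-style semantics where a request with fromId returns trades starting at that id and a short page means the end of the data. `RateLimited`, `fetch_page`, and the doubling delay schedule are hypothetical; a real `fetch_page` would wrap an HTTP client and raise `RateLimited` on a 429 or 418 status.

```python
import time
from typing import Callable, List, Optional


class RateLimited(Exception):
    """Raised by a fetch function when the server answers 429 or 418."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def fetch_with_backoff(fetch: Callable[[], List[dict]], max_retries: int = 5,
                       base_delay: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> List[dict]:
    """Call fetch(), sleeping base_delay, 2*base_delay, 4*base_delay, ... between rate-limited attempts."""
    attempt = 0
    while True:
        try:
            return fetch()
        except RateLimited:
            if attempt >= max_retries:
                raise
            sleep(base_delay * 2 ** attempt)
            attempt += 1


def fetch_all_trades(fetch_page: Callable[[Optional[int], int], List[dict]],
                     limit: int = 1000, **backoff) -> List[dict]:
    """Page through trades: each request starts at the id after the last one seen."""
    rows: List[dict] = []
    from_id: Optional[int] = None
    while True:
        page = fetch_with_backoff(lambda: fetch_page(from_id, limit), **backoff)
        rows.extend(page)
        if len(page) < limit:  # short (or empty) page: nothing left
            return rows
        from_id = page[-1]["id"] + 1
```

Injecting `sleep` keeps the backoff testable without real waits; for example, `fetch_all_trades(fetch_page, sleep=delays.append)` records the delays instead of sleeping.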

| Capability | DatabentoReader | HTTPReader |
|---|---|---|
| Mode | Download only | Download or read |
| Plan | Manifest-driven (job_id → DBN list) | Date/symbol expansion |
| Output | Parquet files | Parquet files or Arrow tables |
| Rate limiting | Built-in SDK retry | Exponential backoff |
| Raw archival | DBN → S3 raw/ prefix | Parquet → S3 warehouse |

Processors

The processor layer is minimal by design — reader-specific logic is preferred over a general-purpose pipeline.

| Processor | Role |
|---|---|
| Decompressor | Handles zstd, snappy, and gzip decompression. Strips the compression extension to derive the output path. Used by HTTPReader.download(). |

The BaseProcessor abstract class defines the processor interface (process(), validate(), cleanup()), but only Decompressor is currently implemented.
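A sketch of the Decompressor's path rule, assuming the codec is identified by the last file extension. The extension table is an assumption, and only gzip is actually decompressed here because it is in the standard library; zstd and snappy would need third-party packages, so this sketch raises for them instead of guessing at their APIs.

```python
import gzip
import shutil
from pathlib import Path

# Assumed extension -> codec table.
COMPRESSION_EXTENSIONS = {".zst": "zstd", ".snappy": "snappy", ".gz": "gzip"}


def derive_output_path(path: Path) -> Path:
    """Strip a trailing compression extension: trades.csv.gz -> trades.csv."""
    if path.suffix in COMPRESSION_EXTENSIONS:
        return path.with_suffix("")
    return path


def decompress(path: Path) -> Path:
    """Decompress next to the input and return the output path."""
    codec = COMPRESSION_EXTENSIONS.get(path.suffix)
    if codec is None:
        return path  # not compressed: nothing to do
    if codec != "gzip":
        raise NotImplementedError(f"{codec} needs a third-party library in this sketch")
    out = derive_output_path(path)
    with gzip.open(path, "rb") as src, open(out, "wb") as dst:
        shutil.copyfileobj(src, dst)  # streams, so large files are not loaded into memory
    return out


print(derive_output_path(Path("btcusdt_trades.parquet.zst")))  # btcusdt_trades.parquet
```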


Work Units

A WorkUnit is the atomic unit of ingest work. The reader's plan() method produces a list of them from a feed provider configuration.

| Field | Type | Purpose |
|---|---|---|
| source_ref | str | Display name (filename or date-hour string) |
| provider_name | str | Feed provider YAML name |
| data_type | str | CDM type: "cdm_trades", "cdm_lob_l1", etc. |
| download_key | str | Provider-specific locator (URL path, DBN job_id/filename) |
| symbols | List[str] | Symbols in this unit |
| trading_date | Optional[date] | Trading date, if known |
| extra | dict | Provider metadata for Ray serialization |
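The table maps naturally onto a dataclass. The field names and types below come from the table; the field order, the defaults, and the example values (including the `download_key` format) are assumptions for illustration.

```python
import pickle
from dataclasses import dataclass, field
from datetime import date
from typing import Any, Dict, List, Optional


@dataclass
class WorkUnit:
    source_ref: str        # display name (filename or date-hour string)
    provider_name: str     # feed provider YAML name
    data_type: str         # CDM type, e.g. "cdm_trades"
    download_key: str      # provider-specific locator (URL path, DBN job_id/filename)
    symbols: List[str] = field(default_factory=list)
    trading_date: Optional[date] = None
    extra: Dict[str, Any] = field(default_factory=dict)  # provider metadata; keep it picklable


unit = WorkUnit(
    source_ref="2024-01-02T14",
    provider_name="cryptohftdata_historical",
    data_type="cdm_trades",
    download_key="binance/trades/BTCUSDT/2024-01-02/14",  # illustrative key format
    symbols=["BTCUSDT"],
    trading_date=date(2024, 1, 2),
)

# Ray ships task arguments with (cloud)pickle; plain dataclasses round-trip cleanly.
restored = pickle.loads(pickle.dumps(unit))
print(restored == unit)  # True
```

Keeping `extra` to plain, picklable values (strings, numbers, lists, dicts) is what allows a unit to travel to a remote Ray worker unchanged.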

Executors

The executor is chosen automatically based on the target engine:

| Executor | Engine | Concurrency | Use case |
|---|---|---|---|
| LocalExecutor | DuckDB | Sequential | Local dev, single connection |
| RayExecutor | OpenLakehouse, BigQuery, Snowflake, Databricks | Parallel (Ray, max 4) | Production, S3/Iceberg |

Both executors share the same interface: run(feed, units, table_name, mode, load_mode, reconcile). The RayExecutor submits each work unit as a Ray remote task with bounded concurrency and supports skip_if_exists by checking for existing data files before downloading.
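The selection rule and the bounded, skip-aware fan-out can be sketched without Ray. Here `concurrent.futures.ThreadPoolExecutor` with four workers stands in for Ray remote tasks, and the `existing` set stands in for the check for already-written data files; `choose_executor` and `run_bounded` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Set

MAX_CONCURRENCY = 4  # the documented Ray concurrency bound


def choose_executor(engine: str) -> str:
    """DuckDB runs sequentially on one connection; every other engine goes to Ray."""
    return "LocalExecutor" if engine.lower() == "duckdb" else "RayExecutor"


def run_bounded(units: Iterable[str], process: Callable[[str], str],
                existing: Set[str], skip_if_exists: bool = True) -> List[str]:
    """Process units with at most MAX_CONCURRENCY in flight.

    Units listed in `existing` are skipped before any download happens.
    """
    todo = [u for u in units if not (skip_if_exists and u in existing)]
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
        # map() returns results in input order, regardless of completion order.
        return list(pool.map(process, todo))


print(choose_executor("duckdb"))                       # LocalExecutor
print(run_bounded(["a", "b", "c"], str.upper, {"b"}))  # ['A', 'C']
```

Skipping before the download, rather than after, is what makes re-running a partially completed backfill cheap.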


Write Flow

The ingest module delegates all writes to the engine layer (quantflow.io). The engine used depends on the project config:

| Engine | Resolution |
|---|---|
| Ingest writer | project.data_processing.ingest_writer (default: s3_iceberg) |
| Historical engine | project.data_processing.historical_data_engine (default: trino) |

For download mode, the flow is:

```
Reader.download() → local Parquet file
  → engine.write_parquet(table, path, mode, trading_date, job_id) → S3
  → engine.upload_raw_file(local_dbn, archive_key) → S3 raw/
  → engine.write_manifest(...)
  → engine.commit_files(table, paths, mode, symbols, time_range) → Iceberg
  → engine.reconcile(table)   (optional)
```

For read mode, the flow is:

```
Reader.read() → pa.Table (in-memory Arrow)
  → engine.write_arrow(table, data, mode)
  → engine.commit_files(...) → Iceberg
```

All writes are engine-agnostic — the same pipeline code works across DuckDB, OpenLakehouse, BigQuery, Snowflake, and Databricks.
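Because the pipeline only calls engine methods, any object with those methods can stand in for an engine. The sketch below uses a hypothetical recording double to show the download-mode ordering: every file is written first, the raw input is archived, and a single `commit_files` call registers the files with the catalog. `RecordingEngine`, `ingest_downloaded`, the `"append"` mode value, the destination path format, and the arguments passed to `write_manifest` are placeholders, since the real signatures beyond those listed in the flow are not documented here.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class RecordingEngine:
    """Hypothetical engine double: records each call instead of writing anywhere."""
    calls: List[Tuple[str, Tuple[Any, ...]]] = field(default_factory=list)

    def write_parquet(self, table, path, mode, trading_date, job_id):
        self.calls.append(("write_parquet", (table, path)))
        return f"warehouse/{table}/{path}"  # illustrative destination path

    def upload_raw_file(self, local_file, archive_key):
        self.calls.append(("upload_raw_file", (local_file, archive_key)))

    def write_manifest(self, *args, **kwargs):
        self.calls.append(("write_manifest", args))

    def commit_files(self, table, paths, mode, symbols, time_range):
        self.calls.append(("commit_files", (table, tuple(paths))))

    def reconcile(self, table):
        self.calls.append(("reconcile", (table,)))


def ingest_downloaded(engine, table: str, local_files: List[str], mode: str,
                      symbols: List[str], time_range: Optional[tuple] = None,
                      raw_file: Optional[str] = None, reconcile: bool = False) -> List[str]:
    """Download-mode ordering: write every file, archive raw input, then commit once."""
    paths = [engine.write_parquet(table, f, mode, None, None) for f in local_files]
    if raw_file is not None:
        engine.upload_raw_file(raw_file, f"raw/{raw_file}")
    engine.write_manifest(table, paths)  # placeholder arguments; real signature not documented
    engine.commit_files(table, paths, mode, symbols, time_range)
    if reconcile:
        engine.reconcile(table)
    return paths


engine = RecordingEngine()
ingest_downloaded(engine, "cdm_trades", ["a.parquet", "b.parquet"], "append",
                  ["QQQ"], raw_file="job.dbn", reconcile=True)
print([name for name, _ in engine.calls])
```

Written data files only become part of the Iceberg table once they are committed, so a failure before `commit_files` leaves the table unchanged.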


Project Configuration

Feed providers are referenced in quantflow_project.yml under the ingest section:

```yaml
ingest:
  enabled: true
  feeds:
    - name: equities_databento
      historical_data_provider: equities_databento_historical
      streaming_data_provider: equities_ig_paper_streaming
      symbols:
        - QQQ
      data_types:
        - cdm_trades
    - name: crypto_binance
      historical_data_provider: cryptohftdata_historical
      streaming_data_provider: binance_streaming
      symbols:
        - BTCUSDT
        - ETHUSDT
      data_types:
        - cdm_trades
        - cdm_lob_l1
```

| Field | Description |
|---|---|
| name | Feed identifier within the project |
| historical_data_provider | References a feed provider YAML in .definitions/feed_providers/ |
| streaming_data_provider | Provider for real-time WebSocket feeds |
| symbols | Instruments to ingest for this feed |
| data_types | CDM types to download (e.g. cdm_trades, cdm_lob_l1) |
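A sketch of how a loader might join the project's feeds to their provider definitions, assuming the provider YAML files have already been parsed into dicts (for example with PyYAML's `yaml.safe_load`) and are matched on their `name` field. `resolve_feeds` is hypothetical, as are the choices to silently drop requested data types the provider does not define and to treat a missing `enabled` flag as true.

```python
from typing import Dict, List


def resolve_feeds(project_cfg: dict, providers: List[dict]) -> Dict[str, dict]:
    """Map each feed name to its historical provider, symbols, and usable data types."""
    ingest = project_cfg.get("ingest", {})
    if not ingest.get("enabled", False):
        return {}
    by_name = {p["name"]: p for p in providers}
    resolved = {}
    for feed in ingest.get("feeds", []):
        ref = feed["historical_data_provider"]
        if ref not in by_name:
            raise KeyError(f"feed {feed['name']!r}: unknown provider {ref!r}")
        provider = by_name[ref]
        available = provider.get("data_types", {})
        # Keep requested types the provider defines and has not disabled.
        types = [t for t in feed.get("data_types", [])
                 if t in available and available[t].get("enabled", True)]
        resolved[feed["name"]] = {
            "provider": provider,
            "symbols": feed.get("symbols", []),
            "data_types": types,
        }
    return resolved


project = {"ingest": {"enabled": True, "feeds": [{
    "name": "equities_databento",
    "historical_data_provider": "equities_databento_historical",
    "symbols": ["QQQ"],
    "data_types": ["cdm_trades", "cdm_lob_l1"],
}]}}
providers = [{"name": "equities_databento_historical", "type": "databento",
              "data_types": {"cdm_trades": {"name": "mbp_1"}}}]
print(resolve_feeds(project, providers)["equities_databento"]["data_types"])  # ['cdm_trades']
```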

Feed Provider YAML

Each feed provider is a separate YAML file in .definitions/feed_providers/. Here is a complete example — a Databento historical MBP-1 feed for US equities:

```yaml
name: "equities_databento_historical"
type: "databento"
auth: api_key
format: parquet
update_frequency: 1d

data_types:
  cdm_trades:
    name: mbp_1
    stream: "mbp-1"
    row_filter: "action = 'T'"
    description: "Raw MBP-1; dbt extracts trades via WHERE action = 'T'"

    schema:
      attributes:
        ts_event: { dtype: long }
        rtype: { dtype: integer }
        action: { dtype: string }
        side: { dtype: string }
        price: { dtype: double }
        size: { dtype: integer }
        symbol: { dtype: string }
        ts_recv: { dtype: long }

    field_mappings:
      - target: event_time
        source: ts_event
        transformation: "from_unixtime(cast_safe(ts_event, double) / 1000000000.0) AT TIME ZONE 'UTC'"
        is_time_filter_field: true
      - target: price
        source: price
        transformation: "cast_safe(price, double)"
      - target: size
        source: size
        transformation: "cast_safe(size, double)"
      - target: venue
        transformation: "'databento'"
      - target: processed_time
        transformation: "current_timestamp()"
```

| Field | Description |
|---|---|
| name | Provider identifier, referenced by historical_data_provider in the project config |
| type | "databento" or "http"; determines which reader is used |
| location | URL template with {exchange}, {date}, {hour}, {symbol}, {data_type} placeholders (HTTP only) |
| auth | api_key or null |
| format | parquet, csv, or null |
| compression | zstd, snappy, gzip, or null |
| update_frequency | 1h, 1d, daily, or hour:N; controls date/hour expansion in plan() |
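A sketch of how update_frequency and a location template could drive HTTP expansion. The placeholder names come from the table; everything else is assumed: the `%Y-%m-%d` date format, the zero-padded hour, the half-open `[start, end)` interval, the `example.com` URL, and especially reading `hour:N` as "every N hours", which this page does not define.

```python
from datetime import datetime, timedelta
from typing import List


def frequency_step(update_frequency: str) -> timedelta:
    """Map an update_frequency value to a step size (hour:N read as 'every N hours')."""
    if update_frequency in ("1d", "daily"):
        return timedelta(days=1)
    if update_frequency == "1h":
        return timedelta(hours=1)
    if update_frequency.startswith("hour:"):
        return timedelta(hours=int(update_frequency.split(":", 1)[1]))
    raise ValueError(f"unsupported update_frequency: {update_frequency!r}")


def expand_periods(start: datetime, end: datetime, update_frequency: str) -> List[datetime]:
    """Period start times in the half-open interval [start, end)."""
    step, periods, t = frequency_step(update_frequency), [], start
    while t < end:
        periods.append(t)
        t += step
    return periods


def render_location(template: str, when: datetime, **fields: str) -> str:
    """Fill {date} and {hour} from the period; {exchange}, {symbol}, ... from fields."""
    return template.format(date=when.strftime("%Y-%m-%d"), hour=f"{when.hour:02d}", **fields)


template = "https://example.com/{exchange}/{data_type}/{symbol}/{date}/{hour}.parquet.zst"
for when in expand_periods(datetime(2024, 1, 2), datetime(2024, 1, 2, 2), "1h"):
    print(render_location(template, when, exchange="binance", data_type="trades", symbol="BTCUSDT"))
```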

Each entry under data_types defines one CDM entity type produced by this provider:

| Field | Description |
|---|---|
| name | Short name (e.g. trades, mbp_1) |
| stream | Dataset/stream name in the provider's system |
| enabled | Set to false to skip this data type |
| row_filter | Optional SQL WHERE filter applied during dbt transformation |
| schema.attributes | Raw source columns with types |
| field_mappings | Raw → CDM column mappings with QFSQL transformations |
| tests | dbt tests (uniqueness, recency, range checks) |

Field mappings define how raw source fields map to CDM columns:

| Field | Description |
|---|---|
| target | CDM column name |
| source | Raw field name (omitted for constants and computed columns) |
| transformation | QFSQL expression applied to the source value |
| is_time_filter_field | Marks the partition filter column for incremental loading |

Mappings without a source field inject constants (e.g. venue: "'databento'") or computed values (e.g. processed_time: "current_timestamp()").
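As a mental model of what a mapping list amounts to, the sketch below renders mappings plus a row_filter into a SELECT: each item is `transformation AS target`, falling back to the raw `source` column when no transformation is given. This is not the dbt generator; `staging_select` is hypothetical, and it emits the QFSQL verbatim without the per-engine dialect translation the engine layer performs.

```python
from typing import List, Optional


def mapping_to_sql(mapping: dict) -> str:
    """One SELECT item: the transformation if given, else the raw source column."""
    expr = mapping.get("transformation") or mapping.get("source")
    if not expr:
        raise ValueError(f"mapping for {mapping['target']!r} has neither source nor transformation")
    return f"{expr} AS {mapping['target']}"


def staging_select(source_table: str, mappings: List[dict],
                   row_filter: Optional[str] = None) -> str:
    """Assemble a SELECT over the raw table, applying row_filter as the WHERE clause."""
    items = ",\n    ".join(mapping_to_sql(m) for m in mappings)
    sql = f"SELECT\n    {items}\nFROM {source_table}"
    if row_filter:
        sql += f"\nWHERE {row_filter}"
    return sql


mappings = [
    {"target": "price", "source": "price", "transformation": "cast_safe(price, double)"},
    {"target": "venue", "transformation": "'databento'"},
    {"target": "processed_time", "transformation": "current_timestamp()"},
]
print(staging_select("raw_mbp_1", mappings, "action = 'T'"))
```

Note that constants such as `'databento'` carry their own SQL quotes inside the YAML string, which is why the example above needs both double and single quotes.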

See also:

  • dbt Generator: how field mappings become dbt staging models
  • Engine Layer: engine registry and QFSQL reference
  • Data Quality: how tests flow through the validation layers