
Running Pipelines

Batch mode (research), streaming mode (live trading), Dagster orchestration, incremental updates, and CLI reference.


1. Batch Mode (Research)

Processes historical data in DuckDB. Five stages run sequentially, or each can be run individually:

# Run every stage in order: ingest → dbt → state → label → feature
qf run --start-date 2026-01-01 --end-date 2026-01-31

# Run a specific stage
qf run --engine ingest # Download raw data
qf run --engine dbt # Run dbt: raw → CDM schema
qf run --engine state # State engine: raw → CDM tables
qf run --engine label # Label computation
qf run --engine feature # Feature computation

# Filter by symbol
qf run --symbols BTCUSDT --start-date 2026-01-01 --end-date 2026-01-31
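If you script batch runs, each stage can be invoked separately in dependency order. A minimal sketch, assuming `qf` is on your PATH; `qf_run_args` and `run_batch` are hypothetical helpers that use only the flags documented on this page:

```python
import subprocess

# Batch stages in dependency order, as accepted by `qf run --engine`.
BATCH_STAGES = ["ingest", "dbt", "state", "label", "feature"]


def qf_run_args(engine, start_date, end_date, symbols=None):
    """Build the argv for one `qf run` invocation (hypothetical helper)."""
    if engine not in BATCH_STAGES:
        raise ValueError(f"unknown engine: {engine!r}")
    args = ["qf", "run", "--engine", engine,
            "--start-date", start_date, "--end-date", end_date]
    if symbols:
        # --symbols takes one or more space-separated instruments.
        args += ["--symbols", *symbols]
    return args


def run_batch(start_date, end_date, symbols=None):
    """Run each stage in order; check=True stops at the first failing stage."""
    for engine in BATCH_STAGES:
        subprocess.run(qf_run_args(engine, start_date, end_date, symbols), check=True)
```

For example, `run_batch("2026-01-01", "2026-01-31", symbols=["BTCUSDT"])` runs the five stages for one symbol; `check=True` raises `CalledProcessError` at the first stage that exits non-zero, so later stages never run on stale inputs.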

Stage Details

| Stage | Flag | What happens |
| --- | --- | --- |
| Ingest | --engine ingest | Downloads raw trades and LOB data from historical feed providers and caches them locally. |
| dbt | --engine dbt | Runs dbt transformations: raw → CDM schema (staging + CDM models). |
| State | --engine state | State Engine: a fused Numba kernel produces enriched trades, bars, and snapshots. Writes to CDM tables. |
| Label | --engine label | Label Engine computes all labels. Writes to cdm.cdm_labels. |
| Feature | --engine feature | Feature Engine computes all features. Writes to {project}_feature.features. |

Output Tables (DuckDB, schema cdm)

| Table | Contents |
| --- | --- |
| cdm_trade_enriched | Trades with L1 enrichment (mid, spread, direction, micro-price) |
| cdm_lob_l2 | Full-depth LOB snapshots |
| cdm_quotes | Best bid/ask quotes per event |
| cdm_{type}_bars | One table per bar type: cdm_tick_bars, cdm_volume_bars, cdm_dollar_bars, cdm_time_bars, cdm_imbalance_bars, cdm_run_bars, cdm_volatility_bars, cdm_dollar_imbalance_bars, cdm_cusum_bars |
| cdm_labels | Label values in row format |
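Because bar tables follow the cdm_{type}_bars pattern, a table name can be derived from the bar type. The sketch below does that and builds a preview query; `bar_table` and `bar_query` are hypothetical helpers, and the only assumption beyond the table above is that you query through the `cdm` schema:

```python
# Bar types that have a dedicated cdm_{type}_bars table, per the table above.
BAR_TYPES = ("tick", "volume", "dollar", "time", "imbalance",
             "run", "volatility", "dollar_imbalance", "cusum")


def bar_table(bar_type, schema="cdm"):
    """Return the schema-qualified bar table name (hypothetical helper)."""
    if bar_type not in BAR_TYPES:
        raise ValueError(f"no CDM bar table for bar type {bar_type!r}")
    return f"{schema}.cdm_{bar_type}_bars"


def bar_query(bar_type, limit=10):
    """Build a simple preview query against one bar table."""
    return f"SELECT * FROM {bar_table(bar_type)} LIMIT {int(limit)}"
```

The query string can be executed with the duckdb Python package, e.g. `duckdb.connect(db_path).sql(bar_query("volume")).show()`, where `db_path` stands for your project's DuckDB file (its location is not covered on this page).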
cdm_labelsLabel values in row format

2. Batch via Dagster

Dagster provides asset lineage, run history, per-stage retries, and a web UI for the batch pipeline. Streaming runs independently in DolphinDB, not through Dagster.

Start Dagster

dagster dev -w dagster_workspace.yaml

Opens the Dagster web UI (formerly Dagit) at http://localhost:3000. From the UI you can:

  • View the 5-stage asset lineage graph
  • Trigger individual jobs or the full pipeline
  • Inspect run history, logs, and asset materializations
  • Re-run failed stages independently

Asset Stages

ingest → dbt → state_engine → label_engine → feature_engine
| Stage | Assets | Description |
| --- | --- | --- |
| ingest | raw_{source} (one per feed provider) | Download raw data and write to {project}_raw.* |
| dbt | dbt_transform | Run dbt to transform raw tables into the CDM schema |
| state_engine | cdm_trade_enriched, cdm_lob_l2, cdm_{type}_bars | Compute enriched trades, LOB snapshots, and bar types |
| label_engine | cdm_labels | Compute supervised labels from bars |
| feature_engine | {project}_feature.features | Compute feature sets |
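The lineage above is a dependency graph, which makes it straightforward to work out what must be recomputed after an upstream change. A sketch using the standard-library graphlib module (Python 3.9+); `LINEAGE`, `execution_order`, and `downstream_of` are illustrative names, not part of Quantflow, and the graph encodes only the linear chain shown in the diagram:

```python
from graphlib import TopologicalSorter

# Illustrative encoding of the diagram: each stage maps to its upstream stages.
LINEAGE = {
    "ingest": set(),
    "dbt": {"ingest"},
    "state_engine": {"dbt"},
    "label_engine": {"state_engine"},
    "feature_engine": {"label_engine"},
}


def execution_order(graph=LINEAGE):
    """Return stages in an order that respects every dependency."""
    return list(TopologicalSorter(graph).static_order())


def downstream_of(stage, graph=LINEAGE):
    """Return `stage` plus every stage that transitively depends on it, in run order."""
    order = execution_order(graph)
    affected = {stage}
    # Topological order guarantees a node's parents are visited before it,
    # so a single pass collects the full transitive closure.
    for node in order:
        if graph[node] & affected:
            affected.add(node)
    return [node for node in order if node in affected]
```

`downstream_of("state_engine")` yields the state, label, and feature stages: the ones whose outputs go stale when bar logic changes.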

Available Jobs

| Job | Selection | Use case |
| --- | --- | --- |
| quantflow_pipeline | All assets | Full pipeline run |
| quantflow_ingest | Ingest group | Re-download raw data |
| quantflow_dbt | dbt group | Re-run dbt transformations |
| quantflow_state_engine | State engine group | Recompute bars and snapshots |
| quantflow_label_engine | Label engine group | Recompute labels |
| quantflow_feature_engine | Feature engine group | Recompute features |
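When only some stages need recomputing, the matching jobs from the table can be picked mechanically. A sketch; `jobs_for` is a hypothetical helper, and the stage-to-job mapping simply restates the table:

```python
# Job names from the table above, keyed by the stage group each one selects.
STAGE_JOBS = {
    "ingest": "quantflow_ingest",
    "dbt": "quantflow_dbt",
    "state_engine": "quantflow_state_engine",
    "label_engine": "quantflow_label_engine",
    "feature_engine": "quantflow_feature_engine",
}
FULL_PIPELINE_JOB = "quantflow_pipeline"


def jobs_for(stages):
    """Pick the job(s) to launch for a set of stages (hypothetical helper)."""
    stages = set(stages)
    unknown = stages - set(STAGE_JOBS)
    if unknown:
        raise ValueError(f"unknown stages: {sorted(unknown)}")
    if stages == set(STAGE_JOBS):
        # Every stage selected: one full-pipeline run instead of five jobs.
        return [FULL_PIPELINE_JOB]
    # Dicts keep insertion order, so jobs come back in pipeline order.
    return [job for stage, job in STAGE_JOBS.items() if stage in stages]
```

The returned names can then be selected in the Dagster UI.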

Programmatic Execution

from quantflow.metadata import get_metadata_manager
from quantflow.pipeline import create_runner

mgr = get_metadata_manager(project_root=".", strict=False)
mgr.load_all()

runner = create_runner(mgr)
result = runner.run(stage="all") # or ingest, dbt, state_engine, label_engine, feature_engine

print(f"Status: {result['status']}")
print(f"Run ID: {result['dagster_run_id']}")
print(f"Elapsed: {result['elapsed']}")
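Besides the per-stage retries available in Dagster, scripted runs may want to retry a whole stage from Python. A sketch that wraps `runner.run` from the example above. Assumption: this page does not document the success value of `result['status']`, so `ok_status="SUCCESS"` is a placeholder to check against your own runs; `run_with_retries` is a hypothetical helper:

```python
import time


def run_with_retries(runner, stage, ok_status="SUCCESS", max_attempts=3, backoff_s=30.0):
    """Re-run a stage until result['status'] == ok_status (hypothetical helper).

    ok_status is an assumed value; the status strings returned by
    runner.run() are not listed in this documentation.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    result = None
    for attempt in range(1, max_attempts + 1):
        result = runner.run(stage=stage)
        if result["status"] == ok_status:
            return result
        if attempt < max_attempts:
            # Linear backoff: wait longer after each failed attempt.
            time.sleep(backoff_s * attempt)
    raise RuntimeError(
        f"stage {stage!r} failed after {max_attempts} attempts "
        f"(last status {result['status']!r}, run {result['dagster_run_id']})"
    )
```

Usage: `result = run_with_retries(runner, "state_engine")`, with `runner` created by `create_runner(mgr)` as shown above.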

3. Streaming Mode (Live Trading)

Deploys the full pipeline into a DolphinDB cluster. Runs continuously until stopped:

# Deploy everything
qf run --mode trade

# Or deploy individual stages
qf pipeline run --stage ingest # Start data ingestion
qf pipeline run --stage process # Start state engine
qf pipeline run --stage feature # Start feature engine

# Monitor
qf pipeline status

# Stop
qf pipeline stop
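Stage-by-stage deployment can be scripted the same way as batch runs. A sketch, assuming `qf` is on PATH and that stages are deployed in the order listed (upstream first); `deploy_streaming` is a hypothetical helper, and its `run` parameter exists only so the command sequence can be tested without a DolphinDB cluster:

```python
import subprocess

# Streaming stages accepted by `qf pipeline run --stage`, in the order listed.
STREAM_STAGES = ("ingest", "process", "feature")


def deploy_streaming(stages=STREAM_STAGES, run=subprocess.run):
    """Deploy streaming stages one at a time, then report status (hypothetical helper)."""
    unknown = [s for s in stages if s not in STREAM_STAGES]
    if unknown:
        raise ValueError(f"unknown streaming stages: {unknown}")
    for stage in stages:
        # check=True aborts the sequence if a deployment command fails.
        run(["qf", "pipeline", "run", "--stage", stage], check=True)
    # Show the deployed state once every stage has been submitted.
    run(["qf", "pipeline", "status"], check=True)
```

Because computation lives in DolphinDB, the script can exit once these commands return; `qf pipeline stop` tears the deployment down later.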

Three stages run inside DolphinDB:

| Stage | What runs |
| --- | --- |
| Ingest | WebSocket data feeds into DolphinDB stream tables |
| Process | Bridge engines and state engines: raw → CDM bars, snapshots, enriched trades |
| Feature | Feature engines: a single consolidated deployment computes all features from the stream tables |

All computation runs inside DolphinDB. Python handles only deployment and monitoring, so the Python process can disconnect once deployment completes.


4. Write Modes

Three write modes are available across all pipeline stages:

| Mode | Behavior | Use when |
| --- | --- | --- |
| delete_insert (default) | Delete rows matching the unique key, then insert new rows | Idempotent re-runs: the same input produces the same output regardless of run count |
| append | Insert new rows without touching existing data | Accumulating data over time, preserving history across runs |
| overwrite | Drop and recreate the table, then insert | Full refresh: the schema changed or you are starting clean |
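The difference between the three modes is easiest to see on a small table. A minimal sketch of their semantics, using the standard-library sqlite3 module as a stand-in for the pipeline's DuckDB tables; the `write` helper and the `(symbol, ts)` key are hypothetical, and this is not Quantflow's writer:

```python
import sqlite3

WRITE_MODES = ("delete_insert", "append", "overwrite")


def write(conn, table, rows, mode="delete_insert", key=("symbol", "ts")):
    """Write `rows` (list of dicts) using one of the three write modes (illustrative)."""
    if mode not in WRITE_MODES:
        raise ValueError(f"unknown write mode: {mode!r}")
    cols = list(rows[0])
    col_sql = ", ".join(cols)
    placeholders = ", ".join("?" for _ in cols)
    if mode == "overwrite":
        # Full refresh: drop the table so it is recreated from scratch.
        conn.execute(f"DROP TABLE IF EXISTS {table}")
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({col_sql})")
    if mode == "delete_insert":
        # Remove existing rows whose unique key matches an incoming row.
        where = " AND ".join(f"{k} = ?" for k in key)
        conn.executemany(f"DELETE FROM {table} WHERE {where}",
                         [tuple(r[k] for k in key) for r in rows])
    # append skips both steps above and only inserts.
    conn.executemany(f"INSERT INTO {table} ({col_sql}) VALUES ({placeholders})",
                     [tuple(r[c] for c in cols) for r in rows])
    conn.commit()
```

Writing the same batch twice with `delete_insert` leaves the table as it was after the first write, `append` doubles the rows, and `overwrite` keeps only the latest batch.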

Defaults per stage:

| Stage | Default mode | Unique key | Notes |
| --- | --- | --- | --- |
| Ingest | append (CLI) / delete_insert (API) | From the data type schema | The CLI preserves existing raw data by default |
| State Engine | delete_insert | From the CDM entity definition | Re-running a date range replaces matching rows |
| Feature Engine | delete_insert | symbol, venue, feature_name, bar_clock, feature_time (tick_to_bar) or venue, symbol, event_time (bar) | Re-running replaces matching rows |
| Label Engine | delete_insert | symbol, timestamp, label_name | Also writes a run_id column for run tracking |

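To reprocess a date range after changing thresholds, re-run the changed stage and every stage downstream of it. With the default delete_insert mode, rows with matching keys are replaced rather than duplicated:

qf run --engine state --start-date 2026-01-01 --end-date 2026-01-31
qf run --engine label --start-date 2026-01-01 --end-date 2026-01-31
qf run --engine feature --start-date 2026-01-01 --end-date 2026-01-31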

5. CLI Reference

qf init

qf init <project_name> [--template crypto|equity|forex] [--output /path] [--force]

Creates a new project from a template. --force overwrites an existing directory.

qf validate

qf validate [--config-dir /path] [--strict]

Validates project YAML against the schema. --strict treats warnings as errors.

qf run

qf run [--config-dir /path]
[--mode research|trade]
[--engine ingest|dbt|state|label|feature]
[--symbols BTCUSDT ETHUSDT ...]
[--start-date YYYY-MM-DD]
[--end-date YYYY-MM-DD]
| Flag | Default | Effect |
| --- | --- | --- |
| --mode | Project's default_pipeline_mode | research = batch, trade = streaming |
| --engine | All stages | Run ingest, dbt, state, label, or feature individually |
| --symbols | All configured symbols | Filter to specific instruments |
| --start-date | Optional | First date to process (inclusive) |
| --end-date | Optional | Last date to process (inclusive) |
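Because --start-date and --end-date are both inclusive, a long backfill can be split into non-overlapping windows and run one window at a time; with the default delete_insert mode, a failed window can simply be re-run. A sketch; `month_windows` is a hypothetical helper:

```python
from datetime import date, timedelta


def month_windows(start, end):
    """Split the inclusive range [start, end] into calendar-month windows.

    Each (first, last) pair is inclusive, matching --start-date/--end-date.
    """
    windows = []
    first = start
    while first <= end:
        # Day 1 of this month + 32 days always lands in the next month.
        next_month = (first.replace(day=1) + timedelta(days=32)).replace(day=1)
        last = min(next_month - timedelta(days=1), end)
        windows.append((first.isoformat(), last.isoformat()))
        first = last + timedelta(days=1)
    return windows
```

Each pair maps directly onto the flags; for `month_windows(date(2026, 1, 15), date(2026, 3, 10))` the second window becomes `qf run --start-date 2026-02-01 --end-date 2026-02-28`.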

qf pipeline

qf pipeline run [--stage ingest|process|feature] [--mode research|trade]
qf pipeline status
qf pipeline stop
qf pipeline validate

Manages the streaming pipeline lifecycle. --mode overrides the project's default_pipeline_mode (research = batch, trade = streaming).

qf build

qf build [--config-dir /path] [--output-dir /path] [--clean] [--verbose]

Generates a dbt project from Quantflow configuration. --verbose / -v enables detailed output.

qf ingest

qf ingest --project-dir /path --start-date YYYY-MM-DD --end-date YYYY-MM-DD
[--sources SOURCE ...] [--data-types TYPE ...] [--symbols SYM ...]
[--force-download] [--writer ENGINE] [--write-mode append|overwrite|delete_insert]
[--dry-run]

One-off data download from configured feed providers. --dry-run validates without downloading. Use --force-download to re-download existing files. --write-mode defaults to append.
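One way to use --dry-run is to validate an ingest command first and then repeat it without the flag. A sketch of an argv builder that uses only the flags shown above; `qf_ingest_args` is a hypothetical helper, source and data-type names are project-specific values not listed here, and --writer is omitted because its engine values are not documented on this page:

```python
def qf_ingest_args(project_dir, start_date, end_date, *, sources=(), data_types=(),
                   symbols=(), write_mode=None, force_download=False, dry_run=False):
    """Build argv for `qf ingest` from the documented flags (hypothetical helper)."""
    args = ["qf", "ingest", "--project-dir", project_dir,
            "--start-date", start_date, "--end-date", end_date]
    # Multi-value flags take space-separated values.
    for flag, values in (("--sources", sources), ("--data-types", data_types),
                         ("--symbols", symbols)):
        if values:
            args += [flag, *values]
    if write_mode is not None:
        if write_mode not in ("append", "overwrite", "delete_insert"):
            raise ValueError(f"unsupported write mode: {write_mode!r}")
        args += ["--write-mode", write_mode]
    if force_download:
        args.append("--force-download")
    if dry_run:
        args.append("--dry-run")
    return args
```

Pass the list to `subprocess.run(..., check=True)`; leaving `write_mode` unset keeps the CLI default of append.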