Running Pipelines
Batch mode (research), Dagster orchestration, streaming mode (live trading), write modes and reprocessing, and CLI reference.
1. Batch Mode (Research)
Processes historical data in DuckDB. Five stages (ingest, dbt, state, label, feature) run sequentially, or individually with --engine:
```bash
# Run everything: ingest → dbt → state engine → labels → features
qf run --start-date 2026-01-01 --end-date 2026-01-31
# Run a specific stage
qf run --engine ingest # Download raw data
qf run --engine dbt # Run dbt: raw → CDM schema
qf run --engine state # State engine: raw → CDM tables
qf run --engine label # Label computation
qf run --engine feature # Feature computation
# Filter by symbol
qf run --symbols BTCUSDT --start-date 2026-01-01 --end-date 2026-01-31
```
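To recompute part of a date range, you can script the individual --engine runs in pipeline order. The sketch below is minimal and wraps only the qf run flags shown above. build_stage_commands and run_stages are hypothetical helpers, not part of Quantflow. By default the script only prints the commands. It runs them only when execute=True.

```python
import shlex
import subprocess
from typing import Optional, Sequence

# Batch stages in the documented order: ingest -> dbt -> state -> label -> feature.
STAGES = ("ingest", "dbt", "state", "label", "feature")


def build_stage_commands(
    start_date: str,
    end_date: str,
    symbols: Optional[Sequence[str]] = None,
    stages: Sequence[str] = STAGES,
) -> list:
    """Return one `qf run --engine <stage>` argv list per requested stage, in pipeline order."""
    unknown = set(stages) - set(STAGES)
    if unknown:
        raise ValueError(f"unknown stage(s): {sorted(unknown)}")
    commands = []
    for stage in STAGES:  # iterate the canonical order, not the caller's order
        if stage not in stages:
            continue
        cmd = ["qf", "run", "--engine", stage,
               "--start-date", start_date, "--end-date", end_date]
        if symbols:
            cmd += ["--symbols", *symbols]
        commands.append(cmd)
    return commands


def run_stages(commands: list, execute: bool = False) -> None:
    """Print each command; if execute is True, run it and stop at the first non-zero exit."""
    for cmd in commands:
        print(shlex.join(cmd))
        if execute:
            subprocess.run(cmd, check=True)  # raises CalledProcessError on failure


if __name__ == "__main__":
    # Dry run: prints the five commands for January 2026, BTCUSDT only.
    run_stages(build_stage_commands("2026-01-01", "2026-01-31", symbols=["BTCUSDT"]))
```

Passing stages=["feature", "state"] still yields state before feature, because the helper always follows the documented order.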
Stage Details
| Stage | Flag | What happens |
|---|---|---|
| Ingest | --engine ingest | Downloads raw trades + LOB from historical feed providers. Caches locally. |
| dbt | --engine dbt | Runs dbt transformations: raw → CDM schema (staging + CDM models). |
| State | --engine state | State Engine: Numba fused kernel produces enriched trades, bars, snapshots. Writes to CDM tables. |
| Label | --engine label | Label Engine computes all labels. Writes to cdm.cdm_labels. |
| Feature | --engine feature | Feature Engine computes all features. Writes to {project}_feature.features. |
Output Tables (DuckDB, schema cdm)
| Table | Contents |
|---|---|
cdm_trade_enriched | Trades with L1 enrichment (mid, spread, direction, micro-price) |
cdm_lob_l2 | Full-depth LOB snapshots |
cdm_quotes | Best bid/ask quotes per event |
cdm_{type}_bars | Per-type bar tables: cdm_tick_bars, cdm_volume_bars, cdm_dollar_bars, cdm_time_bars, cdm_imbalance_bars, cdm_run_bars, cdm_volatility_bars, cdm_dollar_imbalance_bars, cdm_cusum_bars |
cdm_labels | Label values in row format |
2. Batch via Dagster
Dagster provides asset lineage, run history, per-stage retries, and a web UI for the batch pipeline. Streaming runs independently in DolphinDB and does not go through Dagster.
Start Dagster
```bash
dagster dev -w dagster_workspace.yaml
```
Opens the Dagster UI (formerly called Dagit) at http://localhost:3000. From the UI you can:
- View the 5-stage asset lineage graph
- Trigger individual jobs or the full pipeline
- Inspect run history, logs, and asset materializations
- Re-run failed stages independently
Asset Stages
ingest → dbt → state_engine → label_engine → feature_engine
| Stage | Assets | Description |
|---|---|---|
ingest | raw_{source} per feed provider | Download raw data, write to {project}_raw.* |
dbt | dbt_transform | Run dbt to transform raw tables into CDM schema |
state_engine | cdm_trade_enriched, cdm_lob_l2, cdm_{type}_bars | Compute enriched trades, LOB snapshots, bar types |
label_engine | cdm_labels | Compute supervised labels from bars |
feature_engine | {project}_feature.features | Compute feature sets |
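When a stage fails, that stage and everything downstream of it are stale. The sketch below illustrates this dependency logic with Python's standard graphlib module. DEPENDENCIES is a hypothetical map that treats the documented chain as strict dependencies. The real Dagster asset graph may have finer-grained edges. The sketch is not how Dagster is invoked. It only shows the ordering.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map mirroring the documented chain
# ingest -> dbt -> state_engine -> label_engine -> feature_engine.
DEPENDENCIES = {
    "ingest": set(),
    "dbt": {"ingest"},
    "state_engine": {"dbt"},
    "label_engine": {"state_engine"},
    "feature_engine": {"label_engine"},
}


def execution_order(deps: dict) -> list:
    """Order stages so that every stage comes after all of its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def downstream_of(stage: str, deps: dict) -> list:
    """Return the stage plus everything that transitively depends on it, in execution order."""
    affected = {stage}
    changed = True
    while changed:  # grow the set until no new dependents are found
        changed = False
        for node, parents in deps.items():
            if node not in affected and parents & affected:
                affected.add(node)
                changed = True
    return [s for s in execution_order(deps) if s in affected]


if __name__ == "__main__":
    print(downstream_of("state_engine", DEPENDENCIES))
    # ['state_engine', 'label_engine', 'feature_engine']
```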
Available Jobs
| Job | Selection | Use Case |
|---|---|---|
quantflow_pipeline | All assets | Full pipeline run |
quantflow_ingest | Ingest group | Re-download raw data |
quantflow_dbt | dbt group | Re-run dbt transformations |
quantflow_state_engine | State engine group | Recompute bars and snapshots |
quantflow_label_engine | Label engine group | Recompute labels |
quantflow_feature_engine | Feature engine group | Recompute features |
Programmatic Execution
```python
from quantflow.metadata import get_metadata_manager
from quantflow.pipeline import create_runner

mgr = get_metadata_manager(project_root=".", strict=False)
mgr.load_all()
runner = create_runner(mgr)
result = runner.run(stage="all")  # or ingest, dbt, state_engine, label_engine, feature_engine
print(f"Status: {result['status']}")
print(f"Run ID: {result['dagster_run_id']}")
print(f"Elapsed: {result['elapsed']}")
```
3. Streaming Mode (Live Trading)
Deploys the full pipeline into a DolphinDB cluster. Runs continuously until stopped:
```bash
# Deploy everything
qf run --mode trade
# Or deploy individual stages
qf pipeline run --stage ingest # Start data ingestion
qf pipeline run --stage process # Start state engine
qf pipeline run --stage feature # Start feature engine
# Monitor
qf pipeline status
# Stop
qf pipeline stop
```
Three stages run inside DolphinDB:
| Stage | What runs |
|---|---|
| Ingest | WebSocket data feeds into DolphinDB stream tables |
| Process | Bridge engines + state engines: raw → CDM bars, snapshots, enriched trades |
| Feature | Feature engines: consolidated deployment computes all features from stream tables |
All computation runs inside DolphinDB. Python handles only deployment and monitoring, so the Python process can disconnect once deployment completes.
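The pipeline keeps running inside DolphinDB, so monitoring can be a periodic call to qf pipeline status that holds no state. The sketch below is minimal and assumes two things. First, it does not interpret the status output, whose format is not specified here. It only captures the raw stdout. Second, it assumes a non-zero exit code signals a problem. poll_status is a hypothetical helper.

```python
import subprocess
import time
from typing import Sequence


def poll_status(
    cmd: Sequence[str] = ("qf", "pipeline", "status"),
    interval_s: float = 30.0,
    max_polls: int = 3,
) -> list:
    """Run the status command max_polls times, interval_s seconds apart; return each stdout.

    Each call is a fresh process, so nothing needs to stay connected between polls.
    check=True assumes a non-zero exit signals a problem (an assumption, not documented).
    """
    outputs = []
    for i in range(max_polls):
        result = subprocess.run(list(cmd), capture_output=True, text=True, check=True)
        outputs.append(result.stdout.strip())
        if i < max_polls - 1:  # no need to sleep after the last poll
            time.sleep(interval_s)
    return outputs
```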
4. Write Modes
Three write modes are available across all pipeline stages:
| Mode | Behavior | Use when |
|---|---|---|
delete_insert (default) | Delete rows matching the unique key, then insert new rows | Idempotent re-runs — same input produces same output regardless of run count |
append | Insert new rows without touching existing data | Accumulating data over time, preserving history across runs |
overwrite | Drop and recreate the table, then insert | Full refresh — schema changed or starting clean |
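The sketch below shows how the three modes differ on repeated writes. It is a minimal example that uses Python's built-in sqlite3 as a stand-in for DuckDB, with a hypothetical bars table keyed on (symbol, ts). It is not Quantflow's writer. The real unique keys per stage are listed in the next table.

```python
import sqlite3

# Hypothetical table with unique key (symbol, ts); sqlite3 stands in for DuckDB here.
CREATE_BARS = "CREATE TABLE IF NOT EXISTS bars (symbol TEXT, ts TEXT, value REAL)"


def new_store() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(CREATE_BARS)
    return conn


def write(conn: sqlite3.Connection, rows: list, mode: str = "delete_insert") -> None:
    """Write (symbol, ts, value) rows to bars using append, delete_insert, or overwrite."""
    if mode == "overwrite":
        # Full refresh: drop and recreate the table before inserting.
        conn.execute("DROP TABLE IF EXISTS bars")
        conn.execute(CREATE_BARS)
    elif mode == "delete_insert":
        # Remove any existing rows whose unique key matches an incoming row.
        conn.executemany(
            "DELETE FROM bars WHERE symbol = ? AND ts = ?",
            [(symbol, ts) for symbol, ts, _ in rows],
        )
    elif mode != "append":
        raise ValueError(f"unknown write mode: {mode}")
    # All modes end by inserting the new rows; append does nothing else.
    conn.executemany("INSERT INTO bars VALUES (?, ?, ?)", rows)
    conn.commit()  # the DELETE and INSERT of delete_insert are committed together


def row_count(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM bars").fetchone()[0]
```

With two input rows, writing twice with delete_insert leaves 2 rows. Appending the same rows again raises the count to 4. A later delete_insert collapses those duplicates back to 2.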
Defaults per stage:
| Stage | Default Mode | Unique Key | Notes |
|---|---|---|---|
| Ingest | append (CLI) / delete_insert (API) | From data type schema | CLI preserves existing raw data by default |
| State Engine | delete_insert | From CDM entity definition | Re-running a date range replaces matching rows |
| Feature Engine | delete_insert | symbol, venue, feature_name, bar_clock, feature_time (tick_to_bar) or venue, symbol, event_time (bar) | Re-running replaces matching rows |
| Label Engine | delete_insert | symbol, timestamp, label_name | Also writes a run_id column for run tracking |
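To reprocess a date range after changing thresholds, re-run the state engine and then the stages that consume its output (labels are computed from bars). With the default delete_insert mode, matching rows are replaced rather than duplicated:
```bash
qf run --engine state --start-date 2026-01-01 --end-date 2026-01-31
qf run --engine label --start-date 2026-01-01 --end-date 2026-01-31
qf run --engine feature --start-date 2026-01-01 --end-date 2026-01-31
```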
5. CLI Reference
qf init
```
qf init <project_name> [--template crypto|equity|forex] [--output /path] [--force]
```
Creates a new project from a template. --force overwrites an existing directory.
qf validate
```
qf validate [--config-dir /path] [--strict]
```
Validates project YAML against the schema. --strict treats warnings as errors.
qf run
```
qf run [--config-dir /path]
       [--mode research|trade]
       [--engine ingest|dbt|state|label|feature]
       [--symbols BTCUSDT ETHUSDT ...]
       [--start-date YYYY-MM-DD]
       [--end-date YYYY-MM-DD]
```
| Flag | Default | Effect |
|---|---|---|
--mode | Project's default_pipeline_mode | research = batch, trade = streaming |
--engine | All stages | Run a single stage: ingest, dbt, state, label, or feature
--symbols | All configured symbols | Filter to specific instruments |
--start-date | optional | First date to process (inclusive) |
--end-date | optional | Last date to process (inclusive) |
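Both bounds are inclusive, so a range covers (end - start) + 1 calendar days. To process a single day, pass the same date twice. The sketch below shows that arithmetic. dates_in_range is a hypothetical helper. How Quantflow maps calendar dates to timestamps, including the time zone, is not specified here.

```python
from datetime import date, timedelta


def dates_in_range(start: str, end: str) -> list:
    """Every calendar date from start to end, both inclusive (YYYY-MM-DD strings)."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    if last < first:
        raise ValueError("end date precedes start date")
    return [first + timedelta(days=i) for i in range((last - first).days + 1)]


if __name__ == "__main__":
    days = dates_in_range("2026-01-01", "2026-01-31")
    print(len(days), days[0], days[-1])  # 31 2026-01-01 2026-01-31
```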
qf pipeline
```
qf pipeline run [--stage ingest|process|feature] [--mode research|trade]
qf pipeline status
qf pipeline stop
qf pipeline validate
```
Streaming pipeline lifecycle management. --mode overrides the project default (research for batch, trade for streaming).
qf build
```
qf build [--config-dir /path] [--output-dir /path] [--clean] [--verbose]
```
Generates a dbt project from Quantflow configuration. --verbose / -v enables detailed output.
qf ingest
```
qf ingest --project-dir /path --start-date YYYY-MM-DD --end-date YYYY-MM-DD
          [--sources SOURCE ...] [--data-types TYPE ...] [--symbols SYM ...]
          [--force-download] [--writer ENGINE] [--write-mode append|overwrite|delete_insert]
          [--dry-run]
```
One-off data download from configured feed providers. --dry-run validates without downloading. Use --force-download to re-download existing files. --write-mode defaults to append.