Dagster Pipeline Orchestration
QuantFlow uses Dagster to orchestrate batch execution — research and historical data processing. Streaming (live trading) runs independently via DolphinDB's native reactive state engines, not through Dagster.
Why Dagster
| Requirement | How Dagster Addresses It |
|---|---|
| Asset lineage | Dependencies between raw tables, CDM models, bars, labels, and features — resolved from metadata |
| Incremental execution | Run only the stages that need updating (e.g., re-run features without re-ingesting) |
| Observability | Dagster UI (formerly Dagit) for run history, asset materializations, logs, and failure debugging |
| Stage isolation | Each stage runs as a separate job with group-level granularity |
| Python-native | Assets are Python functions, not a DAG-as-YAML DSL |
Pipeline Architecture
ingest → dbt → state_engine → label_engine → feature_engine
Five asset groups, each producing well-defined outputs:
| Group | Assets | Compute Kind | Description |
|---|---|---|---|
| ingest | raw_{source} per feed provider | python | Download raw market data via IngestPipeline and write to the target engine |
| dbt | dbt_transform | dbt | Run dbt run to transform raw tables into CDM schema |
| state_engine | state_engine_full (multi-asset) | numba | Single-pass Numba kernel produces enriched trades, LOB snapshots, and bar tables |
| label_engine | cdm_labels | python | Compute supervised learning labels from bar tables |
| feature_engine | ft_features | polars | Compute configured features and write to the feature table |
With stage="all", the state engine runs outside Dagster. BatchPipelineRunner first runs the state engine via Ray with daily sharding, then Dagster orchestrates the ingest, dbt, label, and feature stages. This keeps heavy Numba/Ray computation out of the Dagster process. The state_engine_full asset in Dagster is used for standalone quantflow_state_engine job runs.
Asset Definitions
Stage 1: Ingest
For each feed provider in project.ingest.feeds, a dynamic asset raw_{source_name} is created:
@asset(
    name=f"raw_{src.name}",
    group_name="ingest",
    compute_kind="python",
)
def _fn(context, config: PipelineConfig):
    start, end = _resolve_date_range(config)
    from quantflow.ingest.runner import run as ingest_run
    return ingest_run(
        project_dir=str(project_dir),
        feed=src.name,
        mode=config.write_mode,
        start_date=start,
        end_date=end,
    )
Ingest assets are only generated when ingest.feeds has enabled entries. When no ingest feeds are configured, the pipeline references CDM tables from an upstream data project as SourceAsset entries.
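When one function is generated per feed, each function has to capture its own feed rather than the shared loop variable. The following is a minimal plain-Python sketch of that factory pattern, without Dagster. `Feed`, `make_ingest_fn`, `build_ingest_fns`, and the feed names are hypothetical and only illustrate the idea; the actual QuantFlow code may bind `src` differently.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class Feed:
    """Hypothetical stand-in for an entry in project.ingest.feeds."""
    name: str
    enabled: bool = True


def make_ingest_fn(src: Feed) -> Callable[[str, str], dict]:
    """Build one ingest function bound to a single feed.

    A factory function gives each closure its own `src`, which avoids the
    late-binding pitfall of defining functions directly inside a loop.
    """
    def _fn(start: str, end: str) -> dict:
        # A real asset would call the ingest runner here.
        return {"feed": src.name, "start_date": start, "end_date": end}

    _fn.__name__ = f"raw_{src.name}"
    return _fn


def build_ingest_fns(feeds: List[Feed]) -> Dict[str, Callable[[str, str], dict]]:
    """Return one function per enabled feed, keyed by asset name."""
    return {f"raw_{f.name}": make_ingest_fn(f) for f in feeds if f.enabled}


# Feed names below are made up for the example.
fns = build_ingest_fns([Feed("binance"), Feed("okx"), Feed("legacy", enabled=False)])
print(sorted(fns))
print(fns["raw_binance"]("2024-01-01", "2024-01-02")["feed"])
```

Disabled feeds produce no function at all, which mirrors the rule that ingest assets exist only for enabled entries.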
Stage 2: dbt Transform
The dbt_transform asset depends on all ingest assets. It auto-generates a dbt project via DBTProjectGenerator if one doesn't exist, then runs dbt run:
@asset(
    name="dbt_transform",
    deps=ingest_asset_keys,
    group_name="dbt",
    compute_kind="dbt",
)
def dbt_transform(context, config: PipelineConfig):
    start, end = _resolve_date_range(config)
    # Auto-generate dbt project if missing
    if not (dbt_dir / "dbt_project.yml").exists():
        dbt_generate(project_dir=str(project_dir), output_dir="dbt_project")
    dbt_run(output_dir=str(dbt_dir), start_date=start, end_date=end, ...)
Stage 3: State Engine
A multi_asset named state_engine_full produces multiple CDM outputs from a single Numba kernel pass:
| Output Asset | Description |
|---|---|
| cdm_trade_enriched | Trades with L1 context: mid-price, spread, direction, signed volume |
| cdm_lob_l2 | Full-depth LOB snapshots with derived metrics |
| cdm_{type}_bars | One asset per configured bar type (cdm_time_bars, cdm_tick_bars, etc.) |
@multi_asset(
    # group_name is set once on the decorator; Dagster rejects setting it
    # both here and on individual AssetOut entries.
    outs={
        "cdm_trade_enriched": AssetOut(),
        "cdm_lob_l2": AssetOut(),
        **{f"cdm_{bt}_bars": AssetOut() for bt in bar_types},
    },
    deps=[dbt_transform],
    group_name="state_engine",
    compute_kind="numba",
)
def state_engine_full(context, config: PipelineConfig):
    counts = _run_state_engine_full(context, metadata, ...)
    # Emit one Output per declared asset, keyed by output name
    for name, value in counts.items():
        yield Output(value=value, output_name=name)
Under the hood, _run_state_engine_full calls quantflow.state_engine.runner.run_production() via Ray with time-range sharding.
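To make the sharding idea concrete, here is a minimal sketch of splitting a date range into daily shards that could then be handed to parallel workers. It assumes an inclusive end date and ISO-formatted date strings; `daily_shards` is a hypothetical helper, not QuantFlow's actual sharding code, and the Ray dispatch itself is omitted.

```python
from datetime import date, timedelta
from typing import List, Tuple


def daily_shards(start_date: str, end_date: str) -> List[Tuple[str, str]]:
    """Split an inclusive ISO date range into one (start, end) shard per day."""
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    if end < start:
        raise ValueError("end_date must not be earlier than start_date")
    shards = []
    current = start
    while current <= end:
        day = current.isoformat()
        shards.append((day, day))
        current += timedelta(days=1)
    return shards


shards = daily_shards("2024-01-30", "2024-02-02")
print(len(shards), shards[0], shards[-1])
```

Each shard is independent, so a scheduler can process shards in any order as long as the kernel does not carry state across day boundaries, which is an assumption of this sketch.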
Stage 4: Label Engine
The cdm_labels asset depends on all state engine outputs. Labels are only computed when project.label_engine is configured:
@asset(
    name="cdm_labels",
    deps=[AssetKey(k) for k in state_asset_keys],
    group_name="label_engine",
    compute_kind="python",
)
def cdm_labels(context, config: PipelineConfig):
    # Resolve the date range the same way as the other stages
    start, end = _resolve_date_range(config)
    return _run_labels(context, metadata, start_date=start, end_date=end,
                       symbols=config.symbols, run_id="dagster")
The asset calls quantflow.label_engine.runner.run(), which dispatches to registered label methods (triple_barrier, fixed_horizon_return, trend_scanning, etc.).
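Dispatching to registered methods is commonly done with a name-to-function registry. The sketch below shows that pattern with a single, simplified fixed-horizon return label. `LABEL_METHODS`, `register_label`, and `compute_labels` are hypothetical names, and the label formula is an illustrative assumption rather than QuantFlow's implementation.

```python
from typing import Callable, Dict, List

# Hypothetical registry mapping a method name to its implementation.
LABEL_METHODS: Dict[str, Callable[..., List[float]]] = {}


def register_label(name: str):
    """Decorator that records a label function under `name`."""
    def decorator(fn):
        LABEL_METHODS[name] = fn
        return fn
    return decorator


@register_label("fixed_horizon_return")
def fixed_horizon_return(closes: List[float], horizon: int = 1) -> List[float]:
    """Simple return over `horizon` bars; the last `horizon` bars get no label."""
    return [closes[i + horizon] / closes[i] - 1.0
            for i in range(len(closes) - horizon)]


def compute_labels(method: str, closes: List[float], **params) -> List[float]:
    """Look up `method` in the registry and apply it to the close prices."""
    try:
        fn = LABEL_METHODS[method]
    except KeyError:
        raise ValueError(f"unknown label method: {method!r}") from None
    return fn(closes, **params)


labels = compute_labels("fixed_horizon_return", [100.0, 110.0, 99.0], horizon=1)
print(labels)
```

New label methods plug in by adding another decorated function, without touching the dispatcher.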
Stage 5: Feature Engine
The ft_features asset depends on both state engine outputs and labels. It reads feature configurations from project.feature_engine.features, compiles them via FeatureDAG, and runs via Ray:
@asset(
    name="ft_features",
    deps=[*state_asset_keys, label_asset_key],
    group_name="feature_engine",
    compute_kind="polars",
)
def ft_features(context, config: PipelineConfig):
    return _run_features(context, metadata, ...)
The asset calls quantflow.feature_engine.runner.run_ray(), which parallelizes feature computation across Ray workers with time-range sharding.
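The source does not describe what FeatureDAG does internally. Assuming it orders features so that each one is computed after the features it depends on, the core step can be sketched with the standard library's topological sorter. The feature names and the `execution_order` helper are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_order(feature_deps: Dict[str, Set[str]]) -> List[str]:
    """Return feature names ordered so that dependencies come first.

    Raises graphlib.CycleError if the dependencies contain a cycle.
    """
    return list(TopologicalSorter(feature_deps).static_order())


# Hypothetical features: each key depends on the features in its set.
deps = {
    "mid_price": set(),
    "spread": set(),
    "rel_spread": {"spread", "mid_price"},
    "rel_spread_zscore": {"rel_spread"},
}
order = execution_order(deps)
print(order)
```

The relative order of independent features (here `mid_price` and `spread`) is not significant; only the dependency constraints are guaranteed.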
Jobs
Up to six jobs are defined automatically, depending on which asset groups contain assets:
| Job Name | Selection | Condition |
|---|---|---|
| quantflow_pipeline | All assets | Always |
| quantflow_ingest | Group ingest | Ingest feeds configured |
| quantflow_dbt | Group dbt | Ingest feeds configured |
| quantflow_state_engine | Group state_engine | Ingest feeds configured |
| quantflow_label_engine | Group label_engine | label_engine config present |
| quantflow_feature_engine | Group feature_engine | feature_engine config present |
Jobs are only created when their group has at least one asset.
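The selection rule in the table can be expressed as a small function. This is a plain-Python sketch of the rule only, not the code QuantFlow uses to build Dagster jobs; `JOB_GROUPS` and `jobs_to_create` are hypothetical names.

```python
from typing import Dict, List, Optional

# Job name -> asset group it selects (None means "all assets").
JOB_GROUPS: Dict[str, Optional[str]] = {
    "quantflow_pipeline": None,
    "quantflow_ingest": "ingest",
    "quantflow_dbt": "dbt",
    "quantflow_state_engine": "state_engine",
    "quantflow_label_engine": "label_engine",
    "quantflow_feature_engine": "feature_engine",
}


def jobs_to_create(assets_by_group: Dict[str, List[str]]) -> List[str]:
    """Return the job names whose asset group contains at least one asset."""
    jobs = []
    for job, group in JOB_GROUPS.items():
        # The full-pipeline job is always created; others need a non-empty group.
        if group is None or assets_by_group.get(group):
            jobs.append(job)
    return jobs


# A project without ingest feeds: only label and feature assets exist.
created = jobs_to_create({"label_engine": ["cdm_labels"],
                          "feature_engine": ["ft_features"]})
print(created)
```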
Auto-Discovery
Place a dagster_workspace.yaml at your project root:
load_from:
  - python_module: quantflow.pipeline.dagster.auto
Then start Dagster:
dagster dev -w dagster_workspace.yaml
The auto-discovery module (quantflow.pipeline.dagster.auto) automatically:
- Sets DAGSTER_HOME to {project_root}/.temp
- Loads project metadata from the current working directory
- Calls build_defs_from_metadata() to construct all assets and jobs
- Exposes the resulting Definitions object to Dagster
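The first step in the list above can be sketched as follows. This is an illustration only: `configure_dagster_home` is a hypothetical helper, and whether the real module overrides an existing DAGSTER_HOME or creates the directory is not stated in the source. The path is resolved because Dagster expects DAGSTER_HOME to be an absolute path.

```python
import os
import tempfile
from pathlib import Path


def configure_dagster_home(project_root: Path) -> Path:
    """Point DAGSTER_HOME at <project_root>/.temp, creating the directory."""
    home = (project_root / ".temp").resolve()
    home.mkdir(parents=True, exist_ok=True)
    os.environ["DAGSTER_HOME"] = str(home)
    return home


# Demonstrate against a throwaway directory rather than a real project.
with tempfile.TemporaryDirectory() as tmp:
    home = configure_dagster_home(Path(tmp))
    print(home.name, os.environ["DAGSTER_HOME"] == str(home))
```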
Programmatic Execution
The BatchPipelineRunner provides a Python API for notebook or script usage:
from quantflow.metadata import get_metadata_manager
from quantflow.pipeline import create_runner

mgr = get_metadata_manager(project_root=".", strict=False)
mgr.load_all()

runner = create_runner(mgr)
result = runner.run(
    stage="all",  # ingest, dbt, state_engine, label_engine, feature_engine, all
    symbols=["BTCUSDT"],
    start_date="2024-01-01",
    end_date="2024-12-31",
)
When stage="all", the runner executes the state engine directly via Ray (not through Dagster), then runs the ingest, dbt, label, and feature stages through Dagster's execute_in_process. Per-stage execution selects the corresponding asset group.
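The dispatch rule described above can be summarized as a small planning function. This sketch models only the ordering and the choice of executor as stated in this section; `plan` and the `"ray"` / `"dagster"` tags are hypothetical and do not reflect BatchPipelineRunner's internals.

```python
from typing import List, Tuple

STAGES = ["ingest", "dbt", "state_engine", "label_engine", "feature_engine"]


def plan(stage: str) -> List[Tuple[str, str]]:
    """Return (executor, stage) steps for a requested stage.

    With stage="all", the state engine is executed directly (Ray) first and
    the remaining stages go through Dagster. A single stage maps to its
    Dagster asset group.
    """
    if stage == "all":
        dagster_stages = [s for s in STAGES if s != "state_engine"]
        return [("ray", "state_engine")] + [("dagster", s) for s in dagster_stages]
    if stage not in STAGES:
        raise ValueError(f"unknown stage: {stage!r}")
    return [("dagster", stage)]


steps = plan("all")
print(steps)
print(plan("dbt"))
```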
Streaming Pipeline
Streaming (live trading) uses StreamingPipelineRunner — not Dagster:
| Stage | What Runs |
|---|---|
| ingest | Create DolphinDB stream tables, start WebSocket data feeds |
| process | Deploy state engine (bars, snapshots, enriched trades) inside DolphinDB |
| feature | Deploy feature engines into DolphinDB for continuous computation |
| all | Full end-to-end streaming deployment |
Monitor via qf pipeline status. Python disconnects after deployment — the pipeline runs autonomously inside the DolphinDB cluster.
Configuration
The pipeline is configured through QuantFlow project metadata. No separate Dagster config files beyond the workspace YAML.
| Metadata Field | Used By | Purpose |
|---|---|---|
| project.ingest.feeds[] | Ingest assets | Which feed providers to download |
| project.symbols[] | All stages | Filters data by symbol |
| project.data_processing.historical_data_engine | All stages | Target engine |
| project.label_engine | Label engine | Label method configuration |
| project.feature_engine | Feature engine | Feature selection and config |
| local.engine[] | All stages | Connection details and credentials |
The Dagster PipelineConfig supports these runtime overrides:
| Field | Type | Default | Description |
|---|---|---|---|
| start_date | str | "" (yesterday) | First date to process |
| end_date | str | "" (today) | Last date to process |
| symbols | List[str] | Project symbols | Symbol filter |
| write_mode | str | "skip_if_exists" | Write mode for all stages |
| max_concurrency | int | 4 | Ray parallelism |
| shard_granularity | str | "daily" | Time shard granularity |
| batch_days | int | None | Days per dbt batch |
| skip_test | bool | True | Skip dbt tests |
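The empty-string defaults for start_date and end_date imply a resolution step along these lines. The function below is a hypothetical stand-in for _resolve_date_range, whose actual behavior is not shown in this document; the `today` parameter exists only to make the example deterministic.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def resolve_date_range(start_date: str = "", end_date: str = "",
                       today: Optional[date] = None) -> Tuple[str, str]:
    """Fill empty dates with the documented defaults (yesterday / today)."""
    today = today or date.today()
    start = start_date or (today - timedelta(days=1)).isoformat()
    end = end_date or today.isoformat()
    if date.fromisoformat(start) > date.fromisoformat(end):
        raise ValueError("start_date must not be later than end_date")
    return start, end


default_range = resolve_date_range(today=date(2024, 3, 1))
explicit_range = resolve_date_range("2024-01-01", "2024-01-31")
print(default_range)
print(explicit_range)
```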
Deployment
Local Development
# Terminal 1: Dagster webserver + daemon
cd my_quantflow_project
dagster dev -w dagster_workspace.yaml
# Terminal 2: Trigger a run
python -c "
from quantflow.pipeline import create_runner
from quantflow.metadata import get_metadata_manager
mgr = get_metadata_manager(project_root='.', strict=False)
mgr.load_all()
result = create_runner(mgr).run(stage='all')
print(result)
"
Production
- Schedule via Dagster daemon with @schedule decorators
- Trigger via CI/CD using the Dagster GraphQL API or dagster job execute
- Containerize with a Docker image containing QuantFlow and project config
Set DAGSTER_HOME to a persistent location with PostgreSQL-backed run storage for production use.