Dagster Pipeline Orchestration

QuantFlow uses Dagster to orchestrate batch execution — research and historical data processing. Streaming (live trading) runs independently via DolphinDB's native reactive state engines, not through Dagster.


Why Dagster

| Requirement | How Dagster Addresses It |
| --- | --- |
| Asset lineage | Dependencies between raw tables, CDM models, bars, labels, and features are resolved from metadata |
| Incremental execution | Run only the stages that need updating (e.g., re-run features without re-ingesting) |
| Observability | Dagit UI for run history, asset materializations, logs, and failure debugging |
| Stage isolation | Each stage runs as a separate job with group-level granularity |
| Python-native | Assets are Python functions, not a DAG-as-YAML DSL |

Pipeline Architecture

ingest → dbt → state_engine → label_engine → feature_engine

Five asset groups, each producing well-defined outputs:

| Group | Assets | Compute Kind | Description |
| --- | --- | --- | --- |
| ingest | raw_{source} per feed provider | python | Download raw market data via IngestPipeline and write to the target engine |
| dbt | dbt_transform | dbt | Run dbt run to transform raw tables into CDM schema |
| state_engine | state_engine_full (multi-asset) | numba | Single-pass Numba kernel produces enriched trades, LOB snapshots, and bar tables |
| label_engine | cdm_labels | python | Compute supervised learning labels from bar tables |
| feature_engine | ft_features | polars | Compute configured features and write to the feature table |
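
The dependency chain above can be sketched with the standard library alone. This is a minimal illustration, not QuantFlow or Dagster code: the `STAGE_DEPS` mapping and both helper functions are hypothetical names, and the edges are taken from the asset definitions described on this page.

```python
from graphlib import TopologicalSorter

# Stage -> stages it depends on (hypothetical mapping built from this page).
STAGE_DEPS = {
    "ingest": set(),
    "dbt": {"ingest"},
    "state_engine": {"dbt"},
    "label_engine": {"state_engine"},
    "feature_engine": {"state_engine", "label_engine"},
}


def execution_order(deps=STAGE_DEPS):
    """Return a run order in which every stage follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def downstream_of(stage, deps=STAGE_DEPS):
    """Return `stage` plus every stage that transitively depends on it."""
    affected = {stage}
    changed = True
    while changed:
        changed = False
        for name, parents in deps.items():
            if name not in affected and parents & affected:
                affected.add(name)
                changed = True
    # Report the affected stages in execution order.
    return [s for s in execution_order(deps) if s in affected]


print(execution_order())
print(downstream_of("label_engine"))
```

`downstream_of` shows the idea behind incremental execution: changing the label configuration only touches the label and feature stages, so ingest, dbt, and the state engine need not re-run.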

State engine runs outside Dagster. When executing stage="all" via BatchPipelineRunner, the state engine runs first via Ray with daily sharding, then Dagster orchestrates ingest, dbt, label, and feature stages. This keeps heavy Numba/Ray computation out of the Dagster process. The state_engine_full asset in Dagster is used for standalone quantflow_state_engine job runs.


Asset Definitions

Stage 1: Ingest

For each feed provider in project.ingest.feeds, a dynamic asset raw_{source_name} is created:

@asset(
    name=f"raw_{src.name}",
    group_name="ingest",
    compute_kind="python",
)
def _fn(context, config: PipelineConfig):
    start, end = _resolve_date_range(config)
    from quantflow.ingest.runner import run as ingest_run
    return ingest_run(
        project_dir=str(project_dir),
        feed=src.name,
        mode=config.write_mode,
        start_date=start,
        end_date=end,
    )

Ingest assets are only generated when ingest.feeds has enabled entries. When no ingest feeds are configured, the pipeline references CDM tables from an upstream data project as SourceAsset entries.
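
When functions are generated per feed in a loop, each one has to capture its own feed rather than the loop variable's final value. The sketch below shows one common way to do that with a factory function. It does not use Dagster, and `Feed`, `make_ingest_fn`, `build_ingest_fns`, and the feed names are all hypothetical; how QuantFlow binds `src` internally is not shown in this page.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class Feed:
    """Hypothetical stand-in for one entry in project.ingest.feeds."""
    name: str
    enabled: bool = True


def make_ingest_fn(feed: Feed) -> Callable[[], str]:
    """Bind `feed` as a factory argument so each function keeps its own feed."""
    def _fn() -> str:
        # A real asset body would call the ingest runner here.
        return f"ingested:{feed.name}"
    _fn.__name__ = f"raw_{feed.name}"
    return _fn


def build_ingest_fns(feeds: List[Feed]) -> Dict[str, Callable[[], str]]:
    """Create one function per enabled feed, keyed by its asset name."""
    return {f"raw_{f.name}": make_ingest_fn(f) for f in feeds if f.enabled}


# Example feed names are made up for illustration.
feeds = [Feed("binance"), Feed("okx"), Feed("bybit", enabled=False)]
fns = build_ingest_fns(feeds)
print(sorted(fns))
print(fns["raw_binance"]())
```

Disabled feeds produce no function at all, which mirrors the rule that ingest assets exist only for enabled entries.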

Stage 2: dbt Transform

The dbt_transform asset depends on all ingest assets. It auto-generates a dbt project via DBTProjectGenerator if one doesn't exist, then runs dbt run:

@asset(
    name="dbt_transform",
    deps=ingest_asset_keys,
    group_name="dbt",
    compute_kind="dbt",
)
def dbt_transform(context, config: PipelineConfig):
    start, end = _resolve_date_range(config)
    # Auto-generate dbt project if missing
    if not (dbt_dir / "dbt_project.yml").exists():
        dbt_generate(project_dir=str(project_dir), output_dir="dbt_project")
    dbt_run(output_dir=str(dbt_dir), start_date=start, end_date=end, ...)

Stage 3: State Engine

A multi_asset named state_engine_full produces multiple CDM outputs from a single Numba kernel pass:

| Output Asset | Description |
| --- | --- |
| cdm_trade_enriched | Trades with L1 context: mid-price, spread, direction, signed volume |
| cdm_lob_l2 | Full-depth LOB snapshots with derived metrics |
| cdm_{type}_bars | One asset per configured bar type (cdm_time_bars, cdm_tick_bars, etc.) |

@multi_asset(
    outs={"cdm_trade_enriched": AssetOut(group_name="state_engine"),
          "cdm_lob_l2": AssetOut(group_name="state_engine"),
          **{f"cdm_{bt}_bars": AssetOut(group_name="state_engine") for bt in bar_types}},
    deps=[dbt_transform],
    group_name="state_engine",
    compute_kind="numba",
)
def state_engine_full(context, config: PipelineConfig):
    counts = _run_state_engine_full(context, metadata, ...)
    for name, value in counts.items():
        yield Output(value=value, output_name=name)

Under the hood, _run_state_engine_full calls quantflow.state_engine.runner.run_production() via Ray with time-range sharding.
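
As a rough picture of what daily time-range sharding means, the sketch below splits an inclusive date range into one shard per calendar day. It is standard-library Python only. `daily_shards` is a hypothetical helper, not part of QuantFlow or Ray, and the actual shard layout used by `run_production()` may differ.

```python
from datetime import date, timedelta
from typing import List, Tuple


def daily_shards(start_date: str, end_date: str) -> List[Tuple[str, str]]:
    """Split an inclusive ISO date range into one (start, end) shard per day."""
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    if end < start:
        raise ValueError("end_date must not precede start_date")
    shards = []
    day = start
    while day <= end:
        # Each shard covers exactly one calendar day.
        shards.append((day.isoformat(), day.isoformat()))
        day += timedelta(days=1)
    return shards


print(daily_shards("2024-01-30", "2024-02-01"))
```

Each shard can then be handed to an independent worker, which is what makes the work parallelizable across a cluster.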

Stage 4: Label Engine

The cdm_labels asset depends on all state engine outputs. Labels are only computed when project.label_engine is configured:

@asset(
    name="cdm_labels",
    deps=[AssetKey(k) for k in state_asset_keys],
    group_name="label_engine",
    compute_kind="python",
)
def cdm_labels(context, config: PipelineConfig):
    start, end = _resolve_date_range(config)
    return _run_labels(context, metadata, start_date=start, end_date=end,
                       symbols=config.symbols, run_id="dagster")

Calls quantflow.label_engine.runner.run() which dispatches to registered label methods (triple_barrier, fixed_horizon_return, trend_scanning, etc.).
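
Dispatching to registered label methods is typically done with a name-to-function registry. The sketch below illustrates that pattern with a simplified fixed-horizon return. The registry, the decorator, and the function signatures are assumptions made for illustration and are not QuantFlow's actual API.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical registry: method name -> label function.
LABEL_METHODS: Dict[str, Callable[..., List[Optional[float]]]] = {}


def register_label(name: str):
    """Decorator that adds a label function to the registry under `name`."""
    def decorator(fn):
        LABEL_METHODS[name] = fn
        return fn
    return decorator


@register_label("fixed_horizon_return")
def fixed_horizon_return(closes: List[float], horizon: int = 1) -> List[Optional[float]]:
    """Simple return from bar t to bar t + horizon; None where no future bar exists."""
    n = len(closes)
    return [
        closes[i + horizon] / closes[i] - 1.0 if i + horizon < n else None
        for i in range(n)
    ]


def compute_labels(method: str, closes: List[float], **params):
    """Look up a registered method by name and apply it to the close prices."""
    if method not in LABEL_METHODS:
        raise KeyError(f"unknown label method: {method!r}")
    return LABEL_METHODS[method](closes, **params)


labels = compute_labels("fixed_horizon_return", [100.0, 110.0, 99.0], horizon=1)
print(labels)
```

New methods plug in by registering another function, so the dispatcher itself never needs to change.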

Stage 5: Feature Engine

The ft_features asset depends on both state engine outputs and labels. It reads feature configurations from project.feature_engine.features, compiles them via FeatureDAG, and runs via Ray:

@asset(
    name="ft_features",
    deps=[*state_asset_keys, label_asset_key],
    group_name="feature_engine",
    compute_kind="polars",
)
def ft_features(context, config: PipelineConfig):
    return _run_features(context, metadata, ...)

Calls quantflow.feature_engine.runner.run_ray() which parallelizes feature computation across Ray workers with time-range sharding.


Jobs

Up to six jobs are defined automatically, depending on which asset groups contain assets:

| Job Name | Selection | Condition |
| --- | --- | --- |
| quantflow_pipeline | All assets | Always |
| quantflow_ingest | Group ingest | Ingest feeds configured |
| quantflow_dbt | Group dbt | Ingest feeds configured |
| quantflow_state_engine | Group state_engine | Ingest feeds configured |
| quantflow_label_engine | Group label_engine | label_engine config present |
| quantflow_feature_engine | Group feature_engine | feature_engine config present |

Jobs are only created when their group has at least one asset.
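
The job-creation rule can be expressed as a small function. This is a sketch of the rule as stated in the table, assuming a plain dictionary of group name to asset names. `GROUP_JOBS` and `plan_jobs` are hypothetical names and no Dagster objects are constructed.

```python
from typing import Dict, List

# Group name -> per-group job name, as listed in the jobs table.
GROUP_JOBS = {
    "ingest": "quantflow_ingest",
    "dbt": "quantflow_dbt",
    "state_engine": "quantflow_state_engine",
    "label_engine": "quantflow_label_engine",
    "feature_engine": "quantflow_feature_engine",
}


def plan_jobs(assets_by_group: Dict[str, List[str]]) -> List[str]:
    """Return the full-pipeline job plus one job per non-empty asset group."""
    jobs = ["quantflow_pipeline"]  # listed as "Always" in the table
    for group, job_name in GROUP_JOBS.items():
        if assets_by_group.get(group):  # skip missing or empty groups
            jobs.append(job_name)
    return jobs


# A project with no ingest feeds that consumes CDM tables from upstream.
print(plan_jobs({"label_engine": ["cdm_labels"], "feature_engine": ["ft_features"]}))
```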


Auto-Discovery

Place a dagster_workspace.yaml at your project root:

load_from:
  - python_module: quantflow.pipeline.dagster.auto

Then start Dagster:

dagster dev -w dagster_workspace.yaml

The auto-discovery module (quantflow.pipeline.dagster.auto) automatically:

  1. Sets DAGSTER_HOME to {project_root}/.temp
  2. Loads project metadata from the current working directory
  3. Calls build_defs_from_metadata() to construct all assets and jobs
  4. Exposes the resulting Definitions object to Dagster

Programmatic Execution

The BatchPipelineRunner provides a Python API for notebook or script usage:

from quantflow.metadata import get_metadata_manager
from quantflow.pipeline import create_runner

mgr = get_metadata_manager(project_root=".", strict=False)
mgr.load_all()

runner = create_runner(mgr)
result = runner.run(
    stage="all",  # ingest, dbt, state_engine, label_engine, feature_engine, all
    symbols=["BTCUSDT"],
    start_date="2024-01-01",
    end_date="2024-12-31",
)

When stage="all", the runner executes state engine directly via Ray (not through Dagster), then runs ingest, dbt, label, and feature stages through Dagster's execute_in_process. Per-stage execution selects the corresponding asset group.
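
The routing described in the paragraph above can be written down as a small planning function. This is only a sketch of the described behavior. `plan_execution` and the `"ray"` / `"dagster"` labels are hypothetical and do not reflect the internals of BatchPipelineRunner.

```python
from typing import List, Tuple

ALL_STAGES = ["ingest", "dbt", "state_engine", "label_engine", "feature_engine"]
# Stages that run through Dagster when stage="all".
DAGSTER_STAGES = ["ingest", "dbt", "label_engine", "feature_engine"]


def plan_execution(stage: str) -> List[Tuple[str, str]]:
    """Return (executor, stage) steps in the order the text describes."""
    if stage == "all":
        # State engine runs directly via Ray first, then the Dagster stages.
        return [("ray", "state_engine")] + [("dagster", s) for s in DAGSTER_STAGES]
    if stage in ALL_STAGES:
        # A single stage selects its asset group and runs through Dagster.
        return [("dagster", stage)]
    raise ValueError(f"unknown stage: {stage!r}")


print(plan_execution("all"))
print(plan_execution("state_engine"))
```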


Streaming Pipeline

Streaming (live trading) uses StreamingPipelineRunner — not Dagster:

| Stage | What Runs |
| --- | --- |
| ingest | Create DolphinDB stream tables, start WebSocket data feeds |
| process | Deploy state engine (bars, snapshots, enriched trades) inside DolphinDB |
| feature | Deploy feature engines into DolphinDB for continuous computation |
| all | Full end-to-end streaming deployment |

Monitor via qf pipeline status. Python disconnects after deployment — the pipeline runs autonomously inside the DolphinDB cluster.


Configuration

The pipeline is configured through QuantFlow project metadata. No separate Dagster config files are needed beyond the workspace YAML.

| Metadata Field | Used By | Purpose |
| --- | --- | --- |
| project.ingest.feeds[] | Ingest assets | Which feed providers to download |
| project.symbols[] | All stages | Filters data by symbol |
| project.data_processing.historical_data_engine | All stages | Target engine |
| project.label_engine | Label engine | Label method configuration |
| project.feature_engine | Feature engine | Feature selection and config |
| local.engine[] | All stages | Connection details and credentials |

The Dagster PipelineConfig supports these runtime overrides:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| start_date | str | "" (yesterday) | First date to process |
| end_date | str | "" (today) | Last date to process |
| symbols | List[str] | Project symbols | Symbol filter |
| write_mode | str | "skip_if_exists" | Write mode for all stages |
| max_concurrency | int | 4 | Ray parallelism |
| shard_granularity | str | "daily" | Time shard granularity |
| batch_days | int | None | Days per dbt batch |
| skip_test | bool | True | Skip dbt tests |
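
The empty-string defaults for start_date and end_date imply a fallback to yesterday and today. A minimal sketch of that resolution is shown here, assuming ISO-formatted date strings. `resolve_date_range` is a hypothetical helper written for illustration and is not the `_resolve_date_range` used in the asset definitions. The `today` parameter exists only to make the behavior reproducible.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def resolve_date_range(
    start_date: str = "",
    end_date: str = "",
    today: Optional[date] = None,
) -> Tuple[str, str]:
    """Fill empty dates with yesterday (start) and today (end)."""
    today = today or date.today()
    start = start_date or (today - timedelta(days=1)).isoformat()
    end = end_date or today.isoformat()
    # Reject inverted ranges early rather than processing nothing silently.
    if date.fromisoformat(end) < date.fromisoformat(start):
        raise ValueError("end_date must not precede start_date")
    return start, end


print(resolve_date_range(today=date(2024, 3, 1)))
print(resolve_date_range("2024-01-01", "2024-12-31"))
```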

Deployment

Local Development

# Terminal 1: Dagster webserver + daemon
cd my_quantflow_project
dagster dev -w dagster_workspace.yaml

# Terminal 2: Trigger a run
python -c "
from quantflow.pipeline import create_runner
from quantflow.metadata import get_metadata_manager

mgr = get_metadata_manager(project_root='.', strict=False)
mgr.load_all()
result = create_runner(mgr).run(stage='all')
print(result)
"

Production

  • Schedule via Dagster daemon with @schedule decorators
  • Trigger via CI/CD using the Dagster GraphQL API or dagster job execute
  • Containerize with a Docker image containing QuantFlow and project config

Set DAGSTER_HOME to a persistent location with PostgreSQL-backed run storage for production use.