Custom Dagster Pipelines
Extend the auto-generated Dagster pipeline with custom assets, jobs, schedules, sensors, and resources.
Overview
QuantFlow auto-discovers and builds Dagster assets from project metadata via quantflow.pipeline.dagster.auto. You can extend this with custom assets, jobs, and schedules while keeping the auto-generated pipeline intact.
Custom Assets
Adding a Non-Standard Asset
Define a custom asset that depends on auto-generated assets:
from dagster import Definitions, asset

from quantflow.metadata import load_metadata
from quantflow.pipeline.dagster.auto import build_defs_from_metadata

DUCKDB_PATH = "data/my_project/quantflow.duckdb"


@asset(
    name="analytics_report",
    group_name="analytics",
    compute_kind="python",
    deps=["ft_features"],  # Depends on the auto-generated feature asset
)
def analytics_report(context):
    """Generate a daily analytics report from computed features.

    Returns a DataFrame with one row per (symbol, feature_name) holding the
    mean and standard deviation of ``value`` from ``cdm.ft_features``.
    """
    import duckdb

    # The context manager closes the connection even if the query raises,
    # so repeated runs don't leak DuckDB handles (and their file locks).
    with duckdb.connect(DUCKDB_PATH) as con:
        result = con.execute(
            """
            SELECT
                symbol,
                feature_name,
                AVG(value) AS mean_value,
                STDDEV(value) AS std_value
            FROM cdm.ft_features
            GROUP BY symbol, feature_name
            """
        ).fetchdf()
    context.log.info(f"Report: {len(result)} feature summaries")
    return result


# Build definitions combining auto-generated + custom
meta = load_metadata(project_dir=".")
auto_defs = build_defs_from_metadata(meta)
custom_defs = Definitions(assets=[analytics_report])

# Combine via Definitions.merge (Dagster 1.7+)
final_defs = Definitions.merge(auto_defs, custom_defs)
Key point: build_defs_from_metadata() returns a Definitions object containing all auto-generated assets and jobs. Merge your custom definitions with it.
Custom Jobs
Selective Stage Jobs
Define a job that runs only specific stages:
from dagster import define_asset_job

# Quick validation: ingest + dbt only.
# NOTE: string selections use Dagster's asset-selection syntax, not shell
# globs. "raw_*" means "the asset named `raw_` plus everything downstream of
# it", which does not resolve. "*dbt_transform" selects dbt_transform plus all
# of its upstream assets (presumably the raw_* ingest assets; confirm in the UI).
validate_job = define_asset_job(
    name="quick_validate",
    selection="*dbt_transform",
    description="Fast validation: ingest latest data and run dbt",
)

# Features and labels only; Dagster runs the selected assets in dependency order.
feature_research_job = define_asset_job(
    name="feature_research",
    selection=["cdm_trade_enriched", "cdm_lob_l2", "cdm_labels", "ft_features"],
    description="Recompute features and labels from existing bars",
)
Custom Pipeline Job
Combine auto-generated and custom assets into a new job:
@asset(
    name="risk_metrics",
    group_name="analytics",
    compute_kind="python",
    deps=["ft_features"],
)
def risk_metrics(context):
    """Placeholder for custom risk calculations downstream of ``ft_features``."""


# A job mixing an auto-generated asset (ft_features) with a custom one.
risk_job = define_asset_job(
    name="risk_pipeline",
    selection=["ft_features", "risk_metrics"],
)
Custom Schedules
Periodic Batch Runs
Schedule recurring pipeline runs:
from datetime import datetime, timedelta

from dagster import RunRequest, ScheduleEvaluationContext, schedule


def _batch_window(run_date):
    """Return the ``(start_date, end_date)`` ISO strings a weekday run ingests.

    The schedule fires Monday-Friday only, so fetching just "yesterday" would
    never cover Friday or Saturday. Monday's run therefore ingests Friday
    through Sunday; every other run ingests only the previous day.
    """
    end = run_date - timedelta(days=1)
    # date.weekday() == 0 is Monday: reach back to the previous Friday.
    start = run_date - timedelta(days=3) if run_date.weekday() == 0 else end
    return start.isoformat(), end.isoformat()


@schedule(
    job_name="quantflow_pipeline",
    cron_schedule="0 6 * * 1-5",  # 6 AM weekdays
    execution_timezone="US/Eastern",
)
def daily_batch_schedule(context: ScheduleEvaluationContext):
    """Run the pipeline each weekday, ingesting the days since the previous run."""
    # Dagster does not template run_config values: a string such as
    # "{{ yesterday }}" would reach the asset verbatim, so dates are computed here.
    # scheduled_execution_time is only None when evaluated outside a real tick
    # (e.g. build_schedule_context() in tests).
    tick_time = context.scheduled_execution_time or datetime.now()
    start_date, end_date = _batch_window(tick_time.date())
    return RunRequest(
        run_config={
            "ops": {
                "raw_cryptohftdata": {
                    "config": {
                        "start_date": start_date,
                        "end_date": end_date,
                    }
                }
            }
        }
    )
Data Arrival Sensor
Trigger a pipeline when new data lands:
from pathlib import Path

from dagster import RunRequest, SensorEvaluationContext, SkipReason, sensor


@sensor(job_name="quantflow_pipeline")
def new_data_sensor(context: SensorEvaluationContext):
    """Trigger the pipeline when new raw files appear in ``.cache/raw``.

    The cursor stores the newest entry mtime seen so far, as ``str`` of a float
    POSIX timestamp, so each change to the directory triggers at most one run.
    """
    cache_dir = Path(".cache/raw")
    if not cache_dir.is_dir():
        yield SkipReason(f"{cache_dir} does not exist yet")
        return

    # Compare timestamps as floats: the previous "1970-01-01" default could not
    # be parsed by float(), and comparing str(mtime) values is lexicographic.
    last_mtime = float(context.cursor) if context.cursor else 0.0
    newest_mtime = max(
        (entry.stat().st_mtime for entry in cache_dir.iterdir()),
        default=0.0,
    )
    if newest_mtime <= last_mtime:
        yield SkipReason("No new raw files since last run")
        return

    context.update_cursor(str(newest_mtime))
    # run_key makes Dagster drop duplicate requests for the same file state.
    yield RunRequest(run_key=str(newest_mtime))
Custom Resources
Injecting Custom Resources
Add Dagster resources (API clients, config providers, DB connections):
from dagster import Definitions, StringSource, asset, resource


class APIClient:
    """Minimal client wrapper holding the API key (fetch logic left as a stub)."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    def fetch_symbol_metadata(self, symbol: str):
        ...


# Declaring the schema lets Dagster validate the config and resolve
# {"env": "..."} references, so the key never has to live in source code.
@resource(config_schema={"api_key": StringSource})
def api_client_resource(context):
    return APIClient(api_key=context.resource_config["api_key"])


@asset(
    name="symbol_metadata",
    group_name="ingest",
    required_resource_keys={"api_client"},
)
def symbol_metadata(context):
    """Fetch metadata for BTCUSDT through the injected ``api_client`` resource."""
    client = context.resources.api_client
    return client.fetch_symbol_metadata("BTCUSDT")


custom_defs = Definitions(
    assets=[symbol_metadata],
    resources={
        # Read the key from an environment variable (name is an example)
        # instead of committing a secret.
        "api_client": api_client_resource.configured(
            {"api_key": {"env": "QUANTFLOW_API_KEY"}}
        ),
    },
)
Asset Checks
Dagster-Native Data Quality
Add asset checks that run as part of the Dagster pipeline:
from dagster import AssetCheckResult, AssetCheckSeverity, asset_check


@asset_check(asset="cdm_trade_enriched")
def check_trade_prices_nonzero(trades_df):
    """Ensure all trade prices are strictly positive.

    Rows with ``price <= 0`` fail the check: WARN severity for up to 100
    offending rows, ERROR beyond that.
    """
    # int() turns the numpy count into a plain int, so the comparison below
    # yields a builtin bool (AssetCheckResult rejects numpy.bool_).
    nonpositive_count = int((trades_df["price"] <= 0).sum())
    return AssetCheckResult(
        passed=nonpositive_count == 0,
        metadata={"nonpositive_price_count": nonpositive_count},
        # severity must be an AssetCheckSeverity member, not a string.
        severity=(
            AssetCheckSeverity.ERROR
            if nonpositive_count > 100
            else AssetCheckSeverity.WARN
        ),
    )


@asset_check(asset="cdm_trade_enriched")
def check_trade_volume_reasonable(trades_df):
    """Check that the average trade size is positive.

    An empty frame yields a NaN mean, which compares False and fails the check.
    """
    # float() converts the numpy scalar so `passed` is a builtin bool.
    avg_volume = float(trades_df["size"].mean())
    return AssetCheckResult(
        passed=avg_volume > 0,
        metadata={"avg_trade_size": avg_volume},
    )
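Asset checks appear in the Dagster UI (formerly Dagit) alongside asset materializations. Checks are non-blocking by default. Declare a check with `@asset_check(..., blocking=True)` so that an ERROR-severity failure prevents downstream assets in the same run from executing.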
Overriding Auto-Discovery
Custom Definitions with Full Control
Replace dagster_workspace.yaml auto-discovery with a custom module:
# my_project/pipeline_defs.py
import os

from dagster import Definitions

from quantflow.pipeline.dagster.auto import build_defs_from_metadata
from quantflow.metadata import load_metadata
from my_project.custom_assets import analytics_report, risk_metrics
from my_project.custom_schedules import daily_batch_schedule
from my_project.custom_resources import api_client_resource


def build_definitions():
    """Merge QuantFlow's auto-generated definitions with this project's custom ones."""
    # NOTE(review): "." is presumably resolved against the code server's working
    # directory; set working_directory in the workspace file if that differs.
    meta = load_metadata(project_dir=".")
    auto_defs = build_defs_from_metadata(meta)
    custom_defs = Definitions(
        assets=[analytics_report, risk_metrics],
        schedules=[daily_batch_schedule],
        resources={
            # api_client_resource reads context.resource_config["api_key"], so it
            # must be configured (here or via run config) or runs that use it fail.
            # The env var name is an example; os.environ[...] fails fast at load
            # time if it is unset.
            "api_client": api_client_resource.configured(
                {"api_key": os.environ["QUANTFLOW_API_KEY"]}
            ),
        },
    )
    return Definitions.merge(auto_defs, custom_defs)


# Must expose a top-level 'defs' for dagster workspace
defs = build_definitions()
Then update dagster_workspace.yaml:
# Load definitions from the custom module instead of QuantFlow's auto-discovery.
load_from:
  - python_module: my_project.pipeline_defs
This gives you full control — auto-generated pipeline plus any customizations.