Skip to main content

Custom Dagster Pipelines

Extend the auto-generated Dagster pipeline with custom assets, jobs, schedules, sensors, and resources.


Overview

QuantFlow auto-discovers and builds Dagster assets from project metadata via quantflow.pipeline.dagster.auto. You can extend this with custom assets, jobs, and schedules while keeping the auto-generated pipeline intact.


Custom Assets

Adding a Non-Standard Asset

Define a custom asset that depends on auto-generated assets:

from dagster import asset, Definitions
from quantflow.pipeline.dagster.auto import build_defs_from_metadata
from quantflow.metadata import load_metadata

@asset(
    name="analytics_report",
    group_name="analytics",
    compute_kind="python",
    deps=["ft_features"],  # Depends on the auto-generated feature asset
)
def analytics_report(context):
    """Generate a daily analytics report from computed features.

    Aggregates ``cdm.ft_features`` into one row per ``(symbol, feature_name)``
    pair, with the mean and standard deviation of ``value``.

    Args:
        context: Dagster execution context; used here only for logging.

    Returns:
        The query result as a DataFrame with columns ``symbol``,
        ``feature_name``, ``mean_value`` and ``std_value``.
    """
    import duckdb

    # Use the connection as a context manager so the database file handle is
    # released even if the query raises; the original never closed it.
    with duckdb.connect("data/my_project/quantflow.duckdb") as con:
        result = con.execute("""
SELECT
symbol,
feature_name,
AVG(value) AS mean_value,
STDDEV(value) AS std_value
FROM cdm.ft_features
GROUP BY symbol, feature_name
""").fetchdf()

    context.log.info(f"Report: {len(result)} feature summaries")
    return result


# Build definitions combining auto-generated + custom
from quantflow.metadata import load_metadata

# Load project metadata, resolving the project relative to the current
# working directory.
meta = load_metadata(project_dir=".")

# Definitions built from project metadata (the auto-generated assets and jobs).
auto_defs = build_defs_from_metadata(meta)
# Definitions that hold only the hand-written asset.
custom_defs = Definitions(
assets=[analytics_report],
# Auto-generated assets and jobs are not listed here; they are merged in below.
)

# Combine via Definitions.merge (Dagster 1.7+)
final_defs = Definitions.merge(auto_defs, custom_defs)

Key point: build_defs_from_metadata() returns a Definitions object containing all auto-generated assets and jobs. Merge your custom definitions with it.


Custom Jobs

Selective Stage Jobs

Define a job that runs only specific stages:

from dagster import define_asset_job

# Quick validation: ingest + dbt only
# NOTE(review): confirm that "raw_*" is treated as a name wildcard by the
# installed Dagster version. In the classic string selection syntax a trailing
# "*" means "this asset and everything downstream", not a glob over asset names.
validate_job = define_asset_job(
name="quick_validate",
selection=["raw_*", "dbt_transform"],
description="Fast validation: ingest latest data and run dbt",
)

# Feature-only with dependency resolution
# Selects exactly the four listed assets by name.
feature_research_job = define_asset_job(
name="feature_research",
selection=["cdm_trade_enriched", "cdm_lob_l2", "cdm_labels", "ft_features"],
description="Recompute features and labels from existing bars",
)

Custom Pipeline Job

Combine auto-generated and custom assets into a new job:

# Custom asset in the "analytics" group, declared as depending on the
# "ft_features" asset. The body is intentionally left as a stub.
@asset(group_name="analytics", compute_kind="python", deps=["ft_features"])
def risk_metrics(context): ...

# Job that selects the feature asset together with the custom asset above.
risk_job = define_asset_job(
name="risk_pipeline",
selection=["ft_features", "risk_metrics"],
)

Custom Schedules

Periodic Batch Runs

Schedule recurring pipeline runs:

from dagster import schedule, RunRequest

@schedule(
    job_name="quantflow_pipeline",
    cron_schedule="0 6 * * 1-5",  # 6 AM weekdays
    execution_timezone="US/Eastern",
)
def daily_batch_schedule(context):
    """Request a run that ingests the day before the scheduled tick.

    Args:
        context: Schedule evaluation context; ``scheduled_execution_time``
            is used as the reference point for "yesterday".

    Returns:
        A ``RunRequest`` whose run config sets ``start_date`` and
        ``end_date`` of the ``raw_cryptohftdata`` op to the previous day.
    """
    from datetime import timedelta

    # Dagster passes run_config values through verbatim; a placeholder such
    # as "{{ yesterday }}" is never rendered, so the date is computed here.
    # Anchoring on the scheduled time (not "now") keeps late ticks correct.
    # NOTE(review): assumes the op expects ISO "YYYY-MM-DD" dates - confirm.
    previous_day = (
        context.scheduled_execution_time - timedelta(days=1)
    ).strftime("%Y-%m-%d")

    return RunRequest(
        run_config={
            "ops": {
                "raw_cryptohftdata": {
                    "config": {
                        "start_date": previous_day,
                        "end_date": previous_day,
                    }
                }
            }
        }
    )

Data Arrival Sensor

Trigger a pipeline when new data lands:

from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(job_name="quantflow_pipeline")
def new_data_sensor(context: SensorEvaluationContext):
    """Trigger the pipeline when new raw files appear in the cache directory.

    The sensor cursor stores the newest file modification time seen so far
    (seconds since the epoch, serialized as a string). A run is requested
    only when some file is newer than that value.

    Args:
        context: Sensor evaluation context providing ``cursor`` and
            ``update_cursor``.

    Yields:
        A single ``RunRequest`` when at least one newer file is found.
    """
    import os

    cache_dir = ".cache/raw"

    # The cursor is empty on the first tick; treat that (or an unparsable
    # value) as "nothing seen yet". Timestamps are compared as floats,
    # because comparing their string forms is lexicographic and wrong.
    try:
        last_mtime = float(context.cursor) if context.cursor else 0.0
    except ValueError:
        last_mtime = 0.0

    try:
        entries = os.listdir(cache_dir)
    except FileNotFoundError:
        # No cache directory yet means no new data; skip this tick.
        return

    latest_mtime = last_mtime
    for entry in entries:
        try:
            mtime = os.path.getmtime(os.path.join(cache_dir, entry))
        except OSError:
            # The file disappeared between listing and stat; ignore it.
            continue
        latest_mtime = max(latest_mtime, mtime)

    if latest_mtime > last_mtime:
        context.update_cursor(str(latest_mtime))
        yield RunRequest()

Custom Resources

Injecting Custom Resources

Add Dagster resources (API clients, config providers, DB connections):

# `asset` is imported here as well so the snippet runs when copied on its own.
from dagster import Definitions, asset, resource


class APIClient:
    """Example API client that carries the key used for metadata lookups."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    def fetch_symbol_metadata(self, symbol: str):
        """Fetch metadata for ``symbol`` (left unimplemented in this example)."""
        ...


@resource
def api_client_resource(context):
    """Build an ``APIClient`` from the ``api_key`` entry of the resource config."""
    return APIClient(api_key=context.resource_config["api_key"])


@asset(
    name="symbol_metadata",
    group_name="ingest",
    required_resource_keys={"api_client"},
)
def symbol_metadata(context):
    """Return metadata for BTCUSDT fetched through the ``api_client`` resource."""
    client = context.resources.api_client
    return client.fetch_symbol_metadata("BTCUSDT")


custom_defs = Definitions(
    assets=[symbol_metadata],
    resources={
        # "your-key" is a placeholder; supply the real key from your own
        # secret or configuration source rather than committing it.
        "api_client": api_client_resource.configured({
            "api_key": "your-key"
        })
    },
)

Asset Checks

Dagster-Native Data Quality

Add asset checks that run as part of the Dagster pipeline:

from dagster import asset_check, AssetCheckResult

@asset_check(asset="cdm_trade_enriched")
def check_trade_prices_nonzero(trades_df):
    """Ensure all trade prices are positive.

    Args:
        trades_df: The loaded ``cdm_trade_enriched`` asset; must expose a
            ``price`` column.

    Returns:
        An ``AssetCheckResult`` that passes when no price is ``<= 0``.
        Severity is ERROR above 100 offending rows and WARN otherwise.
    """
    from dagster import AssetCheckSeverity

    # Convert to a built-in int up front so `passed` below is a real Python
    # bool; a pandas/NumPy comparison would yield numpy.bool_ instead.
    negative_count = int((trades_df["price"] <= 0).sum())

    # `severity` must be an AssetCheckSeverity member, not a plain string.
    if negative_count > 100:
        severity = AssetCheckSeverity.ERROR
    else:
        severity = AssetCheckSeverity.WARN

    return AssetCheckResult(
        passed=negative_count == 0,
        metadata={"negative_price_count": negative_count},
        severity=severity,
    )

@asset_check(asset="cdm_trade_enriched")
def check_trade_volume_reasonable(trades_df):
    """Check trade volume is within expected range.

    Args:
        trades_df: The loaded ``cdm_trade_enriched`` asset; must expose a
            ``size`` column.

    Returns:
        An ``AssetCheckResult`` that passes when the mean trade size is
        strictly positive. The mean is reported as metadata.
    """
    # Convert to a built-in float so the comparison below produces a real
    # Python bool rather than numpy.bool_. A NaN mean (for example from an
    # empty column) compares False, so the check fails.
    avg_volume = float(trades_df["size"].mean())
    return AssetCheckResult(
        passed=avg_volume > 0,
        metadata={"avg_trade_size": avg_volume},
    )

Asset checks appear in the Dagster UI (formerly Dagit) alongside asset materializations. A check declared with blocking=True can also block downstream execution on failure.


Overriding Auto-Discovery

Custom Definitions with Full Control

Replace dagster_workspace.yaml auto-discovery with a custom module:

# my_project/pipeline_defs.py
from dagster import Definitions
from quantflow.pipeline.dagster.auto import build_defs_from_metadata
from quantflow.metadata import load_metadata

from my_project.custom_assets import analytics_report, risk_metrics
from my_project.custom_schedules import daily_batch_schedule
from my_project.custom_resources import api_client_resource

def build_definitions():
    """Return the auto-generated definitions merged with the custom ones.

    Project metadata is loaded relative to the current working directory.
    The definitions derived from it are then merged with a ``Definitions``
    object carrying the custom assets, schedule and resource.
    """
    project_meta = load_metadata(project_dir=".")

    return Definitions.merge(
        build_defs_from_metadata(project_meta),
        Definitions(
            assets=[analytics_report, risk_metrics],
            schedules=[daily_batch_schedule],
            resources={"api_client": api_client_resource},
        ),
    )


# Dagster loads the module-level `defs` object from this module.
defs = build_definitions()

Then update dagster_workspace.yaml:

# Load the code location from the module that exposes the top-level `defs`.
load_from:
- python_module: my_project.pipeline_defs

This gives you full control — auto-generated pipeline plus any customizations.


Dagster Pipeline docs · Dagster docs