Pipeline Orchestration
Layer 4 bridges project metadata to resolved feature instances and constructs the execution DAG. It handles feature resolution, topological ordering, DAG optimization, and the batch entry point.
PipelineBuilder
PipelineBuilder (pipeline/pipeline_builder.py) is the orchestrator that transforms project-level YAML declarations into a fully resolved, topologically sorted set of FeatureInstance objects ready for execution.
5-Step Workflow
Project YAML → FeatureType Registry → FeatureInstance Resolution → DAG Construction → Execution
- Load project metadata: parse quantflow_project.yml and extract feature references, data sources, and engine configuration
- Resolve feature types: look up each referenced FeatureType in the FeatureTypeRegistry
- Compile formulas: FormulaIRCompiler compiles each FeatureType's formula: string into IRNodes via Python's ast module
- Build IR DAG: QuantflowIRBuilder constructs the rustworkx DAG, wires cross-feature dependencies, and validates schema contracts
- Topological sort: Kahn's algorithm produces a valid execution order
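The formula-compilation step relies on Python's ast module. As a rough illustration only (this is not the actual FormulaIRCompiler), the sketch below parses a formula string and separates the column names it reads from the functions it calls. The formula text, the rolling_mean function name, and the helper extract_names are assumptions made for the example.

```python
import ast


def extract_names(formula: str) -> tuple[set[str], set[str]]:
    """Return (variables, functions) referenced by a formula expression."""
    # Formulas are treated as single expressions, hence mode="eval".
    tree = ast.parse(formula, mode="eval")
    functions = {
        node.func.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
    }
    names = {node.id for node in ast.walk(tree) if isinstance(node, ast.Name)}
    # A called name such as rolling_mean is a function, not an input column.
    return names - functions, functions


variables, functions = extract_names("(close - rolling_mean(close, 20)) / volume")
print(sorted(variables), sorted(functions))
```

A real compiler would go further and emit one IR node per call or operator; collecting names is just the smallest useful slice of that walk.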
Topological Sort (Kahn's Algorithm)
The dependency graph is a DAG by construction — cycles are rejected during ComputationStep validation. Kahn's algorithm produces a linear execution order:
1. Compute in-degree for every node
2. Enqueue all nodes with in-degree 0 (SOURCE nodes)
3. While the queue is not empty:
   a. Dequeue node → append to execution order
   b. For each downstream neighbor:
      - Decrement in-degree
      - If in-degree reaches 0 → enqueue
This ensures every node executes only after all its upstream dependencies are resolved.
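The steps above can be sketched in plain Python. This is a minimal standalone version using an adjacency dict, not the project's rustworkx-based code; the function name kahn_sort and the "trades" source node are hypothetical.

```python
from collections import deque


def kahn_sort(edges: dict[str, list[str]]) -> list[str]:
    """Topologically sort a graph given as {node: [downstream nodes]}."""
    # Step 1: compute the in-degree of every node.
    in_degree = {node: 0 for node in edges}
    for downstream in edges.values():
        for node in downstream:
            in_degree[node] = in_degree.get(node, 0) + 1

    # Step 2: start from nodes with no upstream dependencies.
    queue = deque(node for node, degree in in_degree.items() if degree == 0)
    order = []

    # Step 3: repeatedly emit a ready node and release its neighbors.
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in edges.get(node, []):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Any node never emitted still has unresolved inputs, so it sits on a cycle.
    if len(order) != len(in_degree):
        raise ValueError("dependency graph contains a cycle")
    return order


graph = {"trades": ["ofi", "vpin"], "ofi": ["flow_quality"], "vpin": ["flow_quality"]}
order = kahn_sort(graph)
print(order)
```

The final length check doubles as a cycle detector, which is a useful safety net even when cycles are already rejected upstream.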
Dependency Resolution
Feature instances can reference each other — a COMBINE feature depends on its upstream feature outputs. PipelineBuilder resolves these cross-references:
signal_features:
  - name: ofi
    type: ofi
  - name: vpin
    type: vpin
quality_features:
  - name: flow_quality
    type: signal_to_noise_ratio
    inputs: [ofi, vpin]
The builder detects that flow_quality depends on ofi and vpin (via its inputs: [ofi, vpin] declaration), places ofi and vpin earlier in the execution order, and wires their output columns as inputs to the quality feature.
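To make the resolution step concrete, here is a small sketch that derives an execution order from the inputs declarations. It assumes the YAML has already been parsed into a dict of the shape shown, and it uses the standard-library graphlib module instead of PipelineBuilder's own logic; resolve_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Parsed form of the YAML above (assumed in-memory shape).
project = {
    "signal_features": [
        {"name": "ofi", "type": "ofi"},
        {"name": "vpin", "type": "vpin"},
    ],
    "quality_features": [
        {"name": "flow_quality", "type": "signal_to_noise_ratio", "inputs": ["ofi", "vpin"]},
    ],
}


def resolve_order(project: dict) -> list[str]:
    """Order feature names so that every feature follows its inputs."""
    features = [f for group in project.values() for f in group]
    known = {f["name"] for f in features}
    dependencies = {}
    for feature in features:
        inputs = feature.get("inputs", [])
        missing = [name for name in inputs if name not in known]
        if missing:
            raise KeyError(f"{feature['name']} references unknown inputs: {missing}")
        dependencies[feature["name"]] = set(inputs)
    # TopologicalSorter takes {node: predecessors} and raises CycleError on cycles.
    return list(TopologicalSorter(dependencies).static_order())


order = resolve_order(project)
print(order)
```

Checking for unknown input names before sorting surfaces a typo in a YAML reference as a clear error instead of a silently missing dependency.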
QuantflowIRBuilder (DAG Construction)
QuantflowIRBuilder (pipeline/ir_builder.py) constructs the rustworkx-backed DAG from resolved FeatureInstance objects:
import rustworkx as rx

class QuantflowIRBuilder:
    dag: rx.PyDAG  # rustworkx directed acyclic graph
    nodes: dict[str, int]  # alias → node index mapping

    def add_node(self, step: ComputationStep) -> int: ...
    def add_edge(self, from_alias: str, to_alias: str) -> None: ...
    def build(self, instances: list[FeatureInstance]) -> rx.PyDAG: ...
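Why rustworkx? It is a graph library implemented in Rust with Python bindings, so DAG construction, cycle checking, and topological sorting run in compiled code rather than in the Python interpreter. Polars does not consume the rustworkx graph directly: nodes are lowered to backend expressions before they reach the Polars query planner.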
Node Fusion Optimization
Independent nodes that share the same group_by keys and window specification can be fused into a single group_by().agg([...]) call. This reduces the number of sequential passes over the data:
# Without fusion: 3 group_by passes
group_by("symbol").agg(rolling_mean("close", 20))
group_by("symbol").agg(rolling_std("close", 20))
group_by("symbol").agg(rolling_sum("volume", 20))

# With fusion: 1 group_by pass
group_by("symbol").agg([
    rolling_mean("close", 20),
    rolling_std("close", 20),
    rolling_sum("volume", 20),
])
Fusion is opt-in and applies only to nodes at the same topological depth with compatible group keys.
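One way to find fusable nodes is to bucket them by a compatibility key. The sketch below assumes that key is (topological depth, group keys, window size), which is one reading of the rules above; WindowNode and fuse are hypothetical names, not part of the codebase.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowNode:
    """Hypothetical stand-in for a windowed aggregation node."""
    alias: str
    depth: int                   # topological depth in the DAG
    group_keys: tuple[str, ...]  # e.g. ("symbol",)
    window: int                  # window length in rows
    expr: str                    # textual form of the aggregation


def fuse(nodes: list[WindowNode]) -> list[list[WindowNode]]:
    """Bucket nodes that could share one group_by().agg([...]) call."""
    buckets = defaultdict(list)
    for node in nodes:
        buckets[(node.depth, node.group_keys, node.window)].append(node)
    return list(buckets.values())


nodes = [
    WindowNode("ma20", 1, ("symbol",), 20, 'rolling_mean("close", 20)'),
    WindowNode("sd20", 1, ("symbol",), 20, 'rolling_std("close", 20)'),
    WindowNode("vol20", 1, ("symbol",), 20, 'rolling_sum("volume", 20)'),
    WindowNode("ma50", 1, ("symbol",), 50, 'rolling_mean("close", 50)'),
]
groups = fuse(nodes)
for group in groups:
    print([node.alias for node in group])
```

Here the three 20-row aggregations land in one bucket and the 50-row one stays separate, so four nodes would need two passes instead of four.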
compute.py (Batch Entry Point)
compute.py is the entry point for batch feature computation. It orchestrates the full pipeline:
1. Load project metadata (sources, features, engine config)
2. Resolve feature instances via PipelineBuilder
3. Build IR DAG via QuantflowIRBuilder
4. Lower to backend expressions via lowering registry
5. Execute with PolarsDAGExecutor
6. Write computed features to warehouse sinks
# Simplified execution flow
from quantflow.feature_engine import FeatureEngine

engine = FeatureEngine(metadata=meta)
result = engine.run(
    symbols=["BTCUSDT"],
    pack="intraday_momentum",
)
The execution model is lazy: expressions are built into a single pl.LazyFrame plan and materialized in one collect() call, so projection pushdown (column pruning), predicate pushdown, and other Polars optimizations apply to the entire plan.
Per-Feature Error Isolation
If one feature fails (e.g., missing source column, type mismatch caught by schema contract), the error is isolated to that feature. The rest of the pipeline continues:
results = {
    "ofi": Success(df_1),
    "vpin": Success(df_2),
    "composite_flow": Failure(TypeError("Expected FLOAT, got STRING")),
}
This ensures a single broken feature definition doesn't block the entire pipeline run.
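The isolation pattern can be sketched with a small result type and a try/except around each feature. This assumes every feature can be evaluated as an independent callable, which the executor may or may not do internally; Success, Failure, and run_isolated are illustrative definitions modeled on the result dict above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass
class Success:
    value: Any


@dataclass
class Failure:
    error: Exception


def run_isolated(
    features: dict[str, Callable[[], Any]],
) -> dict[str, Union[Success, Failure]]:
    """Run each feature independently, capturing failures instead of raising."""
    results = {}
    for name, compute in features.items():
        try:
            results[name] = Success(compute())
        except Exception as exc:  # confine the failure to this feature
            results[name] = Failure(exc)
    return results


def broken():
    raise TypeError("Expected FLOAT, got STRING")


results = run_isolated({
    "ofi": lambda: [0.1, 0.2],
    "vpin": lambda: [0.3, 0.4],
    "composite_flow": broken,
})
failed = [name for name, r in results.items() if isinstance(r, Failure)]
print(failed)
```

Returning the exception object rather than a message string lets callers decide later whether to log, retry, or re-raise.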
Pipeline Integration
FeatureType (YAML)
│
▼
PipelineBuilder.resolve() → list[FeatureInstance]
│
▼
QuantflowIRBuilder.build() → rx.PyDAG[IRNode]
│
▼
Lowering Registry → Backend Expressions
│
▼
PolarsDAGExecutor.run() → Computed Features
Each stage validates its output before passing to the next. Errors are surfaced at the earliest possible point — YAML parse errors at project load, type mismatches at DAG construction, and runtime errors isolated per feature.
File Reference
| File | Purpose |
|---|---|
| ir/ast/compiler.py | FormulaIRCompiler: formula string → IRNode list |
| ir/ast/handlers.py | FormulaHandlersMixin: per-function AST dispatch |
| ir/ast/builtins.py | Function classification frozensets |
| ir/ast/utils.py | formula_to_ir, formula_to_steps, extract_formula_variables |
| ir/builder.py | QuantflowIRBuilder: rustworkx DAG construction |
| ir/primitives.py | Primitive enum + IRNode dataclass |
| ir/contracts.py | OP_CONTRACTS: 50+ schema validation contracts |
| lowering/registry.py | @register decorator + resolve_lowering dispatch |
| lowering/engines/base.py | BaseBackend protocol + default lowering functions |
| backends/polars/backend.py | PolarsBackend |
| backends/polars/orchestrator.py | PolarsDAGExecutor |
| backends/polars/expressions/window.py | WINDOW lowering functions |
| backends/polars/expressions/state.py | STATE lowering functions |
| backends/polars/expressions/transform.py | TRANSFORM lowering functions |
| engine.py | FeatureEngine: high-level entry point |
| runner.py | CLI runner for batch computation |