
Pipeline Orchestration

Layer 4 bridges project metadata to resolved feature instances and constructs the execution DAG. It handles feature resolution, topological ordering, DAG optimization, and the batch entry point.


PipelineBuilder

PipelineBuilder (pipeline/pipeline_builder.py) is the orchestrator that transforms project-level YAML declarations into a fully resolved, topologically sorted set of FeatureInstance objects ready for execution.

5-Step Workflow

Project YAML → FeatureType Registry → FeatureInstance Resolution → DAG Construction → Execution
1. Load project metadata: parse quantflow_project.yml and extract feature references, data sources, and engine configuration
2. Resolve feature types: look up each referenced FeatureType in the FeatureTypeRegistry
3. Compile formulas: FormulaIRCompiler compiles each FeatureType's formula string (its formula: key) into IRNodes using Python's ast module
4. Build the IR DAG: QuantflowIRBuilder constructs the rustworkx DAG, wires cross-feature dependencies, and validates schema contracts
5. Topological sort: Kahn's algorithm produces a valid execution order
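Step 3 can be pictured with a toy compiler. This is a minimal sketch, not the real FormulaIRCompiler: the IRNode shape (output, op, args) and the function name formula_to_nodes are hypothetical. It parses a formula with Python's ast module and emits nodes in post-order, so every argument is emitted before the node that consumes it.

```python
import ast
from dataclasses import dataclass


@dataclass(frozen=True)
class IRNode:
    # Hypothetical node shape, not the IRNode dataclass in ir/primitives.py.
    output: str            # generated name for this node's result
    op: str                # function name, "col" for a bare column, or "const"
    args: tuple[str, ...]  # outputs of upstream nodes, or a literal for col/const


def formula_to_nodes(formula: str) -> list[IRNode]:
    """Compile a formula string into IRNodes in post-order (inputs first)."""
    nodes: list[IRNode] = []

    def visit(node: ast.AST) -> str:
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            args = tuple(visit(a) for a in node.args)  # compile arguments first
            op = node.func.id
        elif isinstance(node, ast.Name):
            args, op = (node.id,), "col"  # bare identifier = source column
        elif isinstance(node, ast.Constant):
            args, op = (repr(node.value),), "const"
        else:
            raise ValueError(f"unsupported syntax: {ast.dump(node)}")
        out = f"n{len(nodes)}"
        nodes.append(IRNode(out, op, args))
        return out

    visit(ast.parse(formula, mode="eval").body)
    return nodes
```

For "rolling_mean(close, 20)" this yields a col node for close, a const node for 20, and a rolling_mean node whose args point at both.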

Topological Sort (Kahn's Algorithm)

The dependency graph is a DAG by construction — cycles are rejected during ComputationStep validation. Kahn's algorithm produces a linear execution order:

1. Compute the in-degree of every node
2. Enqueue all nodes with in-degree 0 (SOURCE nodes)
3. While the queue is not empty:
   a. Dequeue a node and append it to the execution order
   b. For each downstream neighbor:
      - Decrement its in-degree
      - If its in-degree reaches 0, enqueue it

This ensures every node executes only after all its upstream dependencies are resolved.
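The steps above translate almost line for line into Python. This standalone sketch works on a plain adjacency dict with string node names (an assumption for readability; the pipeline itself works on rustworkx node indices, and rustworkx also provides its own topological_sort). It adds the usual cycle check: any node that never reaches in-degree 0 lies on a cycle.

```python
from collections import deque


def kahn_order(edges: dict[str, list[str]]) -> list[str]:
    """Return a topological order; `edges` maps each node to its downstream neighbors."""
    # 1. Compute the in-degree of every node (including nodes that only appear as targets).
    in_degree: dict[str, int] = {node: 0 for node in edges}
    for targets in edges.values():
        for t in targets:
            in_degree[t] = in_degree.get(t, 0) + 1

    # 2. Enqueue all nodes with in-degree 0 (the SOURCE nodes).
    queue = deque(node for node, deg in in_degree.items() if deg == 0)
    order: list[str] = []

    # 3. Dequeue, emit, and release neighbors whose upstream inputs are all resolved.
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in edges.get(node, []):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Nodes that were never emitted sit on a cycle.
    if len(order) != len(in_degree):
        raise ValueError("graph contains a cycle")
    return order
```

For a graph where close feeds sma and std, which both feed zscore, the result is close, sma, std, zscore.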

Dependency Resolution

Feature instances can reference each other — a COMBINE feature depends on its upstream feature outputs. PipelineBuilder resolves these cross-references:

```yaml
signal_features:
  - name: ofi
    type: ofi
  - name: vpin
    type: vpin
quality_features:
  - name: flow_quality
    type: signal_to_noise_ratio
    inputs: [ofi, vpin]
```

The builder detects that flow_quality depends on ofi and vpin (via its inputs: [ofi, vpin] declaration), places ofi and vpin earlier in the execution order, and wires their output columns as inputs to the quality feature.
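A minimal sketch of that cross-reference step, assuming the YAML has already been parsed into plain dicts and that only the inputs: key creates dependencies. It uses the standard library's graphlib rather than PipelineBuilder's own resolver, and resolve_order is a hypothetical name.

```python
from graphlib import TopologicalSorter

# The YAML above, as it might look after parsing.
project = {
    "signal_features": [
        {"name": "ofi", "type": "ofi"},
        {"name": "vpin", "type": "vpin"},
    ],
    "quality_features": [
        {"name": "flow_quality", "type": "signal_to_noise_ratio", "inputs": ["ofi", "vpin"]},
    ],
}


def resolve_order(project: dict) -> list[str]:
    """Order feature names so every feature follows the features listed in its inputs."""
    features = [f for group in project.values() for f in group]
    declared = {f["name"] for f in features}
    graph: dict[str, set[str]] = {}
    for f in features:
        deps = set(f.get("inputs", []))
        missing = deps - declared
        if missing:
            raise KeyError(f"{f['name']} references undeclared inputs: {sorted(missing)}")
        graph[f["name"]] = deps  # TopologicalSorter expects node -> predecessors
    # static_order() raises graphlib.CycleError if the inputs form a cycle.
    return list(TopologicalSorter(graph).static_order())
```

Whatever order ofi and vpin come out in, flow_quality is always placed after both.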


QuantflowIRBuilder (DAG Construction)

QuantflowIRBuilder (pipeline/ir_builder.py) constructs the rustworkx-backed DAG from resolved FeatureInstance objects:

```python
from __future__ import annotations  # ComputationStep / FeatureInstance are defined elsewhere

import rustworkx as rx


class QuantflowIRBuilder:
    dag: rx.PyDAG  # rustworkx directed acyclic graph
    nodes: dict[str, int]  # alias → node index mapping

    # Interface only; method bodies are elided.
    def add_node(self, step: ComputationStep) -> int: ...
    def add_edge(self, from_alias: str, to_alias: str) -> None: ...
    def build(self, instances: list[FeatureInstance]) -> rx.PyDAG: ...
```

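Why rustworkx? It is a graph library implemented in Rust (on top of petgraph) with a Python API, so building and traversing the DAG runs in native code, and it ships graph algorithms such as topological sorting and cycle checks. Polars does not share this structure: its lazy query engine plans queries with its own internal representation, so the rustworkx DAG stays on the orchestration side and only the resulting expressions reach the Polars planner.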
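To show what the alias → index mapping buys, here is a dependency-free stand-in for the builder. It is a hypothetical sketch (MiniIRBuilder is not part of the codebase) that uses plain lists where the real class uses rx.PyDAG, and string payloads where it stores ComputationStep objects. It rejects any edge that would close a cycle, which is what rx.PyDAG does when constructed with check_cycle=True (raising rx.DAGWouldCycle).

```python
class MiniIRBuilder:
    """Hypothetical stand-in for QuantflowIRBuilder: plain lists instead of rx.PyDAG."""

    def __init__(self) -> None:
        self.payloads: list[str] = []        # node index -> step payload
        self.children: list[list[int]] = []  # node index -> downstream indices
        self.nodes: dict[str, int] = {}      # alias -> node index

    def add_node(self, alias: str) -> int:
        if alias in self.nodes:
            raise ValueError(f"duplicate alias: {alias}")
        self.payloads.append(alias)
        self.children.append([])
        self.nodes[alias] = len(self.payloads) - 1
        return self.nodes[alias]

    def _reaches(self, start: int, goal: int) -> bool:
        # Depth-first search along downstream edges.
        stack, seen = [start], set()
        while stack:
            idx = stack.pop()
            if idx == goal:
                return True
            if idx not in seen:
                seen.add(idx)
                stack.extend(self.children[idx])
        return False

    def add_edge(self, from_alias: str, to_alias: str) -> None:
        src, dst = self.nodes[from_alias], self.nodes[to_alias]
        # Adding src -> dst closes a cycle iff dst can already reach src.
        if self._reaches(dst, src):
            raise ValueError(f"edge {from_alias} -> {to_alias} would create a cycle")
        self.children[src].append(dst)
```

Callers wire features by alias (for example add_edge("ofi", "flow_quality")) and never handle raw node indices.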

Node Fusion Optimization

Independent nodes that share the same group_by keys and window specification can be fused into a single group_by().agg([...]) call. This reduces the number of sequential passes over the data:

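```python
import polars as pl

# Toy data; building the plans below does not execute anything.
lf = pl.LazyFrame({"symbol": ["BTCUSDT"] * 30, "close": [1.0] * 30, "volume": [1.0] * 30})

# Without fusion: 3 group_by passes
lf.group_by("symbol").agg(pl.col("close").rolling_mean(20))
lf.group_by("symbol").agg(pl.col("close").rolling_std(20))
lf.group_by("symbol").agg(pl.col("volume").rolling_sum(20))

# With fusion: 1 group_by pass (aliases keep the two "close" outputs distinct)
lf.group_by("symbol").agg([
    pl.col("close").rolling_mean(20).alias("close_mean_20"),
    pl.col("close").rolling_std(20).alias("close_std_20"),
    pl.col("volume").rolling_sum(20).alias("volume_sum_20"),
])
```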

Fusion is opt-in and applies only to nodes at the same topological depth with compatible group keys.
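The fusion rule can be sketched as bucketing nodes by a key. This is a hypothetical sketch: WindowNode and fuse are illustrative names, topological depth is assumed to be precomputed, and "compatible" group keys are treated here as identical keys.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowNode:
    # Hypothetical node description; field names are illustrative only.
    alias: str
    depth: int                   # topological depth, assumed precomputed
    group_keys: tuple[str, ...]  # group_by keys
    window: int                  # rolling window length


def fuse(nodes: list[WindowNode]) -> list[list[str]]:
    """Bucket nodes that can share one group_by().agg([...]) call."""
    buckets: dict[tuple, list[str]] = defaultdict(list)
    for n in nodes:
        # Only nodes at the same depth with identical keys and window share a bucket.
        buckets[(n.depth, n.group_keys, n.window)].append(n.alias)
    return list(buckets.values())
```

Keying on depth keeps a node out of the same pass as anything it depends on, since a dependent always sits at a greater depth than its inputs.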


compute.py (Batch Entry Point)

compute.py is the entry point for batch feature computation. It orchestrates the full pipeline:

1. Load project metadata (sources, features, engine config)
2. Resolve feature instances via PipelineBuilder
3. Build IR DAG via QuantflowIRBuilder
4. Lower to backend expressions via lowering registry
5. Execute with PolarsDAGExecutor
6. Write computed features to warehouse sinks

```python
# Simplified execution flow
from quantflow.feature_engine import FeatureEngine

# `meta` is the project metadata loaded in step 1
engine = FeatureEngine(metadata=meta)
result = engine.run(
    symbols=["BTCUSDT"],
    pack="intraday_momentum",
)
```

The execution model is lazy — expressions are built into a single pl.LazyFrame plan and materialized in one collect() call. Column pruning, predicate pushdown, and other Polars optimizations apply to the entire plan.

Per-Feature Error Isolation

If one feature fails (e.g., missing source column, type mismatch caught by schema contract), the error is isolated to that feature. The rest of the pipeline continues:

```python
results = {
    "ofi": Success(df_1),
    "vpin": Success(df_2),
    "composite_flow": Failure(TypeError("Expected FLOAT, got STRING")),
}
```

This ensures a single broken feature definition doesn't block the entire pipeline run.
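A minimal sketch of the isolation pattern, assuming each feature can be computed by a zero-argument callable and using hypothetical Success and Failure dataclasses (the page does not say where its Success and Failure types come from). A real pipeline also has to decide what happens to features downstream of a failure, which this sketch does not handle.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Success:
    value: Any


@dataclass(frozen=True)
class Failure:
    error: Exception


def run_isolated(features: dict[str, Callable[[], Any]]) -> dict[str, Success | Failure]:
    """Run every feature; an exception is recorded as a Failure instead of aborting the run."""
    results: dict[str, Success | Failure] = {}
    for name, compute in features.items():
        try:
            results[name] = Success(compute())
        except Exception as exc:  # isolate any per-feature failure
            results[name] = Failure(exc)
    return results
```

A feature that raises TypeError still lets every other feature in the dict run and report its own result.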


Pipeline Integration

```text
FeatureType (YAML)
        │
        ▼
PipelineBuilder.resolve()    → list[FeatureInstance]
        │
        ▼
QuantflowIRBuilder.build()   → rx.PyDAG[IRNode]
        │
        ▼
Lowering Registry            → Backend Expressions
        │
        ▼
PolarsDAGExecutor.run()      → Computed Features
```

Each stage validates its output before passing to the next. Errors are surfaced at the earliest possible point — YAML parse errors at project load, type mismatches at DAG construction, and runtime errors isolated per feature.


File Reference

| File | Purpose |
|---|---|
| ir/ast/compiler.py | FormulaIRCompiler: formula string → IRNode list |
| ir/ast/handlers.py | FormulaHandlersMixin: per-function AST dispatch |
| ir/ast/builtins.py | Function classification frozensets |
| ir/ast/utils.py | formula_to_ir, formula_to_steps, extract_formula_variables |
| ir/builder.py | QuantflowIRBuilder: rustworkx DAG construction |
| ir/primitives.py | Primitive enum + IRNode dataclass |
| ir/contracts.py | OP_CONTRACTS: 50+ schema validation contracts |
| lowering/registry.py | @register decorator + resolve_lowering dispatch |
| lowering/engines/base.py | BaseBackend protocol + default lowering functions |
| backends/polars/backend.py | PolarsBackend |
| backends/polars/orchestrator.py | PolarsDAGExecutor |
| backends/polars/expressions/window.py | WINDOW lowering functions |
| backends/polars/expressions/state.py | STATE lowering functions |
| backends/polars/expressions/transform.py | TRANSFORM lowering functions |
| engine.py | FeatureEngine: high-level entry point |
| runner.py | CLI runner for batch computation |