
IR DAG

The Intermediate Representation (IR) is the language-agnostic computation graph that sits between formula compilation and backend execution. Every feature — whether defined by a formula string or legacy steps — resolves to a DAG of IRNode objects. The IR is frozen, validated, and backend-neutral.


Computation Primitives

Every IRNode is classified into one of 6 primitives. The primitive determines how the node executes — its data access pattern, state requirements, and execution model.

| Primitive | Role | Execution model | Example |
|---|---|---|---|
| SOURCE | Data input boundary | Reads from an external table | cdm_trade_enriched, cdm_time_bars |
| TRANSFORM | Single-row, stateless | Each output row depends only on the current input row | a + b, sign(x), abs(x) |
| WINDOW | Fixed-size trailing window | Each output row depends on the last N input rows | rolling_mean(close, 20), diff(price, 1) |
| STATE | Cumulative across all prior rows | Each output row depends on all prior input rows | ema(price, 0.94), cumsum(ofi, 0) |
| EVENT | Event-driven trigger | Reserved for barrier- or condition-triggered computation | Threshold crossing, barrier hit |
| SINK | Data output boundary | Marks the final output column | export |

Key properties:

  • TRANSFORM is stateless and trivially parallelizable — each row is independent.
  • WINDOW requires a fixed lookback (e.g., last 20 rows) and is the workhorse of feature computation.
  • STATE has unbounded lookback: it accumulates across the entire history rather than a fixed window. This is the most expensive primitive, but it is necessary for EMA, cumulative sums, and decay accumulators.
  • EVENT is reserved for future event-driven triggers — computation gated by a barrier condition rather than a fixed window.
  • SOURCE and SINK are DAG boundary markers — they don't compute anything, they demarcate where data enters and exits the graph.
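For intuition, the three row-level execution models (TRANSFORM, WINDOW, STATE) can be sketched in plain Python on a single partition. The helper names below are hypothetical, and emitting None for warm-up rows before the window fills is an assumption, not documented behavior.

```python
from typing import List, Optional


def transform_abs(xs: List[float]) -> List[float]:
    # TRANSFORM: each output row depends only on the same input row.
    return [abs(x) for x in xs]


def window_rolling_mean(xs: List[float], window: int) -> List[Optional[float]]:
    # WINDOW: each output row depends on the last `window` input rows.
    # Rows before a full window is available are emitted as None (assumption).
    out: List[Optional[float]] = []
    for i in range(len(xs)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(xs[i + 1 - window : i + 1]) / window)
    return out


def state_cumsum(xs: List[float]) -> List[float]:
    # STATE: each output row depends on every prior row, carried in an accumulator.
    out: List[float] = []
    total = 0.0
    for x in xs:
        total += x
        out.append(total)
    return out


prices = [1.0, -2.0, 3.0, -4.0]
print(transform_abs(prices))           # [1.0, 2.0, 3.0, 4.0]
print(window_rolling_mean(prices, 2))  # [None, -0.5, 0.5, -0.5]
print(state_cumsum(prices))            # [1.0, -1.0, 2.0, -2.0]
```

The list comprehension in transform_abs is why TRANSFORM parallelizes trivially: every element is computed independently, whereas state_cumsum needs the running total from the previous row.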

IRNode

An IRNode is a frozen (immutable) dataclass representing one computation step:

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Primitive is the enum defined alongside IRNode in ir/primitives.py

@dataclass(frozen=True)
class IRNode:
    id: str                        # Unique node identifier in the DAG
    primitive: "Primitive"         # One of: SOURCE, TRANSFORM, WINDOW, STATE, EVENT, SINK
    op: str                        # Operation name: "rolling_mean", "add", "sign", ...
    params: Dict[str, Any]         # Resolved parameters: {"window": 20, "column": "close"}
    inputs: Dict[str, str]         # Role → upstream node ID: {"lhs": "node_3", "rhs": "node_5"}
    output: Optional[str]          # Output column name
    output_schema: Optional[Dict]  # Expected output column types
    input_schema: Dict[str, str]   # Expected input column types
    stage_label: Optional[str]     # Logical grouping label
    partition_keys: List[str] = field(default_factory=lambda: ["symbol"])  # Group-by keys

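Frozen by design. Once constructed, an IRNode's fields cannot be reassigned. This prevents accidental modification during optimization passes (node fusion, dead code elimination) and ensures the DAG is consistent from lowering through execution. Freezing is shallow, though: dict-valued fields such as params and inputs are still mutable containers, so passes should build new dicts rather than edit existing ones in place.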
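A minimal sketch of what frozen buys in practice. The Node class is a hypothetical, trimmed-down stand-in that keeps only three IRNode fields, and using dataclasses.replace to rewrite a node is an assumption about how an optimization pass might work, not a description of the actual passes.

```python
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(frozen=True)
class Node:
    # Hypothetical stand-in for IRNode (only a few fields kept).
    id: str
    op: str
    params: Dict[str, Any] = field(default_factory=dict)


node = Node(id="node_1", op="rolling_mean", params={"window": 20})

# Reassigning a field on a frozen dataclass raises FrozenInstanceError.
try:
    node.op = "rolling_sum"
except dataclasses.FrozenInstanceError:
    print("cannot reassign fields")

# A pass "modifies" a node by building a new one instead.
# A fresh params dict is passed so the original node's dict is not shared.
tuned = dataclasses.replace(node, params={**node.params, "window": 50})
print(node.params["window"], tuned.params["window"])  # 20 50
```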

Inputs as role → ID. Unlike a simple list of upstream IDs, inputs uses named roles ("lhs", "rhs", "operand", "condition") so that lowering functions know how to use each upstream node. A TRANSFORM.sub node expects "lhs" and "rhs"; a TRANSFORM.conditional expects "condition", "true_value", "false_value".
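Named roles let a lowering function bind upstream results by keyword rather than by position. The sketch below uses a hypothetical LOWERINGS table and lower() helper over plain Python lists; it only illustrates role binding for the sub and conditional ops mentioned above, not the project's actual lowering code.

```python
from typing import Any, Callable, Dict, List

Column = List[Any]

# Hypothetical lowering table: each op names the roles it consumes,
# so the order in which upstream nodes were attached does not matter.
LOWERINGS: Dict[str, Callable[..., Column]] = {
    "sub": lambda lhs, rhs: [a - b for a, b in zip(lhs, rhs)],
    "conditional": lambda condition, true_value, false_value: [
        t if c else f for c, t, f in zip(condition, true_value, false_value)
    ],
}


def lower(op: str, inputs: Dict[str, str], columns: Dict[str, Column]) -> Column:
    # inputs maps role -> upstream node ID; columns maps node ID -> computed values.
    kwargs = {role: columns[node_id] for role, node_id in inputs.items()}
    return LOWERINGS[op](**kwargs)


columns: Dict[str, Column] = {
    "node_3": [5.0, 1.0],
    "node_5": [2.0, 4.0],
}
diff = lower("sub", {"lhs": "node_3", "rhs": "node_5"}, columns)
print(diff)  # [3.0, -3.0]

columns["node_6"] = [d > 0 for d in diff]
picked = lower(
    "conditional",
    {"condition": "node_6", "true_value": "node_3", "false_value": "node_5"},
    columns,
)
print(picked)  # [5.0, 4.0]
```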


Schema Contracts (OP_CONTRACTS)

50+ schema contracts validate column types at DAG construction time — not mid-compute. Each (primitive, op) pair has a registered contract:

OP_CONTRACTS[(Primitive.WINDOW, "rolling_mean")] = SchemaContract(
    required_input_cols=["column"],
    input_dtypes={"column": {Dtype.FLOAT, Dtype.INT, Dtype.DECIMAL}},
    output_dtypes={"rolling_mean": Dtype.FLOAT},
    shape_preserving=False,
    requires_window=True,
)

What contracts enforce:

| Field | Description |
|---|---|
| required_input_cols | Column names that must exist in the upstream output |
| input_dtypes | Allowed dtypes for each input column |
| output_dtypes | Guaranteed output dtypes after this operation |
| shape_preserving | Whether the output row count equals the input row count |
| requires_window | Whether a window parameter is mandatory |

Contracts are checked when the DAG is built — during QuantflowIRBuilder.expand_stage(). A mismatch (passing a STRING column to an operation expecting FLOAT) fails immediately with a clear error pointing to the specific node and column, rather than silently producing wrong results at runtime.
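The build-time check can be sketched as follows. Dtype, Contract, SchemaError, and check_contract are hypothetical names for illustration; the real SchemaContract has more fields (output dtypes, shape preservation, window requirement), and binding a contract slot such as "column" to an actual upstream column through a dict is an assumption.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Set


class Dtype(Enum):
    # Hypothetical subset of dtypes, for illustration only.
    INT = "int"
    FLOAT = "float"
    DECIMAL = "decimal"
    STRING = "string"


@dataclass(frozen=True)
class Contract:
    # Hypothetical stand-in for SchemaContract (input-side fields only).
    required_input_cols: List[str]
    input_dtypes: Dict[str, Set[Dtype]] = field(default_factory=dict)


class SchemaError(ValueError):
    pass


def check_contract(node_id: str, contract: Contract,
                   upstream_schema: Dict[str, Dtype],
                   bindings: Dict[str, str]) -> None:
    # bindings maps a contract slot (e.g. "column") to the actual upstream column name.
    for slot in contract.required_input_cols:
        col = bindings.get(slot)
        if col is None or col not in upstream_schema:
            raise SchemaError(f"{node_id}: input '{slot}' -> column {col!r} not found upstream")
        allowed = contract.input_dtypes.get(slot)
        if allowed is not None and upstream_schema[col] not in allowed:
            raise SchemaError(
                f"{node_id}: column '{col}' has dtype {upstream_schema[col].name}, "
                f"expected one of {sorted(d.name for d in allowed)}"
            )


rolling_mean = Contract(
    required_input_cols=["column"],
    input_dtypes={"column": {Dtype.FLOAT, Dtype.INT, Dtype.DECIMAL}},
)
schema = {"close": Dtype.FLOAT, "venue": Dtype.STRING}

check_contract("node_7", rolling_mean, schema, {"column": "close"})  # passes silently
try:
    check_contract("node_8", rolling_mean, schema, {"column": "venue"})
except SchemaError as err:
    print(err)  # node_8: column 'venue' has dtype STRING, expected one of ['DECIMAL', 'FLOAT', 'INT']
```

Because the error carries the node ID and column name, a bad formula is reported at build time, before any data is read.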


Column Aliases

Column name normalization ensures unambiguous references within the DAG. When multiple features use the same operation (e.g., two rolling_mean nodes from different features), their output aliases are disambiguated by appending an index:

rolling_mean → rolling_mean_0
rolling_mean → rolling_mean_1

Aliases serve as both unique identifiers in the DAG and output column names in the result table. Upstream nodes are referenced by alias, not by raw column name — this prevents collisions when the same source column is used in multiple contexts.
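A minimal sketch of the disambiguation rule, using a hypothetical assign_aliases helper. It appends a per-operation counter to every alias, including operations that appear only once; whether the real normalizer also indexes single-use operations is not specified above, so treat that as an assumption.

```python
from collections import defaultdict
from typing import DefaultDict, List


def assign_aliases(ops: List[str]) -> List[str]:
    # Hypothetical helper: append a per-operation counter so repeated ops get unique aliases.
    counters: DefaultDict[str, int] = defaultdict(int)
    aliases = []
    for op in ops:
        aliases.append(f"{op}_{counters[op]}")
        counters[op] += 1
    return aliases


print(assign_aliases(["rolling_mean", "sign", "rolling_mean"]))
# ['rolling_mean_0', 'sign_0', 'rolling_mean_1']
```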


QuantflowIRBuilder

QuantflowIRBuilder (ir/builder.py) constructs the rustworkx-backed DAG from IRNodes. It is the central assembly point where formula-compiled nodes and FeatureType-instantiated steps all converge.

Key Methods

from __future__ import annotations

import rustworkx as rx

class QuantflowIRBuilder:
    dag: rx.PyDAG           # rustworkx directed acyclic graph
    nodes: dict[str, int]   # alias → node index mapping

    def add_source(self, name: str, schema: dict) -> int: ...
    def expand_stage(self, stage_name: str, feature_type: FeatureType,
                     inputs: dict, params: dict) -> None: ...
    def wire_dependencies(self, stage_name: str,
                          depends_on: list[str]) -> None: ...
    def build_from_feature_defs(self, feature_defs: list,
                                type_registry: FeatureTypeRegistry) -> rx.PyDAG: ...

expand_stage() is the core method. It takes a FeatureType, instantiates it with user-provided parameters and input mappings, compiles its formula to IRNodes, and adds them to the DAG. Schema contracts are validated during this step.

wire_dependencies() connects cross-feature edges. When a quality_features entry depends on a signal_features output (e.g., ofi_snr depends on ofi), this method adds the edges that ensure correct topological ordering.

build_from_feature_defs() is the high-level entry point used by the compute engine. It:

  1. Creates SOURCE nodes for all CDM tables
  2. Expands each feature into its component IRNodes
  3. Wires cross-feature dependencies
  4. Runs node fusion optimization
  5. Validates the DAG is acyclic
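Steps 1 to 3 can be sketched with plain Python sets. build_graph and the feature-spec layout are hypothetical; ofi and ofi_snr come from the example above, but their node layout here is invented, and fusion and the acyclicity check are omitted to keep the sketch short. Feeding the dependency's output into every node that has no upstream of its own is also an assumption.

```python
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str]


def build_graph(sources: List[str], features: Dict[str, dict]) -> Tuple[Set[str], Set[Edge]]:
    # Hypothetical mini-builder covering steps 1-3 only.
    nodes: Set[str] = set(sources)          # 1. SOURCE nodes for the input tables
    edges: Set[Edge] = set()

    # 2. Expand each feature into its component nodes and intra-feature edges.
    for spec in features.values():
        for alias, upstream in spec["nodes"]:
            nodes.add(alias)
            edges.update((u, alias) for u in upstream)

    # 3. Wire cross-feature dependencies: the dependency's output feeds every
    #    node of this feature that has no upstream of its own.
    for name, spec in features.items():
        for dep in spec.get("depends_on", []):
            if dep not in features:
                raise KeyError(f"{name} depends on unknown feature {dep!r}")
            for alias, upstream in spec["nodes"]:
                if not upstream:
                    edges.add((features[dep]["output"], alias))
    return nodes, edges


features = {
    "ofi": {
        "nodes": [("diff_0", ["cdm_trade_enriched"]), ("ofi", ["diff_0"])],
        "output": "ofi",
    },
    "ofi_snr": {
        "nodes": [("rolling_mean_0", []), ("ofi_snr", ["rolling_mean_0"])],
        "output": "ofi_snr",
        "depends_on": ["ofi"],
    },
}
nodes, edges = build_graph(["cdm_trade_enriched"], features)
print(sorted(edges))
```

The cross-feature edge ("ofi", "rolling_mean_0") is what forces ofi to be computed before ofi_snr in any topological order.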

Why rustworkx?

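rustworkx is a graph library for Python implemented in Rust on top of petgraph. Its PyDAG type can reject cycle-creating edges as they are added (PyDAG(check_cycle=True)), and it ships compiled graph algorithms such as topological_sort, which line up with what the builder needs: acyclicity validation and execution ordering. Polars does not use rustworkx internally; its lazy engine keeps its own query-plan representation. The IR DAG is therefore not handed to the Polars planner as-is: lowering walks the DAG and translates each IRNode payload into Polars expressions.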


IR Construction Paths

IRNodes can be constructed through two paths that converge at QuantflowIRBuilder:

Path A — Formula compilation (primary):

Formula string
→ FormulaIRCompiler._walk(ast.parse(formula))
→ list[IRNode]
→ QuantflowIRBuilder.expand_stage()

Path B — FeatureType instantiation (legacy):

FeatureType.steps (ComputationStep list)
→ FeatureType.instantiate(params)
→ list[ComputationStep] (resolved)
→ QuantflowIRBuilder.expand_stage()

Path B is maintained for backward compatibility with steps-based YAML definitions, but all new FeatureTypes use Path A. Both produce identical IRNode structures — the lowering and execution layers don't distinguish between them.
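Path A can be illustrated with a toy compiler built on Python's ast module. TinyCompiler, the function tables, and the convention that bare names are returned as source column references are all assumptions made for this sketch; the real FormulaIRCompiler dispatches through ir/ast/handlers.py and produces full IRNodes.

```python
import ast
from dataclasses import dataclass, field
from typing import Dict, List

# Hypothetical classification tables, loosely mirroring the idea of builtins.py.
TRANSFORM_FUNCS = {"abs", "sign"}
WINDOW_FUNCS = {"rolling_mean", "diff"}
STATE_FUNCS = {"ema", "cumsum"}
BINOPS = {ast.Add: "add", ast.Sub: "sub", ast.Mult: "mul", ast.Div: "div"}


@dataclass
class Node:
    id: str
    primitive: str
    op: str
    params: Dict[str, object] = field(default_factory=dict)
    inputs: Dict[str, str] = field(default_factory=dict)


class TinyCompiler:
    # Hypothetical formula -> node-list walker (not the real FormulaIRCompiler).
    def __init__(self) -> None:
        self.nodes: List[Node] = []

    def _emit(self, primitive: str, op: str, params=None, inputs=None) -> str:
        node = Node(f"node_{len(self.nodes)}", primitive, op, params or {}, inputs or {})
        self.nodes.append(node)
        return node.id

    def _walk(self, expr: ast.AST) -> str:
        if isinstance(expr, ast.Module):              # ast.parse() wraps the formula
            return self._walk(expr.body[0])
        if isinstance(expr, ast.Expr):
            return self._walk(expr.value)
        if isinstance(expr, ast.Name):                # bare name = source column reference
            return expr.id
        if isinstance(expr, ast.BinOp) and type(expr.op) in BINOPS:
            lhs, rhs = self._walk(expr.left), self._walk(expr.right)
            return self._emit("TRANSFORM", BINOPS[type(expr.op)], inputs={"lhs": lhs, "rhs": rhs})
        if isinstance(expr, ast.Call) and isinstance(expr.func, ast.Name):
            name = expr.func.id
            operand = self._walk(expr.args[0])
            extra = [ast.literal_eval(a) for a in expr.args[1:]]  # literal parameters
            if name in TRANSFORM_FUNCS:
                return self._emit("TRANSFORM", name, inputs={"operand": operand})
            if name in WINDOW_FUNCS:
                return self._emit("WINDOW", name, {"window": extra[0]}, {"operand": operand})
            if name in STATE_FUNCS:
                return self._emit("STATE", name, {"param": extra[0]}, {"operand": operand})
        raise ValueError(f"unsupported expression: {ast.dump(expr)}")


compiler = TinyCompiler()
compiler._walk(ast.parse("abs(rolling_mean(close, 20) - close)"))
for n in compiler.nodes:
    print(n.id, n.primitive, n.op, n.params, n.inputs)
```

The walk is post-order, so upstream nodes are always emitted before the nodes that consume them, and the role names (lhs, rhs, operand) fall out of the AST shape directly.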


Node Fusion Optimization

Adjacent TRANSFORM nodes can be fused into a single with_columns() call in the Polars execution plan:

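import polars as pl

df = pl.DataFrame({"a": [1.0, -4.0], "b": [2.0, 1.0]})

# Before fusion: 3 sequential with_columns passes, each materializing a temp column
unfused = (
    df.with_columns((pl.col("a") + pl.col("b")).alias("tmp_0"))
    .with_columns(pl.col("tmp_0").abs().alias("tmp_1"))
    .with_columns((pl.col("tmp_1") * 2).alias("result"))
)

# After fusion: 1 with_columns pass with chained expressions
fused = df.with_columns(((pl.col("a") + pl.col("b")).abs() * 2).alias("result"))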

Fusion is safe because TRANSFORM nodes are stateless — each row's output depends only on that row's input. The optimizer skips fusion when a node's output is referenced across features (cross-feature dependencies must remain visible for correct ordering).
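The fusion rule can be sketched independently of Polars on a linear chain of single-input, stateless nodes. RowOp and fuse_chain are hypothetical names, and keeping the last node's alias for a fused run is an assumption; the point is that a run is cut after any node whose output another feature references.

```python
from dataclasses import dataclass
from typing import Callable, List, Set


@dataclass(frozen=True)
class RowOp:
    # Hypothetical linear-chain node: a stateless per-row function.
    alias: str
    primitive: str
    fn: Callable[[float], float]


def fuse_chain(chain: List[RowOp], externally_used: Set[str]) -> List[RowOp]:
    # Merge consecutive TRANSFORM nodes into one node whose function is the
    # composition of theirs, unless the previous node's output must stay visible.
    fused: List[RowOp] = []
    for op in chain:
        prev = fused[-1] if fused else None
        can_merge = (
            prev is not None
            and prev.primitive == "TRANSFORM"
            and op.primitive == "TRANSFORM"
            and prev.alias not in externally_used
        )
        if can_merge:
            f, g = prev.fn, op.fn
            fused[-1] = RowOp(op.alias, "TRANSFORM", lambda x, f=f, g=g: g(f(x)))
        else:
            fused.append(op)
    return fused


chain = [
    RowOp("tmp_0", "TRANSFORM", lambda x: x + 1),
    RowOp("tmp_1", "TRANSFORM", abs),
    RowOp("out", "TRANSFORM", lambda x: x * 2),
]
print(len(fuse_chain(chain, set())))        # 1
print(fuse_chain(chain, set())[0].fn(-5))   # 8
print(len(fuse_chain(chain, {"tmp_0"})))    # 2
```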


Topological Sort

The DAG is executed in topological order using Kahn's algorithm:

1. Compute the in-degree of every node
2. Enqueue all nodes with in-degree 0 (the SOURCE nodes)
3. While the queue is not empty:
   a. Dequeue a node and append it to the execution order
   b. For each downstream neighbor:
      - Decrement its in-degree
      - If its in-degree reaches 0, enqueue it

This ensures every node executes only after all its upstream dependencies are resolved. Cycles are rejected during DAG construction — a cyclic dependency (A depends on B which depends on A) fails validation immediately.
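The steps above translate almost line for line into Python. This is a standalone sketch over a plain adjacency dict with invented node names, not the builder's code. The final length check is the standard way Kahn's algorithm exposes a cycle: any node never dequeued still has an unresolved dependency.

```python
from collections import deque
from typing import Dict, List


def kahn_order(graph: Dict[str, List[str]]) -> List[str]:
    # graph maps each node to its downstream neighbors; every node must be a key.
    indegree = {node: 0 for node in graph}
    for downstream in graph.values():
        for child in downstream:
            indegree[child] += 1

    queue = deque(node for node, deg in indegree.items() if deg == 0)
    order: List[str] = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for child in graph[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    # Nodes left with a positive in-degree sit on (or behind) a cycle.
    if len(order) != len(graph):
        stuck = sorted(n for n, d in indegree.items() if d > 0)
        raise ValueError("cycle detected: " + ", ".join(stuck))
    return order


dag = {
    "cdm_time_bars": ["rolling_mean_0", "diff_0"],
    "rolling_mean_0": ["sub_0"],
    "diff_0": ["sub_0"],
    "sub_0": ["export"],
    "export": [],
}
print(kahn_order(dag))
# ['cdm_time_bars', 'rolling_mean_0', 'diff_0', 'sub_0', 'export']
```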


File Reference

| File | Purpose |
|---|---|
| ir/primitives.py | Primitive enum + IRNode dataclass |
| ir/contracts.py | OP_CONTRACTS (50+ schema validation contracts) |
| ir/builder.py | QuantflowIRBuilder (rustworkx DAG construction) |
| ir/ast/compiler.py | FormulaIRCompiler (formula → IRNode compilation) |
| ir/ast/builtins.py | Function classification frozensets |
| ir/ast/handlers.py | Per-function AST handler dispatch |
| ir/ast/utils.py | formula_to_ir, formula_to_steps helpers |