IR DAG
The Intermediate Representation (IR) is the language-agnostic computation graph that sits between formula compilation and backend execution. Every feature — whether defined by a formula string or legacy steps — resolves to a DAG of IRNode objects. The IR is frozen, validated, and backend-neutral.
Computation Primitives
Every IRNode is classified into one of 6 primitives. The primitive determines how the node executes — its data access pattern, state requirements, and execution model.
| Primitive | Role | Execution Model | Example |
|---|---|---|---|
SOURCE | Data input boundary | Reads from external table | cdm_trade_enriched, cdm_time_bars |
TRANSFORM | Single-row stateless | Each output row depends only on current input row | a + b, sign(x), abs(x) |
WINDOW | Fixed-size trailing window | Each output row depends on last N input rows | rolling_mean(close, 20), diff(price, 1) |
STATE | Cumulative across all prior rows | Each output row depends on all prior input rows | ema(price, 0.94), cumsum(ofi, 0) |
EVENT | Event-driven trigger | Reserved for barrier/condition-triggered computation | Threshold crossing, barrier hit |
SINK | Data output boundary | Marks final output column | export |
Key properties:
- TRANSFORM is stateless and trivially parallelizable: each row is independent.
- WINDOW requires a fixed lookback (e.g., the last 20 rows) and is the workhorse of feature computation.
- STATE has unbounded lookback: each output depends on the entire prior history, although operations such as EMA and cumulative sums only need to carry a small running value forward from row to row. It is the most expensive primitive, since rows within a partition must be processed in order, but it is necessary for EMA, cumulative sums, and decay accumulators.
- EVENT is reserved for future event-driven triggers: computation gated by a barrier condition rather than a fixed window.
- SOURCE and SINK are DAG boundary markers. They don't compute anything; they demarcate where data enters and exits the graph.
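The difference between the WINDOW and STATE execution models can be sketched in plain Python. This is an illustration only, not the engine's lowering: the functions are named after ops in the table, but their implementations are assumptions. In particular, it assumes the second argument of ema is a decay factor λ applied as s_t = λ·s_(t-1) + (1 - λ)·x_t and seeded with the first value.

```python
from collections import deque
from typing import Iterable, List, Optional


def rolling_mean(xs: Iterable[float], window: int) -> List[Optional[float]]:
    """WINDOW: each output depends only on the last `window` inputs.

    Emits None until the window is full, so there is one output per input row.
    """
    buf: deque = deque(maxlen=window)  # bounded lookback: at most `window` values kept
    out: List[Optional[float]] = []
    for x in xs:
        buf.append(x)
        out.append(sum(buf) / window if len(buf) == window else None)
    return out


def ema(xs: Iterable[float], decay: float) -> List[float]:
    """STATE: each output depends on every prior input through one carried value.

    Assumes `decay` is the weight on the previous state (hypothetical convention).
    """
    out: List[float] = []
    state: Optional[float] = None  # running value carried across the whole history
    for x in xs:
        state = x if state is None else decay * state + (1 - decay) * x
        out.append(state)
    return out


def cumsum(xs: Iterable[float]) -> List[float]:
    """STATE: a cumulative sum over all prior rows."""
    out: List[float] = []
    total = 0.0
    for x in xs:
        total += x
        out.append(total)
    return out
```

The WINDOW version never holds more than `window` values, while the STATE versions hold a single running value; what makes STATE costly is that each row needs the previous row's result, so it cannot be split across independent chunks of rows.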
IRNode
An IRNode is a frozen (immutable) dataclass representing one computation step:
```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Primitive is the enum defined alongside IRNode in ir/primitives.py

@dataclass(frozen=True)
class IRNode:
    id: str                        # Unique node identifier in the DAG
    primitive: Primitive           # One of: SOURCE, TRANSFORM, WINDOW, STATE, EVENT, SINK
    op: str                        # Operation name: "rolling_mean", "add", "sign", ...
    params: Dict[str, Any]         # Resolved parameters: {"window": 20, "column": "close"}
    inputs: Dict[str, str]         # Role → upstream node ID: {"lhs": "node_3", "rhs": "node_5"}
    output: Optional[str]          # Output column name
    output_schema: Optional[Dict]  # Expected output column types
    input_schema: Dict[str, str]   # Expected input column types
    stage_label: Optional[str]     # Logical grouping label
    partition_keys: List[str] = field(default_factory=lambda: ["symbol"])  # Group-by keys
```
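Frozen by design. Once constructed, an IRNode's fields cannot be reassigned. This prevents accidental modification during optimization passes (node fusion, dead code elimination) and keeps the DAG consistent from lowering through execution. With the definition above, the freeze is shallow: frozen=True blocks attribute assignment, but the params and inputs dictionaries themselves remain mutable objects.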
Inputs as role → ID. Unlike a simple list of upstream IDs, inputs uses named roles ("lhs", "rhs", "operand", "condition") so that lowering functions know how to use each upstream node. A TRANSFORM.sub node expects "lhs" and "rhs"; a TRANSFORM.conditional expects "condition", "true_value", "false_value".
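To make role-based wiring concrete, here is a minimal, self-contained sketch that builds the nodes for the formula a - b. The Primitive enum values, the node IDs, the one-SOURCE-node-per-column layout, and the SOURCE op name "column" are all assumptions for illustration. Defaults are added to the IRNode fields here only to keep the example short.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class Primitive(Enum):
    SOURCE = "source"
    TRANSFORM = "transform"
    WINDOW = "window"
    STATE = "state"
    EVENT = "event"
    SINK = "sink"


@dataclass(frozen=True)
class IRNode:
    id: str
    primitive: Primitive
    op: str
    params: Dict[str, Any] = field(default_factory=dict)
    inputs: Dict[str, str] = field(default_factory=dict)  # role -> upstream node ID
    output: Optional[str] = None
    output_schema: Optional[Dict] = None
    input_schema: Dict[str, str] = field(default_factory=dict)
    stage_label: Optional[str] = None
    partition_keys: List[str] = field(default_factory=lambda: ["symbol"])


# "a - b": two source columns feed one TRANSFORM.sub node by role
a = IRNode(id="node_1", primitive=Primitive.SOURCE, op="column", params={"column": "a"}, output="a")
b = IRNode(id="node_2", primitive=Primitive.SOURCE, op="column", params={"column": "b"}, output="b")
sub_node = IRNode(
    id="node_3",
    primitive=Primitive.TRANSFORM,
    op="sub",
    inputs={"lhs": a.id, "rhs": b.id},  # roles fix which operand is subtracted from which
    output="sub_0",
)

# Reassigning a field of a frozen dataclass raises instead of silently editing the DAG
try:
    sub_node.op = "add"  # type: ignore[misc]
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

With positional inputs, a lowering function for sub would have to rely on list order to know which operand is subtracted; the named roles make that explicit.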
Schema Contracts (OP_CONTRACTS)
50+ schema contracts validate column types at DAG construction time — not mid-compute. Each (primitive, op) pair has a registered contract:
```python
OP_CONTRACTS[(Primitive.WINDOW, "rolling_mean")] = SchemaContract(
    required_input_cols=["column"],
    input_dtypes={"column": {Dtype.FLOAT, Dtype.INT, Dtype.DECIMAL}},
    output_dtypes={"rolling_mean": Dtype.FLOAT},
    shape_preserving=False,
    requires_window=True,
)
```
What contracts enforce:
| Field | Description |
|---|---|
required_input_cols | Column names that must exist in upstream output |
input_dtypes | Allowed dtypes for each input column |
output_dtypes | Guaranteed output dtypes after this operation |
shape_preserving | Whether output row count equals input row count |
requires_window | Whether a window parameter is mandatory |
Contracts are checked when the DAG is built — during QuantflowIRBuilder.expand_stage(). A mismatch (passing a STRING column to an operation expecting FLOAT) fails immediately with a clear error pointing to the specific node and column, rather than silently producing wrong results at runtime.
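The check itself can be sketched in a few lines. The SchemaContract fields follow the table above, but the validate_node function, its error messages, the field defaults, and the reduced Dtype enum are assumptions for illustration. The registered contracts themselves live in ir/contracts.py.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Set


class Dtype(Enum):
    FLOAT = "float"
    INT = "int"
    DECIMAL = "decimal"
    STRING = "string"


@dataclass(frozen=True)
class SchemaContract:
    required_input_cols: List[str]
    input_dtypes: Dict[str, Set[Dtype]]
    output_dtypes: Dict[str, Dtype]
    shape_preserving: bool = True
    requires_window: bool = False


def validate_node(node_id: str, contract: SchemaContract,
                  params: Dict[str, object], upstream: Dict[str, Dtype]) -> None:
    """Fail at build time if a node's inputs violate its contract.

    `params` binds each contract role (e.g. "column") to an upstream column name;
    `upstream` maps upstream column names to their dtypes.
    """
    if contract.requires_window and "window" not in params:
        raise ValueError(f"{node_id}: missing required 'window' parameter")
    for role in contract.required_input_cols:
        col = params.get(role)
        if col not in upstream:
            raise ValueError(f"{node_id}: input column {col!r} (role {role!r}) not found upstream")
        allowed = contract.input_dtypes.get(role)
        if allowed is not None and upstream[col] not in allowed:
            raise ValueError(
                f"{node_id}: column {col!r} has dtype {upstream[col].name}, "
                f"expected one of {sorted(d.name for d in allowed)}"
            )


rolling_mean_contract = SchemaContract(
    required_input_cols=["column"],
    input_dtypes={"column": {Dtype.FLOAT, Dtype.INT, Dtype.DECIMAL}},
    output_dtypes={"rolling_mean": Dtype.FLOAT},
    shape_preserving=False,
    requires_window=True,
)
```

For example, binding "column" to a STRING column raises a ValueError that names both the node and the offending column before any data is read.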
Column Aliases
Column name normalization ensures unambiguous references within the DAG. When multiple features use the same operation (e.g., two rolling_mean nodes from different features), their output aliases are disambiguated by appending an index:
```
rolling_mean → rolling_mean_0
rolling_mean → rolling_mean_1
```
Aliases serve as both unique identifiers in the DAG and output column names in the result table. Upstream nodes are referenced by alias, not by raw column name — this prevents collisions when the same source column is used in multiple contexts.
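A minimal sketch of this disambiguation scheme, assuming a per-operation counter that starts at 0 for each builder. The AliasAllocator class is hypothetical.

```python
from collections import defaultdict
from typing import DefaultDict


class AliasAllocator:
    """Hands out unique column aliases by appending a per-op index."""

    def __init__(self) -> None:
        self._counts: DefaultDict[str, int] = defaultdict(int)

    def next_alias(self, op: str) -> str:
        index = self._counts[op]
        self._counts[op] += 1
        return f"{op}_{index}"


aliases = AliasAllocator()
first = aliases.next_alias("rolling_mean")   # node from the first feature
second = aliases.next_alias("rolling_mean")  # node from the second feature
```

Each operation name gets its own counter, so a later ema node would still receive ema_0 regardless of how many rolling_mean nodes exist.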
QuantflowIRBuilder
QuantflowIRBuilder (ir/builder.py) constructs the rustworkx-backed DAG from IRNodes. It is the central assembly point where formula-compiled nodes and FeatureType-instantiated steps all converge.
Key Methods
```python
from __future__ import annotations  # FeatureType, FeatureTypeRegistry are project types
import rustworkx as rx

class QuantflowIRBuilder:
    dag: rx.PyDAG          # rustworkx directed acyclic graph
    nodes: dict[str, int]  # alias → node index mapping

    def add_source(self, name: str, schema: dict) -> int: ...
    def expand_stage(self, stage_name: str, feature_type: FeatureType,
                     inputs: dict, params: dict) -> None: ...
    def wire_dependencies(self, stage_name: str,
                          depends_on: list[str]) -> None: ...
    def build_from_feature_defs(self, feature_defs: list,
                                type_registry: FeatureTypeRegistry) -> rx.PyDAG: ...
```
expand_stage() is the core method. It takes a FeatureType, instantiates it with user-provided parameters and input mappings, compiles its formula to IRNodes, and adds them to the DAG. Schema contracts are validated during this step.
wire_dependencies() connects cross-feature edges. When a quality_features entry depends on a signal_features output (e.g., ofi_snr depends on ofi), this method adds the edges that ensure correct topological ordering.
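The effect of a cross-feature edge can be shown with the standard library's graphlib.TopologicalSorter (Python 3.9+) standing in for rustworkx. The node names, the dependency map, and the add_cross_feature_edges helper are hypothetical; the point is only that once ofi_snr is wired to depend on ofi, every valid order computes ofi first.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# node -> set of upstream nodes it depends on (hypothetical aliases)
deps: Dict[str, Set[str]] = {
    "cdm_time_bars": set(),
    "ofi": {"cdm_time_bars"},
    "ofi_snr": {"cdm_time_bars"},  # reads the source, but not yet ofi
}


def add_cross_feature_edges(graph: Dict[str, Set[str]], stage: str,
                            depends_on: List[str]) -> None:
    """Make `stage` run after each stage in `depends_on` (simplified stand-in)."""
    for upstream in depends_on:
        if upstream not in graph:
            raise KeyError(f"{stage} depends on unknown stage {upstream!r}")
        graph[stage].add(upstream)


add_cross_feature_edges(deps, "ofi_snr", ["ofi"])
order = list(TopologicalSorter(deps).static_order())
```

Without the added edge, the sorter would be free to place ofi_snr before ofi, since both only depend on the source.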
build_from_feature_defs() is the high-level entry point used by the compute engine. It:
- Creates SOURCE nodes for all CDM tables
- Expands each feature into its component IRNodes
- Wires cross-feature dependencies
- Runs node fusion optimization
- Validates the DAG is acyclic
Why rustworkx?
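rustworkx is a Python graph library with a Rust core (it grew out of the Qiskit project and is built on the petgraph crate), so DAG construction, cycle checks, and topological sorts run in compiled code rather than pure Python. Polars does not use rustworkx: its lazy query engine keeps its own internal plan representation, so the IR DAG is not handed to the Polars planner structurally. Instead, lowering walks the DAG and translates each IRNode payload into a Polars expression.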
IR Construction Paths
IRNodes can be constructed through two paths that converge at QuantflowIRBuilder:
Path A — Formula compilation (primary):
```
Formula string
  → FormulaIRCompiler._walk(ast.parse(formula))
  → list[IRNode]
  → QuantflowIRBuilder.expand_stage()
```
Path B — FeatureType instantiation (legacy):
```
FeatureType.steps (ComputationStep list)
  → FeatureType.instantiate(params)
  → list[ComputationStep] (resolved)
  → QuantflowIRBuilder.expand_stage()
```
Path B is maintained for backward compatibility with steps-based YAML definitions, but all new FeatureTypes use Path A. Both produce identical IRNode structures — the lowering and execution layers don't distinguish between them.
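Path A leans on Python's own parser: ast.parse turns the formula into a syntax tree, and a recursive walk emits nodes bottom-up, so every node appears after its inputs. The sketch below is a heavily simplified stand-in for FormulaIRCompiler._walk. Its assumptions are:

- bare names are source columns;
- binary operators map to TRANSFORM ops;
- calls are classified through hypothetical frozensets in the spirit of ir/ast/builtins.py;
- the Node shape and op names are illustrative.

It parses in "eval" mode to get a single expression.

```python
import ast
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Hypothetical classification sets (the real ones live in ir/ast/builtins.py)
WINDOW_FUNCS = frozenset({"rolling_mean", "diff"})
STATE_FUNCS = frozenset({"ema", "cumsum"})
BINOPS = {ast.Add: "add", ast.Sub: "sub", ast.Mult: "mul", ast.Div: "div"}


@dataclass(frozen=True)
class Node:
    id: str
    primitive: str
    op: str
    inputs: Dict[str, str] = field(default_factory=dict)
    params: Dict[str, object] = field(default_factory=dict)


class MiniCompiler:
    """Walks a formula AST bottom-up and emits nodes in dependency order."""

    def __init__(self) -> None:
        self.nodes: List[Node] = []

    def _emit(self, primitive: str, op: str, inputs: Optional[Dict[str, str]] = None,
              params: Optional[Dict[str, object]] = None) -> str:
        node = Node(f"node_{len(self.nodes)}", primitive, op, inputs or {}, params or {})
        self.nodes.append(node)
        return node.id

    def walk(self, expr: ast.AST) -> str:
        if isinstance(expr, ast.Name):  # bare name -> source column
            return self._emit("SOURCE", "column", params={"column": expr.id})
        if isinstance(expr, ast.Constant):  # literal operand, e.g. the 2 in x * 2
            return self._emit("TRANSFORM", "const", params={"value": expr.value})
        if isinstance(expr, ast.BinOp) and type(expr.op) in BINOPS:
            lhs, rhs = self.walk(expr.left), self.walk(expr.right)
            return self._emit("TRANSFORM", BINOPS[type(expr.op)], {"lhs": lhs, "rhs": rhs})
        if isinstance(expr, ast.Call) and isinstance(expr.func, ast.Name) and expr.args:
            name = expr.func.id
            operand = self.walk(expr.args[0])
            # Remaining positional args must be literals (window size, decay, ...)
            extra = [ast.literal_eval(arg) for arg in expr.args[1:]]
            primitive = ("WINDOW" if name in WINDOW_FUNCS
                         else "STATE" if name in STATE_FUNCS else "TRANSFORM")
            return self._emit(primitive, name, {"operand": operand}, {"args": extra})
        raise ValueError(f"unsupported syntax: {ast.dump(expr)}")


def compile_formula(formula: str) -> List[Node]:
    compiler = MiniCompiler()
    compiler.walk(ast.parse(formula, mode="eval").body)
    return compiler.nodes
```

For instance, compile_formula("ema(a - b, 0.94)") yields two SOURCE nodes, a TRANSFORM sub node wired by lhs/rhs roles, and a STATE ema node whose operand is the sub node.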
Node Fusion Optimization
Adjacent TRANSFORM nodes can be fused into a single with_columns() call in the Polars execution plan:
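```python
import polars as pl
# df: a DataFrame with numeric columns "a" and "b"
# Before fusion: 3 sequential with_columns passes, each materializing a temp column
unfused = (
    df.with_columns((pl.col("a") + pl.col("b")).alias("tmp_0"))
    .with_columns(pl.col("tmp_0").abs().alias("tmp_1"))
    .with_columns((pl.col("tmp_1") * 2).alias("out"))
)
# After fusion: 1 with_columns pass with a single chained expression
fused = df.with_columns(((pl.col("a") + pl.col("b")).abs() * 2).alias("out"))
```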
Fusion is safe because TRANSFORM nodes are stateless — each row's output depends only on that row's input. The optimizer skips fusion when a node's output is referenced across features (cross-feature dependencies must remain visible for correct ordering).
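The grouping rule can be sketched for the simple case of a linear chain of TRANSFORM steps. This is an illustration under assumptions, not the optimizer's code:

- steps are modeled as plain Python functions on a single row value;
- `shared` holds the IDs of steps whose output another feature references;
- a shared step is treated as a barrier that is never fused, so its output stays a visible column.

```python
from dataclasses import dataclass
from typing import Callable, List, Set


@dataclass(frozen=True)
class Step:
    id: str
    fn: Callable[[float], float]  # stateless, row-wise function


def fusion_groups(chain: List[Step], shared: Set[str]) -> List[List[Step]]:
    """Split a linear TRANSFORM chain into groups that can each be fused."""
    groups: List[List[Step]] = []
    current: List[Step] = []
    for step in chain:
        if step.id in shared:
            if current:
                groups.append(current)
                current = []
            groups.append([step])  # never fused: another feature reads its output
        else:
            current.append(step)
    if current:
        groups.append(current)
    return groups


def fuse(group: List[Step]) -> Callable[[float], float]:
    """Compose a group into one function; valid because every step is row-wise."""
    def fused(x: float) -> float:
        for step in group:
            x = step.fn(x)
        return x
    return fused


chain = [
    Step("t0", abs),
    Step("t1", lambda x: x * 2),
    Step("t2", lambda x: x + 1),  # referenced by another feature
    Step("t3", lambda x: x - 3),
    Step("t4", lambda x: x / 2),
]
groups = fusion_groups(chain, shared={"t2"})
```

Because each step depends only on its own row, running the fused groups one after another gives the same values as running every step separately; the barrier only changes which intermediate columns get materialized.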
Topological Sort
The DAG is executed in topological order using Kahn's algorithm:
```
1. Compute in-degree for every node
2. Enqueue all nodes with in-degree 0 (SOURCE nodes)
3. While queue not empty:
   a. Dequeue node → append to execution order
   b. For each downstream neighbor:
      - Decrement in-degree
      - If in-degree reaches 0 → enqueue
```
This ensures every node executes only after all its upstream dependencies are resolved. Cycles are rejected during DAG construction — a cyclic dependency (A depends on B which depends on A) fails validation immediately.
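The steps above translate directly into Python. The sketch adds the standard Kahn cycle check: if some nodes never reach in-degree 0, they sit on or downstream of a cycle. The graph here is a plain dict of node to downstream nodes, an assumption for illustration rather than the rustworkx DAG the builder uses.

```python
from collections import deque
from typing import Dict, List


def kahn_order(edges: Dict[str, List[str]]) -> List[str]:
    """Return an execution order for a graph given as node -> downstream nodes.

    Raises ValueError if the graph contains a cycle.
    """
    # 1. In-degree for every node, including nodes that only appear as targets
    indegree: Dict[str, int] = {n: 0 for n in edges}
    for targets in edges.values():
        for t in targets:
            indegree[t] = indegree.get(t, 0) + 1

    # 2. Start from nodes with no upstream dependencies (the SOURCE nodes)
    queue = deque(n for n, d in indegree.items() if d == 0)
    order: List[str] = []

    # 3. Emit ready nodes; release neighbors whose last dependency just finished
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in edges.get(node, []):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)

    # Nodes still above in-degree 0 are on, or downstream of, a cycle
    if len(order) != len(indegree):
        stuck = ", ".join(n for n, d in indegree.items() if d > 0)
        raise ValueError(f"cycle detected among: {stuck}")
    return order
```

For example, kahn_order({"src": ["ofi", "spread"], "ofi": ["ofi_snr"], "spread": [], "ofi_snr": []}) returns ["src", "ofi", "spread", "ofi_snr"], while {"a": ["b"], "b": ["a"]} raises ValueError because neither node ever becomes ready.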
File Reference
| File | Purpose |
|---|---|
ir/primitives.py | Primitive enum + IRNode dataclass |
ir/contracts.py | OP_CONTRACTS — 50+ schema validation contracts |
ir/builder.py | QuantflowIRBuilder — rustworkx DAG construction |
ir/ast/compiler.py | FormulaIRCompiler — formula → IRNode compilation |
ir/ast/builtins.py | Function classification frozensets |
ir/ast/handlers.py | Per-function AST handler dispatch |
ir/ast/utils.py | formula_to_ir, formula_to_steps helpers |