
Lowering

Lowering translates IR DAG nodes into backend-specific expressions. Each (Primitive, op) pair is dispatched through a decorator-based registry to a lowering function that emits code for the target backend: pl.Expr objects for Polars batch execution and DSL strings for DolphinDB streaming. The IR itself is backend-neutral; lowering is where backend awareness enters.


Lowering Registry

The registry uses a @register decorator pattern. Each lowering function is tagged with its (Primitive, op) key and an optional backend name:

```python
@register(Primitive.TRANSFORM, "sub")
def lower_sub(node: IRNode, backend: BaseBackend) -> Any:
    # No backend= argument: registered for the default (Polars) backend
    return backend.sub(
        backend.col(node.params["a"]),
        backend.col(node.params["b"]),
    )


@register(Primitive.WINDOW, "rolling_mean", backend="dolphindb")
def lower_rolling_mean_ddb(node: IRNode, backend: DolphinDBBackend) -> str:
    # Emits the DolphinDB moving-average call directly as a DSL string
    return f"mavg({node.params['column']}, {node.params['window']})"
```

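Auto-registration at import time. The @register decorator inserts the function into a global registry when its module is loaded. There are no manual dispatch tables, if/elif chains, or config files: a new operation's lowering function is registered in the same file where it is defined. Because registration is a side effect of importing the module, a lowering function only becomes visible to the registry once its module has been imported.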

Multi-backend dispatch. The optional backend= keyword argument registers a function for a specific backend; functions registered without it go to the default backend (Polars). At runtime, resolve_lowering(node, backend) looks up the registry for that backend and raises NotImplementedError if nothing is registered for the node's (primitive, op) key:

```python
from typing import Callable


def resolve_lowering(node: IRNode, backend: str = "polars") -> Callable:
    key = (node.primitive, node.op)
    fn = REGISTRIES[backend].get(key)  # per-backend dict: (primitive, op) -> fn
    if fn is None:
        raise NotImplementedError(f"No lowering for {backend}: {key}")
    return fn
```
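For illustration, here is a minimal, self-contained sketch of how @register and resolve_lowering could fit together. The stub Primitive and IRNode classes, the defaultdict-based REGISTRIES, and the duplicate-key check are assumptions made for this sketch, not the project's definitions. The decorator body runs when the defining module is imported, which is what makes registration automatic.

```python
from collections import defaultdict
from enum import Enum
from typing import Any, Callable, Dict, Tuple


class Primitive(Enum):  # hypothetical stand-in for the IR's primitive enum
    TRANSFORM = "transform"
    WINDOW = "window"


class IRNode:  # hypothetical minimal IR node
    def __init__(self, primitive: Primitive, op: str, params: Dict[str, Any]):
        self.primitive = primitive
        self.op = op
        self.params = params


# backend name -> {(primitive, op): lowering function}
REGISTRIES: Dict[str, Dict[Tuple[Primitive, str], Callable]] = defaultdict(dict)


def register(primitive: Primitive, op: str, backend: str = "polars") -> Callable:
    """Decorator factory: file the decorated function under (primitive, op)."""
    def decorator(fn: Callable) -> Callable:
        key = (primitive, op)
        if key in REGISTRIES[backend]:  # this sketch rejects silent overrides
            raise ValueError(f"Duplicate lowering for {backend}: {key}")
        REGISTRIES[backend][key] = fn
        return fn  # returned unchanged, so it stays directly callable
    return decorator


def resolve_lowering(node: IRNode, backend: str = "polars") -> Callable:
    key = (node.primitive, node.op)
    fn = REGISTRIES[backend].get(key)
    if fn is None:
        raise NotImplementedError(f"No lowering for {backend}: {key}")
    return fn


# Runs at import time: the registry entry exists as soon as the module loads.
@register(Primitive.WINDOW, "rolling_mean", backend="dolphindb")
def lower_rolling_mean_ddb(node: IRNode, backend: Any = None) -> str:
    return f"mavg({node.params['column']}, {node.params['window']})"
```

For a node with params {"column": "mid", "window": 20}, resolving it for "dolphindb" returns lower_rolling_mean_ddb, and calling that yields "mavg(mid, 20)". Resolving the same node for "polars" raises NotImplementedError, because this sketch registers rolling_mean only for "dolphindb" and does not fall back to another backend.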

BaseBackend Protocol

All backends implement the BaseBackend Protocol, a structural interface of roughly 25 methods. Python Protocols use structural subtyping: any class whose methods match the Protocol's signatures satisfies the contract, without inheriting from BaseBackend.

| Category | Methods |
|---|---|
| Column & Literal | col(name), lit(value, dtype=None) |
| Arithmetic | add(a, b), sub(a, b), mul(a, b), div(a, b), neg(a), pow(a, b) |
| Unary Math | abs(a), sign(a), sqrt(a), log(a), tanh(a) |
| Comparison | gt(a, b), lt(a, b), eq(a, b), neq(a, b), lte(a, b), gte(a, b) |
| Conditional | clip(a, lower, upper), conditional_select(cond, true_val, false_val) |
| Array Ops | array_wavg(values, weights), array_sum(arr), array_mean(arr), array_std(arr), array_div_elementwise(a, b), array_lit(value, length) |
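The structural-subtyping claim can be checked with a small sketch. ArithmeticBackend is a hypothetical two-method slice of BaseBackend, and @runtime_checkable is added here only so that isinstance() works at runtime; the description above does not say whether BaseBackend uses it.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ArithmeticBackend(Protocol):
    """Hypothetical two-method slice of the BaseBackend contract."""

    def add(self, a: Any, b: Any) -> Any: ...
    def sub(self, a: Any, b: Any) -> Any: ...


class StringBackend:  # no inheritance from ArithmeticBackend
    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"

    def sub(self, a: str, b: str) -> str:
        return f"({a} - {b})"


class IncompleteBackend:  # lacks sub(), so it does not satisfy the Protocol
    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"


def lower_spread(backend: ArithmeticBackend, a: Any, b: Any) -> Any:
    # Backend-neutral lowering: works with any object that provides sub()
    return backend.sub(a, b)
```

isinstance() against a runtime-checkable Protocol only verifies that the named methods exist; it does not compare signatures, which remains the job of a static type checker such as mypy.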

PolarsBackend

Translates IR nodes into lazy pl.Expr objects. Every method returns an expression tree; nothing is computed until the LazyFrame those expressions are attached to is materialized with collect(). This laziness lets the Polars query optimizer apply column pruning, predicate pushdown, and expression fusion across the entire computation graph.

```python
from typing import Any

import polars as pl


class PolarsBackend:
    def col(self, name: str) -> pl.Expr:
        return pl.col(name)

    def lit(self, value: Any, dtype=None) -> pl.Expr:
        return pl.lit(value, dtype=dtype)

    def add(self, a: pl.Expr, b: pl.Expr) -> pl.Expr:
        return a + b

    def sub(self, a: pl.Expr, b: pl.Expr) -> pl.Expr:
        return a - b

    def rolling_mean(self, expr: pl.Expr, window_size: int) -> pl.Expr:
        return expr.rolling_mean(window_size=window_size)

    def conditional_select(self, cond: pl.Expr, true_val: pl.Expr, false_val: pl.Expr) -> pl.Expr:
        # Vectorized if/else over whole columns
        return pl.when(cond).then(true_val).otherwise(false_val)
```

PolarsBackend is the default and most complete backend, and it is used for all batch (research) computation.


DolphinDBBackend

Translates IR nodes into DolphinDB DSL strings. Each method builds a fragment of a DolphinDB expression, and the composed expression string is deployed directly to the DolphinDB cluster for streaming execution.

```python
class DolphinDBBackend:
    def col(self, name: str) -> str:
        return name

    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"

    def rolling_mean(self, expr: str, window_size: int) -> str:
        return f"mavg({expr}, {window_size})"

    def conditional_select(self, cond: str, true_val: str, false_val: str) -> str:
        return f"iif({cond}, {true_val}, {false_val})"
```

Why strings? DolphinDB streaming engines accept DSL expressions as string configuration. Generating strings avoids Python↔DolphinDB serialization overhead in the hot path — the Python process only deploys the configuration; all computation runs inside DolphinDB.
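A short sketch of how the string methods compose into a single deployable expression. The col, add, rolling_mean, and conditional_select methods mirror the class above; sub and gt are assumed to follow the same parenthesized pattern, and the spread-versus-average signal is purely illustrative.

```python
class DolphinDBBackend:
    """Minimal string backend for illustration; sub and gt are assumed."""

    def col(self, name: str) -> str:
        return name

    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"

    def sub(self, a: str, b: str) -> str:  # assumed, by analogy with add
        return f"({a} - {b})"

    def gt(self, a: str, b: str) -> str:  # assumed, by analogy with add
        return f"({a} > {b})"

    def rolling_mean(self, expr: str, window_size: int) -> str:
        return f"mavg({expr}, {window_size})"

    def conditional_select(self, cond: str, true_val: str, false_val: str) -> str:
        return f"iif({cond}, {true_val}, {false_val})"


b = DolphinDBBackend()
spread = b.sub(b.col("ask"), b.col("bid"))    # "(ask - bid)"
avg_spread = b.rolling_mean(spread, 20)       # "mavg((ask - bid), 20)"
signal = b.conditional_select(b.gt(spread, avg_spread), "1", "0")
# signal == "iif(((ask - bid) > mavg((ask - bid), 20)), 1, 0)"
```

Because each method only wraps its arguments in text, the shared subexpression (ask - bid) is spelled out twice in the result; string lowering as sketched here performs no deduplication of shared subexpressions.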


Backend-Specific Expression Modules

Polars lowering functions are organized by primitive in backends/polars/expressions/:

window.py

Rolling window and time-series operations:

| Operation | Polars Implementation |
|---|---|
| diff | expr.diff(n=window) |
| rolling_mean | expr.rolling_mean(window_size=window) |
| rolling_std | expr.rolling_std(window_size=window) |
| rolling_corr | pl.rolling_corr(a, b, window_size=window) |
| rolling_zscore | (expr - rolling_mean) / rolling_std |
| autocorr | Rolling correlation of series with its lagged copy |
| shannon_entropy | Binned histogram entropy over rolling window |
| adx | Wilder's ADX from high/low/close over window |
| linear_regression_slope | OLS slope via rolling_map |
| half_life | Estimated τ from autocorrelation decay |
| time_under_water | Cumulative count since last peak |
| historical_var | Rolling quantile at specified level |
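To pin down what the rolling_zscore row computes, here is a plain-Python reference sketch (a hypothetical helper, not the project's code). It assumes Polars' defaults for rolling_mean and rolling_std: a row is null until a full window is available, and the standard deviation uses ddof=1. One deliberate difference: a zero-variance window maps to None here, whereas dividing by a zero rolling_std in Polars would produce NaN or an infinite value.

```python
import statistics
from typing import List, Optional


def rolling_zscore(values: List[float], window: int) -> List[Optional[float]]:
    """Reference for (x - rolling_mean) / rolling_std over a trailing window."""
    out: List[Optional[float]] = []
    for i, x in enumerate(values):
        if i + 1 < window:
            out.append(None)  # incomplete window -> null, as with Polars defaults
            continue
        win = values[i + 1 - window : i + 1]  # trailing window ending at row i
        mean = statistics.fmean(win)
        std = statistics.stdev(win)  # sample std (ddof=1)
        out.append(None if std == 0 else (x - mean) / std)
    return out
```

For [1, 2, 3, 4, 5] with window 3, every complete window is an evenly spaced triple whose last value sits one sample standard deviation above the mean, so the result is [None, None, 1.0, 1.0, 1.0].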

state.py

Cumulative state accumulators:

| Operation | Polars Implementation |
|---|---|
| decay_accum | Closed-form vectorized: decay^t * cumsum(input / decay^t) |
| ema | ewm_mean(alpha=alpha, adjust=False) |
| cumsum | cum_sum() + optional init offset |

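The decay_accum implementation deserves special note: it uses a closed-form vectorized solution rather than iterative accumulation. A step-by-step recurrence would need a row-by-row loop or user-defined function, whereas the closed form uses only elementwise arithmetic and a cumulative sum, so it can be expressed as ordinary lazy Polars expressions.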
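The closed form follows from unrolling the recurrence s_t = decay * s_(t-1) + x_t, with s_(-1) = 0 (an assumption consistent with the formula in the table): s_t = sum over k <= t of decay^(t-k) * x_k = decay^t * sum over k <= t of x_k / decay^k, which is exactly decay^t * cumsum(input / decay^t). The sketch below checks that equivalence in plain Python; both function names are hypothetical.

```python
from itertools import accumulate
from typing import List


def decay_accum_iterative(xs: List[float], decay: float) -> List[float]:
    # Reference recurrence: s_t = decay * s_(t-1) + x_t, starting from 0
    out, s = [], 0.0
    for x in xs:
        s = decay * s + x
        out.append(s)
    return out


def decay_accum_closed_form(xs: List[float], decay: float) -> List[float]:
    # decay^t * cumsum(x_t / decay^t): elementwise ops plus one cumulative sum,
    # so no step depends on the previous step's output
    scaled = [x / decay**t for t, x in enumerate(xs)]
    return [decay**t * c for t, c in enumerate(accumulate(scaled))]
```

With xs = [1.0, 2.0, 3.0, 4.0] and decay = 0.5, both functions return [1.0, 2.5, 4.25, 6.125]. One caveat of the closed form: for decay < 1 the factor decay^(-t) grows exponentially, so on long series it can overflow float64 (for decay = 0.99, after roughly 70,000 rows).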

transform.py

Array extraction and generation operations: array_extract (extracts a field from an array of structs), generate_weights (builds a decaying weight vector for weighted averages), and session_phase (classifies each bar as pre-market, open, midday, close, or post-market).
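As an illustration of how a decaying weight vector feeds the array_wavg(values, weights) backend method, the sketch below assumes geometric decay (weight decay^i at position i) and normalizes by the sum of the weights. The decay parameterization and the generate_weights signature are assumptions; the description above does not specify the weighting scheme.

```python
from typing import List


def generate_weights(length: int, decay: float) -> List[float]:
    # Hypothetical geometric decay: position 0 gets the largest weight
    return [decay**i for i in range(length)]


def array_wavg(values: List[float], weights: List[float]) -> float:
    # Weighted average: sum(v * w) / sum(w)
    if len(values) != len(weights):
        raise ValueError("values and weights must have the same length")
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)
```

For example, generate_weights(3, 0.5) gives [1.0, 0.5, 0.25], and averaging [10.0, 20.0, 40.0] with those weights gives 30.0 / 1.75, about 17.14.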


PolarsDAGExecutor

The executor walks the topological order of the IR DAG and applies lowering functions node by node:

```python
from typing import Dict, List

import polars as pl


class PolarsDAGExecutor:
    source_frames: Dict[str, pl.LazyFrame]  # Input tables by name
    backend: PolarsBackend
    lf_cache: Dict[str, pl.LazyFrame]  # Intermediate results by node id
    current_lf: pl.LazyFrame  # Working frame that feature columns accumulate on

    def execute(self, execution_order: List[IRNode]) -> Dict[str, pl.LazyFrame]:
        for node in execution_order:
            if node.primitive == Primitive.SOURCE:
                self.lf_cache[node.id] = self.source_frames[node.params["name"]]
            elif node.primitive == Primitive.SINK:
                pass  # Output boundary: no computation
            else:
                inputs = self._resolve_inputs(node)  # upstream results from lf_cache
                lowering_fn = resolve_lowering(node, "polars")
                expr = lowering_fn(node, inputs, self.backend)
                # Append the node's expression as a new column; nothing runs yet
                self.current_lf = self.current_lf.with_columns(expr.alias(node.output))
                self.lf_cache[node.id] = self.current_lf
        return self.lf_cache
```

Key behaviors:

  • SOURCE nodes provide raw input frames; SINK nodes pass through.
  • All other nodes: resolve upstream inputs from the cache, call the lowering function, append the result as a new column via with_columns().
  • The result is a single lazy pl.LazyFrame — the full computation graph is built before any data is touched.
  • Per-node error isolation: if one feature's IRNode fails, the error is captured and the remaining nodes continue.
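The walk-and-isolate pattern described in these bullets can be sketched with the standard library alone. This toy version uses graphlib.TopologicalSorter (Python 3.9+) to derive the execution order and plain floats instead of lazy frames; the node format, the errors dictionary, and the choice to skip descendants of a failed node are assumptions for illustration.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Tuple

# Hypothetical toy node: (upstream node ids, function of the upstream values)
Node = Tuple[List[str], Callable[..., float]]


def execute(nodes: Dict[str, Node], sources: Dict[str, float]) -> Tuple[Dict[str, float], Dict[str, str]]:
    """Walk nodes in topological order, isolating per-node failures."""
    graph = {nid: deps for nid, (deps, _) in nodes.items()}
    cache: Dict[str, float] = dict(sources)  # source values are known up front
    errors: Dict[str, str] = {}
    for nid in TopologicalSorter(graph).static_order():
        if nid in cache:  # source node: nothing to compute
            continue
        deps, fn = nodes[nid]
        failed = [d for d in deps if d in errors]
        if failed:  # do not run nodes whose inputs are missing
            errors[nid] = f"upstream failed: {failed}"
            continue
        try:
            cache[nid] = fn(*(cache[d] for d in deps))
        except Exception as exc:  # capture the error and keep walking
            errors[nid] = repr(exc)
    return cache, errors
```

With lazy pl.Expr values, a try/except around lowering can only catch errors raised while the expression is built (for example, a missing lowering function); failures that depend on the data itself surface later, when the frame is collected.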

File Reference

| File | Purpose |
|---|---|
| lowering/registry.py | @register decorator + resolve_lowering() dispatch |
| lowering/engines/base.py | BaseBackend Protocol + default lowering functions |
| backends/polars/backend.py | PolarsBackend implementation |
| backends/polars/orchestrator.py | PolarsDAGExecutor |
| backends/polars/expressions/window.py | WINDOW lowering: rolling, diff, autocorr, entropy, adx |
| backends/polars/expressions/state.py | STATE lowering: decay_accum, ema, cumsum |
| backends/polars/expressions/transform.py | TRANSFORM lowering: array_extract, weights, session_phase |
| backends/polars/utils.py | Complex utilities: decay_accum vectorized, linear_regression_slope |