Lowering
Lowering translates IR DAG nodes into backend-specific expressions. Each (Primitive, op) pair is dispatched through a decorator-based registry to a lowering function that emits an expression for the target backend: pl.Expr objects for Polars batch computation, DSL strings for DolphinDB streaming. The IR itself is backend-neutral; lowering is the point where backend-specific knowledge enters.
Lowering Registry
The registry uses a @register decorator pattern. Each lowering function is tagged with its (Primitive, op) key and an optional backend name:
```python
from typing import Any


@register(Primitive.TRANSFORM, "sub")
def lower_sub(node: IRNode, inputs: Any, backend: BaseBackend) -> Any:
    # inputs: upstream results passed in by the executor (unused here)
    return backend.sub(
        backend.col(node.params["a"]),
        backend.col(node.params["b"]),
    )


@register(Primitive.WINDOW, "rolling_mean", backend="dolphindb")
def lower_rolling_mean_ddb(node: IRNode, inputs: Any, backend: DolphinDBBackend) -> str:
    return f"mavg({node.params['column']}, {node.params['window']})"
```
Auto-registration at import time. The @register decorator inserts the function into a global registry dictionary when its module is loaded. There are no manual dispatch tables, no if/elif chains, and no config files: a new operation's lowering function is registered in the same file where it is defined, as long as that module is imported before lowering runs.
Multi-backend dispatch. The optional backend= keyword argument registers a function for one specific backend; functions registered without it go to the default backend (Polars). At runtime, resolve_lowering(node, backend) looks up the function in that backend's registry:
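```python
from typing import Callable


def resolve_lowering(node: IRNode, backend: str = "polars") -> Callable:
    # REGISTRIES: backend name -> {(Primitive, op): lowering function}
    key = (node.primitive, node.op)
    fn = REGISTRIES.get(backend, {}).get(key)  # unknown backend -> None, not KeyError
    if fn is None:
        raise NotImplementedError(f"No lowering for {backend}: {key}")
    return fn
```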
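For readers who want to see the mechanics end to end, here is a minimal, self-contained sketch of how such a decorator registry can work. The Primitive enum, the IRNode dataclass, DEFAULT_BACKEND, and the nested-dict layout of REGISTRIES are assumptions for illustration, not the project's definitions in lowering/registry.py, and the toy lowerings return strings so the sketch runs without Polars.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple


class Primitive(Enum):
    # Hypothetical subset of IR primitives, for illustration only
    SOURCE = "source"
    TRANSFORM = "transform"
    WINDOW = "window"
    SINK = "sink"


@dataclass
class IRNode:
    # Hypothetical minimal IR node
    id: str
    primitive: Primitive
    op: str
    params: Dict[str, Any] = field(default_factory=dict)


DEFAULT_BACKEND = "polars"
# Assumed layout: backend name -> {(Primitive, op): lowering function}
REGISTRIES: Dict[str, Dict[Tuple[Primitive, str], Callable]] = defaultdict(dict)


def register(primitive: Primitive, op: str, backend: Optional[str] = None):
    """Decorator: record fn under (primitive, op) for the given backend."""
    def decorator(fn: Callable) -> Callable:
        REGISTRIES[backend or DEFAULT_BACKEND][(primitive, op)] = fn
        return fn  # fn is returned unchanged, so it stays directly callable
    return decorator


def resolve_lowering(node: IRNode, backend: str = DEFAULT_BACKEND) -> Callable:
    key = (node.primitive, node.op)
    fn = REGISTRIES.get(backend, {}).get(key)
    if fn is None:
        raise NotImplementedError(f"No lowering for {backend}: {key}")
    return fn


# Toy lowerings: registration happens as a side effect of defining them
@register(Primitive.TRANSFORM, "sub")
def lower_sub(node, inputs, backend):
    return f"{node.params['a']} - {node.params['b']}"


@register(Primitive.WINDOW, "rolling_mean", backend="dolphindb")
def lower_rolling_mean_ddb(node, inputs, backend):
    return f"mavg({node.params['column']}, {node.params['window']})"
```

Because the decorator returns the function unchanged, each lowering remains importable and unit-testable on its own; the registry is only a side table consulted at dispatch time.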
BaseBackend Protocol
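All backends implement the BaseBackend Protocol, a structural interface defining about 25 methods. Python Protocols use structural subtyping: any class whose methods match the Protocol's signatures satisfies the contract for a static type checker, with no inheritance required. Runtime isinstance() checks against a Protocol additionally require the @runtime_checkable decorator.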
| Category | Methods |
|---|---|
| Column & Literal | col(name), lit(value, dtype=None) |
| Arithmetic | add(a, b), sub(a, b), mul(a, b), div(a, b), neg(a), pow(a, b) |
| Unary Math | abs(a), sign(a), sqrt(a), log(a), tanh(a) |
| Comparison | gt(a, b), lt(a, b), eq(a, b), neq(a, b), lte(a, b), gte(a, b) |
| Conditional | clip(a, lower, upper), conditional_select(cond, true_val, false_val) |
| Array Ops | array_wavg(values, weights), array_sum(arr), array_mean(arr), array_std(arr), array_div_elementwise(a, b), array_lit(value, length) |
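To make the structural-subtyping point concrete, here is a small sketch using a three-method slice of the interface. ArithmeticBackend, StringBackend, and spread are hypothetical names; the real BaseBackend in lowering/engines/base.py declares the full method set listed in the table.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ArithmeticBackend(Protocol):
    # Hypothetical slice of BaseBackend: three of its methods
    def col(self, name: str): ...
    def add(self, a, b): ...
    def sub(self, a, b): ...


class StringBackend:
    # Does NOT inherit from ArithmeticBackend; it only matches its shape
    def col(self, name: str) -> str:
        return name

    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"

    def sub(self, a: str, b: str) -> str:
        return f"({a} - {b})"


def spread(backend: ArithmeticBackend):
    # Backend-neutral code: works with any structurally matching backend
    return backend.sub(backend.col("ask"), backend.col("bid"))


print(spread(StringBackend()))  # (ask - bid)
print(isinstance(StringBackend(), ArithmeticBackend))  # True
```

Note that the runtime isinstance() check only verifies that the methods exist, not their signatures; signature compatibility is enforced by a static type checker such as mypy.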
PolarsBackend
Translates IR nodes into lazy pl.Expr objects. Every method returns an expression tree; nothing is computed until collect() is called on the LazyFrame that carries those expressions. This laziness lets Polars apply column pruning, predicate pushdown, and expression fusion across the entire computation graph.
```python
from typing import Any

import polars as pl


class PolarsBackend:
    def col(self, name: str) -> pl.Expr:
        return pl.col(name)

    def lit(self, value: Any, dtype=None) -> pl.Expr:
        return pl.lit(value, dtype=dtype)

    def add(self, a: pl.Expr, b: pl.Expr) -> pl.Expr:
        return a + b

    def sub(self, a: pl.Expr, b: pl.Expr) -> pl.Expr:
        return a - b

    def rolling_mean(self, expr: pl.Expr, window_size: int) -> pl.Expr:
        return expr.rolling_mean(window_size=window_size)

    def conditional_select(self, cond, true_val, false_val) -> pl.Expr:
        return pl.when(cond).then(true_val).otherwise(false_val)
```
PolarsBackend is the default and most complete backend, and it handles all batch computation used in research.
DolphinDBBackend
Translates IR nodes into DolphinDB DSL strings. Each method constructs a DolphinDB expression string that is deployed directly to the DolphinDB cluster for streaming execution.
```python
class DolphinDBBackend:
    def col(self, name: str) -> str:
        return name

    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"

    def rolling_mean(self, expr: str, window_size: int) -> str:
        return f"mavg({expr}, {window_size})"

    def conditional_select(self, cond, true_val, false_val) -> str:
        return f"iif({cond}, {true_val}, {false_val})"
```
Why strings? DolphinDB streaming engines accept DSL expressions as string configuration. Generating strings avoids Python↔DolphinDB serialization overhead in the hot path: the Python process only deploys the configuration, and all computation runs inside DolphinDB.
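Because every method returns a string, nested Python calls compose into a nested DSL expression. The sketch below reuses the methods shown above and adds one assumed method, gt(), following the same parenthesized-infix pattern as add(); the class name DolphinDBBackendSketch and the moving-average crossover are illustrative only.

```python
class DolphinDBBackendSketch:
    def col(self, name: str) -> str:
        return name

    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"

    def gt(self, a: str, b: str) -> str:
        # Assumed method, not shown in the original DolphinDBBackend
        return f"({a} > {b})"

    def rolling_mean(self, expr: str, window_size: int) -> str:
        return f"mavg({expr}, {window_size})"

    def conditional_select(self, cond: str, true_val: str, false_val: str) -> str:
        return f"iif({cond}, {true_val}, {false_val})"


b = DolphinDBBackendSketch()
fast = b.rolling_mean(b.col("price"), 5)   # "mavg(price, 5)"
slow = b.rolling_mean(b.col("price"), 20)  # "mavg(price, 20)"
# +1 when the fast moving average is above the slow one, else -1
signal = b.conditional_select(b.gt(fast, slow), "1", "-1")
print(signal)  # iif((mavg(price, 5) > mavg(price, 20)), 1, -1)
```

The trade-off of this design is that nothing validates the text on the Python side: a malformed expression surfaces only when DolphinDB parses the deployed configuration.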
Backend-Specific Expression Modules
Polars lowering functions are organized by primitive in backends/polars/expressions/:
window.py
Rolling window and time-series operations:
| Operation | Polars Implementation |
|---|---|
| diff | expr.diff(n=window) |
| rolling_mean | expr.rolling_mean(window_size=window) |
| rolling_std | expr.rolling_std(window_size=window) |
| rolling_corr | pl.rolling_corr(a, b, window_size=window) |
| rolling_zscore | (expr - rolling_mean) / rolling_std |
| autocorr | Rolling correlation of series with its lagged copy |
| shannon_entropy | Binned histogram entropy over rolling window |
| adx | Wilder's ADX from high/low/close over window |
| linear_regression_slope | OLS slope via rolling_map |
| half_life | Estimated τ from autocorrelation decay |
| time_under_water | Cumulative count since last peak |
| historical_var | Rolling quantile at specified level |
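A plain-Python reference for two of these formulas can be handy for unit-testing the Polars lowerings against. This sketch assumes a trailing window that yields None until it is full and the sample standard deviation (ddof=1), which match Polars' defaults for rolling_mean and rolling_std. The function names are hypothetical, and ols_slope regresses on the index 0..n-1, which may differ from the x-axis linear_regression_slope actually uses.

```python
import statistics
from typing import List, Optional


def rolling_zscore(xs: List[float], window: int) -> List[Optional[float]]:
    """(x - rolling_mean) / rolling_std over a trailing window; None until full."""
    out: List[Optional[float]] = []
    for i in range(len(xs)):
        if i + 1 < window:
            out.append(None)
            continue
        win = xs[i + 1 - window : i + 1]
        mean = statistics.fmean(win)
        std = statistics.stdev(win)  # sample std (ddof=1)
        # Zero-variance guard is an illustrative choice
        out.append((xs[i] - mean) / std if std > 0 else None)
    return out


def ols_slope(ys: List[float]) -> float:
    """OLS slope of ys against x = 0, 1, ..., n-1 (needs at least 2 points)."""
    n = len(ys)
    x_mean = (n - 1) / 2
    y_mean = statistics.fmean(ys)
    num = sum((i - x_mean) * (y - y_mean) for i, y in enumerate(ys))
    den = sum((i - x_mean) ** 2 for i in range(n))
    return num / den


print(rolling_zscore([1.0, 2.0, 3.0, 4.0], 3))  # [None, None, 1.0, 1.0]
print(ols_slope([1.0, 3.0, 5.0, 7.0]))  # 2.0
```

Applying ols_slope to each trailing window gives the rolling variant, which is the per-window work a rolling_map-based lowering performs.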
state.py
Cumulative state accumulators:
| Operation | Polars Implementation |
|---|---|
| decay_accum | Closed-form vectorized: decay^t * cumsum(input / decay^t) |
| ema | ewm_mean(alpha=alpha, adjust=False) |
| cumsum | cum_sum() + optional init offset |
The decay_accum implementation deserves special note: it uses a closed-form vectorized solution instead of iterative accumulation. Because the closed form needs only element-wise powers, division, and a cumulative sum, with no row-to-row loop, it fits Polars' lazy evaluation model.
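The equivalence behind the closed form is easy to check numerically. This sketch assumes the recursion s_t = decay * s_(t-1) + x_t with s_(-1) = 0, which unrolls to s_t = decay^t * sum_(k<=t) x_k / decay^k, the formula in the table. It is plain Python with hypothetical function names, not the vectorized code in backends/polars/utils.py.

```python
from itertools import accumulate
from typing import List


def decay_accum_iterative(xs: List[float], decay: float) -> List[float]:
    # Reference recursion: s_t = decay * s_{t-1} + x_t, starting from 0
    out, s = [], 0.0
    for x in xs:
        s = decay * s + x
        out.append(s)
    return out


def decay_accum_closed_form(xs: List[float], decay: float) -> List[float]:
    # decay^t * cumsum(x_k / decay^k): no loop-carried state, so each step
    # maps onto a column-wise power, division, or cumulative sum
    scaled = [x / decay**t for t, x in enumerate(xs)]
    return [decay**t * c for t, c in enumerate(accumulate(scaled))]


print(decay_accum_iterative([1.0, 2.0, 3.0], 0.5))    # [1.0, 2.5, 4.25]
print(decay_accum_closed_form([1.0, 2.0, 3.0], 0.5))  # [1.0, 2.5, 4.25]
```

One caution with this form: for decay < 1 the factor decay^-t grows exponentially, so on long series it eventually overflows double precision (for decay = 0.9, after a few thousand rows). Whether the production implementation rescales to avoid this is not described in this section.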
transform.py
Array extraction and generation operations: array_extract (extract a field from an array of structs), generate_weights (build a decaying weight vector for weighted averages), and session_phase (classify each bar as pre-market, open, midday, close, or post-market).
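generate_weights pairs naturally with the array_wavg(values, weights) method from the BaseBackend table: a decaying weight vector is built once, then used to average an array column. The sketch below assumes a simple geometric scheme (weight decay**i for element i, with index 0 the most recent) in plain Python; the actual weighting formula and normalization in transform.py are not specified here, so both function bodies are hypothetical.

```python
from typing import List


def generate_weights(length: int, decay: float) -> List[float]:
    # Hypothetical geometric scheme: weight decay**i for position i
    return [decay**i for i in range(length)]


def array_wavg(values: List[float], weights: List[float]) -> float:
    # Weighted average: sum(v * w) / sum(w)
    if len(values) != len(weights):
        raise ValueError("values and weights must have the same length")
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


w = generate_weights(3, 0.5)
print(w)  # [1.0, 0.5, 0.25]
print(array_wavg([4.0, 2.0, 0.0], w))  # 5.0 / 1.75, about 2.857
```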
PolarsDAGExecutor
The executor walks the topological order of the IR DAG and applies lowering functions node by node:
```python
from typing import Dict, List

import polars as pl


class PolarsDAGExecutor:
    source_frames: Dict[str, pl.LazyFrame]  # Input tables by name
    backend: PolarsBackend
    lf_cache: Dict[str, pl.LazyFrame]  # Intermediate results by node id
    current_lf: pl.LazyFrame  # Frame that lowered columns are appended to

    def execute(self, execution_order: List[IRNode]) -> Dict[str, pl.LazyFrame]:
        for node in execution_order:
            if node.primitive == Primitive.SOURCE:
                self.current_lf = self.source_frames[node.params["name"]]
                self.lf_cache[node.id] = self.current_lf
            elif node.primitive == Primitive.SINK:
                pass  # Output boundary: no computation
            else:
                inputs = self._resolve_inputs(node)
                lowering_fn = resolve_lowering(node, "polars")
                expr = lowering_fn(node, inputs, self.backend)
                # Append the lowered expression as a new (still lazy) column
                self.current_lf = self.current_lf.with_columns(expr.alias(node.output))
                self.lf_cache[node.id] = self.current_lf
        return self.lf_cache
```
Key behaviors:
- SOURCE nodes provide raw input frames; SINK nodes pass through without computation.
- All other nodes: resolve upstream inputs from the cache, call the lowering function, and append the result as a new column via with_columns().
- The result is a single lazy pl.LazyFrame: the full computation graph is built before any data is touched.
- Per-node error isolation: if one feature's IRNode fails, the error is captured and the remaining nodes continue.
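The walk-and-append loop, including per-node error isolation, can be sketched without Polars by using a dict of column lists as a stand-in frame. Node, LOWERINGS, and execute are hypothetical; the toy lowerings compute eagerly instead of building lazy expressions, and the exact error-capture mechanism in PolarsDAGExecutor is not shown in this section, so treat this as an illustration of the control flow only.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple

Frame = Dict[str, List[float]]


@dataclass
class Node:
    # Hypothetical minimal node: kind is "source", "sink", or an op name
    id: str
    kind: str
    params: Dict[str, Any] = field(default_factory=dict)
    output: str = ""


# Toy eager lowerings operating on a dict-of-lists frame
LOWERINGS: Dict[str, Callable[[Node, Frame], List[float]]] = {
    "sub": lambda n, f: [a - b for a, b in zip(f[n.params["a"]], f[n.params["b"]])],
    "ratio": lambda n, f: [a / b for a, b in zip(f[n.params["a"]], f[n.params["b"]])],
}


def execute(order: List[Node], sources: Dict[str, Frame]) -> Tuple[Frame, Dict[str, Exception]]:
    frame: Frame = {}
    errors: Dict[str, Exception] = {}
    for node in order:  # order is assumed to be topological already
        if node.kind == "source":
            frame = dict(sources[node.params["name"]])  # copy input columns
        elif node.kind == "sink":
            continue  # output boundary: nothing to compute
        else:
            try:
                column = LOWERINGS[node.kind](node, frame)
            except Exception as exc:  # isolate this node's failure
                errors[node.id] = exc
                continue
            frame = {**frame, node.output: column}  # analogue of with_columns()
    return frame, errors


order = [
    Node("src", "source", {"name": "bars"}),
    Node("rng", "sub", {"a": "high", "b": "low"}, "range"),
    Node("bad", "ratio", {"a": "high", "b": "range"}, "high_over_range"),
    Node("mid", "sub", {"a": "high", "b": "range"}, "high_minus_range"),
    Node("out", "sink"),
]
frame, errors = execute(order, {"bars": {"high": [10.0, 12.0], "low": [8.0, 12.0]}})
print(frame)   # "bad" is missing; "mid" still ran
print(errors)  # {'bad': ZeroDivisionError('float division by zero')}
```

One difference matters in practice: because this toy computes eagerly, the captured error is a data error. In the lazy Polars executor, a per-node guard can only catch failures raised while lowering and building the expression; data-dependent errors generally surface later, at collect().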
File Reference
| File | Purpose |
|---|---|
| lowering/registry.py | @register decorator + resolve_lowering() dispatch |
| lowering/engines/base.py | BaseBackend Protocol + default lowering functions |
| backends/polars/backend.py | PolarsBackend implementation |
| backends/polars/orchestrator.py | PolarsDAGExecutor |
| backends/polars/expressions/window.py | WINDOW lowering: rolling, diff, autocorr, entropy, adx |
| backends/polars/expressions/state.py | STATE lowering: decay_accum, ema, cumsum |
| backends/polars/expressions/transform.py | TRANSFORM lowering: array_extract, weights, session_phase |
| backends/polars/utils.py | Complex utilities: decay_accum vectorized, linear_regression_slope |