Batch Execution
The batch path runs inside a Python process. It reads source CDM data from the warehouse, computes features via Polars lazy evaluation, and writes results back. Two execution strategies — sequential (single process) and Ray (distributed) — share the same feature computation core.
Why Polars
Polars was chosen as the batch backend for reasons that align with FeatureDAG's compiler architecture:
Lazy evaluation matches the IR model. The lowering layer produces pl.Expr objects — a tree of deferred computation. This maps directly to FeatureDAG's IR DAG: nodes lower to expressions, expressions compose, and the full graph materializes in a single collect(). Polars' query optimizer then applies predicate pushdown, column pruning, and operator fusion across the entire plan.
Zero-copy Arrow interop. Pipeline stages pass Arrow tables. Polars is built on the Arrow memory format, so reading an Arrow table is zero-copy for most column types rather than a deserialization step. There is no format-conversion tax between stages.
Expression objects compose naturally. pl.Expr is a first-class object that can be passed, stored, and composed before evaluation. Expressions from multiple IR nodes are assembled into a single with_columns chain with no intermediate materialization.
Single-process deployment. Polars runs as an embedded library — no server, no connection management, no serialization overhead. This fits research workflows where the pipeline runs in-process without infrastructure dependencies.
Entry Point: FeatureEngine.compute_features()
The canonical entry point is FeatureEngine.compute_features():
```python
from quantflow.feature_engine import FeatureEngine

fe = FeatureEngine(project_dir=".", engine=db_engine, project_config=cfg)

# Read source CDM tables as Arrow
frames = fe._read_source_tables(
    schema_name="quantflow_canonical.cdm",
    tables=["cdm_trade_enriched"],
    symbol="BTCUSDT",
    start_time="2024-01-01",
    end_time="2024-01-31",
)

# Compute features
result_arrow, feature_names = fe.compute_features(
    feature_specs=[...],  # List of feature config dicts
    frames=frames,        # Dict[str, pa.Table]
)
```
Internally, compute_features():
- Normalize input frames: converts Arrow tables to Polars DataFrames, builds a `col_to_frame` lookup, and creates column aliases (`close` → `trade_price`, `volume` → `trade_size`).
- Resolve feature specs: for each feature config, resolves inline formulas or looks up FeatureTypes from the registry, then remaps abstract input names to actual CDM column names.
- Evaluate formulas: calls `_evaluate_formula()`, which uses Python's `ast` module to parse the formula string into a Polars expression tree. Supports 14 built-in functions, including `log`, `abs`, `sqrt`, `lag`, `diff`, `rolling_mean`, `rolling_std`, `rolling_sum`, `rolling_min`, `rolling_max`, `rolling_corr`, and `decay_accum`, plus binary operators.
- Error isolation: each feature is wrapped in try/except. A single misconfigured feature is logged and skipped rather than aborting the run.
- Return: a single Arrow table with all computed feature columns plus `event_time`.
For the full IR path (used when features require DAG optimization, cross-feature dependencies, or complex lowering), features flow through FormulaIRCompiler → QuantflowIRBuilder → PolarsDAGExecutor.
Execution Strategies
Sequential Mode
Loops over (symbol, time_shard) tuples. For each:
- Read source CDM tables from the warehouse via `FeatureEngine._read_source_tables()`
- Call `fe.compute_features(specs, frames)` in-process
- Convert wide-format results to the narrow `ft_features` schema via `_wide_to_narrow()`
- Write Parquet to the staging area, then commit to Iceberg
Ray Mode
Submits FeatureShardConfig tasks to Ray workers. Each (symbol, venue, shard_start, shard_end) tuple becomes a Ray remote task that:
- Creates its own `FeatureEngine` instance
- Reads CDM data (with warmup window)
- Computes features via `compute_features()`
- Filters to the write window (discarding warmup rows)
- Converts to narrow `ft_features` format
- Stages Parquet to S3
After all tasks complete, the main process commits all staged files in a single Iceberg transaction. Bounded concurrency (default 4) limits parallelism via RayBatchExecutor.
Warehouse Output
Computed features are written to the ft_features table in narrow key-value format:
| Column | Description |
|---|---|
| `symbol` | Instrument symbol |
| `feature_name` | Unique feature identifier |
| `feature_version` | Feature version (e.g. v1.0) |
| `feature_id` | Unique feature instance ID |
| `timestamp` | Feature calculation timestamp |
| `feature_namespace` | Category (signal, execution, quality, etc.) |
| `feature_value_numeric` | Scalar feature value |
| `feature_value_json` | Complex values (vectors/tensors) |
| `feature_value_array` | Time series/vector values |
| `status` | success, error, or partial |
| `processed_time` | Processing timestamp |
The narrow format enables time-series queries across features and symbols — filter by feature_name, join on symbol + timestamp, group by feature_namespace.
Batch Pipeline Runner
The BatchPipelineRunner orchestrates the batch pipeline in three conceptual stages:
ingest → process → feature
| Stage | What Runs | Output |
|---|---|---|
| Ingest | IngestPipeline per feed provider | Raw tables in warehouse |
| Process | dbt → state engine → label engine | CDM tables, bars, LOB snapshots, labels |
| Feature | FeatureEngine.compute_features() → ft_features | Feature table |
Process breaks down into three sub-stages orchestrated by Dagster:
| Sub-stage | What Runs | Output |
|---|---|---|
| dbt | dbt run via DBTProjectGenerator | CDM tables (cdm_trades, cdm_lob_l1) |
| state_engine | Numba fused kernel via Ray, daily sharding | cdm_trade_enriched, cdm_lob_l2, 9 bar tables |
| label_engine | LabelEngine.run() — 6 labeling methods | cdm_labels |
The state engine is the exception: it is launched on Ray directly rather than through Dagster, for maximum throughput. See Dagster Pipeline for full details.
PolarsDAGExecutor (Full IR Path)
When features are compiled through the full IR pipeline, the PolarsDAGExecutor walks the topological order of the IR DAG and applies lowering functions node by node:
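```python
from typing import Dict, List

import polars as pl

# IRNode, Primitive, PolarsBackend, and resolve_lowering are FeatureDAG
# internals; their imports are omitted here.


class PolarsDAGExecutor:
    source_frames: Dict[str, pl.LazyFrame]
    backend: PolarsBackend
    lf_cache: Dict[str, pl.LazyFrame]

    def execute(self, execution_order: List[IRNode]) -> Dict[str, pl.LazyFrame]:
        for node in execution_order:
            if node.primitive == Primitive.SOURCE:
                self.lf_cache[node.id] = self.source_frames[node.params["name"]]
            elif node.primitive == Primitive.SINK:
                pass  # boundary marker
            else:
                # Assumption: node.inputs lists upstream node ids and the
                # first one supplies the frame this node extends.
                current_lf = self.lf_cache[node.inputs[0]]
                lowering_fn = resolve_lowering(node, "polars")
                expr = lowering_fn(node, self.backend)
                self.lf_cache[node.id] = current_lf.with_columns(
                    expr.alias(node.output))
        return self.lf_cache
```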
Key behaviors:
- SOURCE nodes load raw input frames; SINK nodes are pass-through boundaries
- All other nodes resolve upstream inputs from cache, call the lowering function, and append the result as a new column
- The full computation graph is built as a lazy `pl.LazyFrame` before any data is touched
- Per-node error isolation captures individual feature failures without stopping the run
Key Design Decisions
Feature-level error isolation — Failed features are caught, logged, and skipped. A misconfigured feature definition does not block the entire pipeline. Critical for research workflows where feature definitions iterate rapidly.
LazyFrame chaining — Polars LazyFrame objects are chained via successive with_columns calls rather than eagerly evaluated. Polars' query optimizer fuses operations and minimizes intermediate allocations. The final collect() materializes everything in one pass.
Ray for horizontal scale — The run_ray() path distributes feature computation across Ray workers. Each worker processes one (symbol, time_shard) independently. The main process only handles staging file commits — no data touches the main process in the hot path.
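The stage-then-commit pattern can be sketched without Ray. The example below deliberately swaps in a standard-library thread pool for the Ray workers, so it shows the shape of the control flow (bounded fan-out, workers returning file paths, one commit at the end) and not the Ray or Iceberg APIs. `run_shard`, `commit`, and the staging path layout are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENCY = 4  # mirrors the default concurrency bound described earlier

# Illustrative (symbol, time_shard) work items.
shards = [("BTCUSDT", day) for day in range(1, 8)]


def run_shard(shard):
    """Stand-in for a worker task: compute, stage, return only the file path."""
    symbol, day = shard
    return f"staging/{symbol}/{day:02d}.parquet"  # hypothetical staging path


# At most MAX_CONCURRENCY shards run at once; workers hand back paths, not data.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
    staged_files = list(pool.map(run_shard, shards))


def commit(paths):
    """Stand-in for the single commit step performed by the main process."""
    return {"files": sorted(paths)}


snapshot = commit(staged_files)
print(len(snapshot["files"]), "files committed in one step")
```

Because workers return only paths, the main process stays small regardless of how much data each shard produces.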
Arrow-native throughout — Input is Arrow (from engine reads), computation is Polars (Arrow-backed), output is Arrow (for engine writes). No format conversions, no serialization overhead.