
Batch Execution

The batch path runs inside a Python process. It reads source CDM data from the warehouse, computes features via Polars lazy evaluation, and writes results back. Two execution strategies — sequential (single process) and Ray (distributed) — share the same feature computation core.


Why Polars

Polars was chosen as the batch backend for reasons that align with FeatureDAG's compiler architecture:

Lazy evaluation matches the IR model. The lowering layer produces pl.Expr objects — a tree of deferred computation. This maps directly to FeatureDAG's IR DAG: nodes lower to expressions, expressions compose, and the full graph materializes in a single collect(). Polars' query optimizer then applies predicate pushdown, column pruning, and operator fusion across the entire plan.

Zero-copy Arrow interop. Pipeline stages pass Arrow tables. Polars is built on Arrow — reading from Arrow is pointer-copy, not deserialization. No format conversion tax between stages.

Expression objects compose naturally. pl.Expr is a first-class object that can be passed, stored, and composed before evaluation. Expressions from multiple IR nodes are assembled into a single with_columns chain with no intermediate materialization.

Single-process deployment. Polars runs as an embedded library — no server, no connection management, no serialization overhead. This fits research workflows where the pipeline runs in-process without infrastructure dependencies.


Entry Point: FeatureEngine.compute_features()

The canonical entry point is FeatureEngine.compute_features():

from quantflow.feature_engine import FeatureEngine

fe = FeatureEngine(project_dir=".", engine=db_engine, project_config=cfg)

# Read source CDM tables as Arrow
frames = fe._read_source_tables(
    schema_name="quantflow_canonical.cdm",
    tables=["cdm_trade_enriched"],
    symbol="BTCUSDT",
    start_time="2024-01-01",
    end_time="2024-01-31",
)

# Compute features
result_arrow, feature_names = fe.compute_features(
    feature_specs=[...],  # List of feature config dicts
    frames=frames,        # Dict[str, pa.Table]
)

Internally, compute_features():

  1. Normalize input frames: converts Arrow tables to Polars DataFrames, builds a col_to_frame lookup, and creates column aliases (close → trade_price, volume → trade_size).
  2. Resolve feature specs: for each feature config, resolves inline formulas or looks up FeatureTypes from the registry, then remaps abstract input names to actual CDM column names.
  3. Evaluate formulas: calls _evaluate_formula(), which uses Python's ast module to parse the formula string into a Polars expression tree. Supports 14 built-in functions, including log, abs, sqrt, lag, diff, rolling_mean, rolling_std, rolling_sum, rolling_min, rolling_max, rolling_corr, and decay_accum, plus binary operators.
  4. Error isolation: each feature is wrapped in try/except. A single misconfigured feature is logged and skipped rather than failing the run.
  5. Return: a single Arrow table with all computed feature columns plus event_time.
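The formula-evaluation step can be illustrated with a small sketch of the AST-walking technique. This is not the project's `_evaluate_formula()`: to stay dependency-free it evaluates formulas against plain Python lists instead of building a `pl.Expr` tree, and it covers only a handful of functions. The names `evaluate_formula`, `BIN_OPS`, and `FUNCTIONS` are hypothetical.

```python
import ast
import math
import operator

# Binary operators accepted in formulas (hypothetical whitelist).
BIN_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}

def _elementwise(fn):
    # Lift a scalar function to a column, passing missing values through.
    return lambda col: [None if v is None else fn(v) for v in col]

def _lag(col, n=1):
    # Shift values down by n rows; the first n rows have no history.
    return ([None] * n + col)[:len(col)]

def _diff(col, n=1):
    # Difference between each value and the value n rows earlier.
    return [None if cur is None or prev is None else cur - prev
            for cur, prev in zip(col, _lag(col, n))]

FUNCTIONS = {"log": _elementwise(math.log), "abs": _elementwise(abs),
             "sqrt": _elementwise(math.sqrt), "lag": _lag, "diff": _diff}

def evaluate_formula(formula, columns):
    """Evaluate a formula string against a dict of equal-length columns."""
    n_rows = len(next(iter(columns.values())))

    def visit(node):
        if isinstance(node, ast.Expression):
            return visit(node.body)
        if isinstance(node, ast.Name):
            return columns[node.id]  # KeyError for an unknown column
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return [node.value] * n_rows  # broadcast a numeric literal
        if isinstance(node, ast.BinOp) and type(node.op) in BIN_OPS:
            op = BIN_OPS[type(node.op)]
            return [None if a is None or b is None else op(a, b)
                    for a, b in zip(visit(node.left), visit(node.right))]
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            fn = FUNCTIONS[node.func.id]  # KeyError for an unknown function
            extra = [ast.literal_eval(arg) for arg in node.args[1:]]
            return fn(visit(node.args[0]), *extra)
        # Anything else (attribute access, power, comparisons) is rejected.
        raise ValueError(f"unsupported syntax: {ast.dump(node)}")

    return visit(ast.parse(formula, mode="eval"))

columns = {"trade_price": [100.0, 101.0, 99.0], "trade_size": [1.0, 2.0, 4.0]}
notional = evaluate_formula("trade_price * trade_size", columns)
change = evaluate_formula("diff(trade_price, 1)", columns)
```

Walking the parsed tree with an explicit whitelist, rather than calling `eval()`, means a formula can only reach the operators and functions that are deliberately exposed.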

For the full IR path (used when features require DAG optimization, cross-feature dependencies, or complex lowering), features flow through FormulaIRCompiler → QuantflowIRBuilder → PolarsDAGExecutor.


Execution Strategies

Sequential Mode

Loops over (symbol, time_shard) tuples. For each:

  1. Read source CDM tables from the warehouse via FeatureEngine._read_source_tables()
  2. Call fe.compute_features(specs, frames) in-process
  3. Convert wide-format results to narrow ft_features schema via _wide_to_narrow()
  4. Write Parquet to staging area, then commit to Iceberg
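The work list for the sequential loop can be sketched as follows. The source does not state how time shards are sized, so the daily granularity, the `daily_shards` helper, and the second symbol are assumptions for illustration only.

```python
from datetime import date, timedelta
from itertools import product

def daily_shards(start, end):
    """Yield half-open [day, next_day) windows covering [start, end)."""
    day = start
    while day < end:
        next_day = min(day + timedelta(days=1), end)
        yield day, next_day
        day = next_day

symbols = ["BTCUSDT", "ETHUSDT"]  # the second symbol is illustrative

# One (symbol, time_shard) tuple per unit of work, processed one at a time.
work = list(product(symbols, daily_shards(date(2024, 1, 1), date(2024, 1, 4))))

for symbol, (shard_start, shard_end) in work:
    # Read, compute, convert to narrow format, and write would happen here.
    pass
```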

Ray Mode

Submits FeatureShardConfig tasks to Ray workers. Each (symbol, venue, shard_start, shard_end) tuple becomes a Ray remote task that:

  1. Creates its own FeatureEngine instance
  2. Reads CDM data (with warmup window)
  3. Computes features via compute_features()
  4. Filters to the write window (discarding warmup rows)
  5. Converts to narrow ft_features format
  6. Stages Parquet to S3

After all tasks complete, the main process commits all staged files in a single Iceberg transaction. Bounded concurrency (default 4) limits parallelism via RayBatchExecutor.
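The warmup and write-window steps are easiest to see with a rolling feature. The sketch below is not the `RayBatchExecutor` implementation: it uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for Ray remote tasks, integer ticks in place of timestamps, and assumes a warmup of `window - 1` rows. All function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def rolling_mean(values, window):
    """Trailing mean; None until a full window of history is available."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window:i + 1]) / window)
    return out

def run_shard(rows, shard_start, shard_end, window):
    """Compute on warmup + shard rows, then keep only the write window."""
    warmup_start = shard_start - (window - 1)
    # Read: include the warmup rows that precede the shard.
    read = [(t, v) for t, v in rows if warmup_start <= t < shard_end]
    feature = rolling_mean([v for _, v in read], window)
    # Filter: discard warmup rows so adjacent shards never overlap on write.
    return [(t, f) for (t, _), f in zip(read, feature)
            if shard_start <= t < shard_end]

rows = [(t, float(t)) for t in range(12)]  # (event_time, price)
shards = [(0, 4), (4, 8), (8, 12)]

# Bounded concurrency: at most 4 shards in flight (thread pool stands in for Ray).
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(run_shard, rows, start, end, 3) for start, end in shards]
    staged = [f.result() for f in futures]
```

With the warmup rows included on read, the concatenated shard outputs match a single pass over the full series. Without them, the first rows of every shard would be null.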


Warehouse Output

Computed features are written to the ft_features table in narrow key-value format:

| Column | Description |
|---|---|
| symbol | Instrument symbol |
| feature_name | Unique feature identifier |
| feature_version | Feature version (e.g. v1.0) |
| feature_id | Unique feature instance ID |
| timestamp | Feature calculation timestamp |
| feature_namespace | Category (signal, execution, quality, etc.) |
| feature_value_numeric | Scalar feature value |
| feature_value_json | Complex values (vectors/tensors) |
| feature_value_array | Time series/vector values |
| status | success, error, or partial |
| processed_time | Processing timestamp |

The narrow format enables time-series queries across features and symbols — filter by feature_name, join on symbol + timestamp, group by feature_namespace.
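A minimal sketch of the wide-to-narrow conversion, using plain dicts rather than the project's `_wide_to_narrow()`. It fills only a subset of the ft_features columns; mapping `event_time` to `timestamp` and marking null values as `error` are assumptions, not documented behavior.

```python
from datetime import datetime, timezone

def wide_to_narrow(wide_rows, feature_names, symbol, feature_version="v1.0"):
    """Unpivot rows with one column per feature into one row per feature value."""
    processed_time = datetime.now(timezone.utc)
    narrow = []
    for row in wide_rows:
        for name in feature_names:
            value = row.get(name)
            narrow.append({
                "symbol": symbol,
                "feature_name": name,
                "feature_version": feature_version,
                "timestamp": row["event_time"],  # assumed mapping
                "feature_value_numeric": value,
                # Assumed rule: a null value is recorded as an error.
                "status": "success" if value is not None else "error",
                "processed_time": processed_time,
            })
    return narrow

wide = [
    {"event_time": 1, "log_price": 4.6, "price_diff": None},
    {"event_time": 2, "log_price": 4.7, "price_diff": 1.0},
]
narrow = wide_to_narrow(wide, ["log_price", "price_diff"], symbol="BTCUSDT")

# Query pattern: filter by feature_name to get one feature's time series.
log_price_series = [(r["timestamp"], r["feature_value_numeric"])
                    for r in narrow if r["feature_name"] == "log_price"]
```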


Batch Pipeline Runner

The BatchPipelineRunner orchestrates the batch pipeline in three conceptual stages:

ingest → process → feature
| Stage | What Runs | Output |
|---|---|---|
| Ingest | IngestPipeline per feed provider | Raw tables in warehouse |
| Process | dbt → state engine → label engine | CDM tables, bars, LOB snapshots, labels |
| Feature | FeatureEngine.compute_features() → ft_features | Feature table |

Process breaks down into three sub-stages orchestrated by Dagster:

| Sub-stage | What Runs | Output |
|---|---|---|
| dbt | dbt run via DBTProjectGenerator | CDM tables (cdm_trades, cdm_lob_l1) |
| state_engine | Numba fused kernel via Ray, daily sharding | cdm_trade_enriched, cdm_lob_l2, 9 bar tables |
| label_engine | LabelEngine.run() (6 labeling methods) | cdm_labels |

The state engine is the exception: it runs via Ray directly (not through Dagster) for maximum throughput. See Dagster Pipeline for full details.


PolarsDAGExecutor (Full IR Path)

When features are compiled through the full IR pipeline, the PolarsDAGExecutor walks the topological order of the IR DAG and applies lowering functions node by node:

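from typing import Dict, List

import polars as pl

# IRNode, Primitive, PolarsBackend, and resolve_lowering come from the FeatureDAG codebase.

class PolarsDAGExecutor:
    source_frames: Dict[str, pl.LazyFrame]
    backend: PolarsBackend
    lf_cache: Dict[str, pl.LazyFrame]

    def execute(self, execution_order: List[IRNode]) -> Dict[str, pl.LazyFrame]:
        for node in execution_order:
            if node.primitive == Primitive.SOURCE:
                # Source nodes register the raw input frame in the cache.
                self.lf_cache[node.id] = self.source_frames[node.params["name"]]
            elif node.primitive == Primitive.SINK:
                pass  # boundary marker
            else:
                # Resolve the upstream frame from the cache.
                # Assumption: node.inputs lists the ids of upstream nodes.
                current_lf = self.lf_cache[node.inputs[0]]
                lowering_fn = resolve_lowering(node, "polars")
                expr = lowering_fn(node, self.backend)
                # Append the lowered expression as a new lazy column.
                self.lf_cache[node.id] = current_lf.with_columns(
                    expr.alias(node.output))
        return self.lf_cache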

Key behaviors:

  • SOURCE nodes load raw input frames; SINK nodes are pass-through boundaries
  • All other nodes resolve upstream inputs from cache, call the lowering function, append the result as a new column
  • The full computation graph is built as a lazy pl.LazyFrame before any data is touched
  • Per-node error isolation captures individual feature failures without stopping the run
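How an execution order could be derived and walked is sketched below with the standard library's `graphlib`. The node ids and the dependency map are hypothetical, and the source does not say how FeatureDAG computes its topological order, so this is one possible approach rather than the project's method.

```python
from graphlib import TopologicalSorter

# Hypothetical IR graph: node id -> ids of the upstream nodes it depends on.
deps = {
    "src_trades": set(),
    "log_price": {"src_trades"},
    "ret_1": {"log_price"},
    "notional": {"src_trades"},
    "sink": {"ret_1", "notional"},
}

# static_order() yields each node only after all of its dependencies.
execution_order = list(TopologicalSorter(deps).static_order())

cache = {}
for node_id in execution_order:
    # Every upstream result must already be cached when a node is visited.
    missing = [d for d in deps[node_id] if d not in cache]
    assert not missing, f"{node_id} visited before {missing}"
    cache[node_id] = f"plan({node_id})"  # placeholder for a LazyFrame
```

The ordering guarantee is what lets the executor resolve upstream inputs with a plain cache lookup instead of recursion.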

Key Design Decisions

Feature-level error isolation — Failed features are caught, logged, and skipped. A misconfigured feature definition does not block the entire pipeline. Critical for research workflows where feature definitions iterate rapidly.
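The isolation pattern can be sketched in a few lines. `compute_all` and `toy_compute` are hypothetical names, and the spec fields are invented for the example. The actual `compute_features()` internals may differ.

```python
import logging

logger = logging.getLogger("feature_batch")

def compute_all(feature_specs, compute_one):
    """Compute each feature independently; log and skip the ones that fail."""
    results, failed = {}, []
    for spec in feature_specs:
        name = spec.get("name", "<unnamed>")
        try:
            results[name] = compute_one(spec)
        except Exception:
            # Isolate the failure: log it with a traceback and move on.
            logger.exception("feature %r failed; skipping", name)
            failed.append(name)
    return results, failed

def toy_compute(spec):
    # Hypothetical stand-in for per-feature evaluation.
    return [v * spec["scale"] for v in spec["values"]]

specs = [
    {"name": "ok_feature", "values": [1.0, 2.0], "scale": 2.0},
    {"name": "broken_feature", "values": [1.0, 2.0]},  # missing "scale"
]
results, failed = compute_all(specs, toy_compute)
```

Returning the list of failed names alongside the results keeps skipped features visible to the caller instead of letting them disappear silently.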

LazyFrame chaining — Polars LazyFrame objects are chained via successive with_columns calls rather than eagerly evaluated. Polars' query optimizer fuses operations and minimizes intermediate allocations. The final collect() materializes everything in one pass.

Ray for horizontal scale — The run_ray() path distributes feature computation across Ray workers. Each worker processes one (symbol, time_shard) independently. The main process only handles staging file commits — no data touches the main process in the hot path.

Arrow-native throughout — Input is Arrow (from engine reads), computation is Polars (Arrow-backed), output is Arrow (for engine writes). No format conversions, no serialization overhead.