Streaming Feature Engine
The DolphinDBStreamFeatureEngine deploys features into the DolphinDB cluster. It resolves feature dependencies, compiles IR DAGs, and deploys DolphinDB engines with consolidated metrics.
Mode System
The streaming feature engine supports three computation modes, matching the batch path:
tick mode: Features deployed as createReactiveStateEngine instances. Events processed one at a time as they arrive. Used for HFT signals needing per-event updates.
bar mode: Features deployed as createTimeSeriesEngine instances. Features compute on bar boundaries (OHLCV rows). Intermediate layers write to per-layer tables, then forward to the feature output table. Used for bar-native strategies.
tick_to_bar mode: Two-phase deployment:
- Phase A: Deploys tick-level features (re-enters the deploy loop in tick mode, computing features at event granularity).
- Phase B: Deploys
createTimeSeriesEngineinstances that downsample tick-level feature outputs to bar intervals via windowed aggregation.
This captures microstructure dynamics at tick resolution while producing bar-aligned outputs.
Layer-Based Topological Deployment
Features are deployed in dependency layers via _build_layers(). This is the streaming equivalent of the batch path's wave-based resolution:
-
Topological sort: Features are sorted into layers based on their declared dependencies. Layer 0 features depend only on CDM source tables. Layer 1 features depend on Layer 0 outputs. And so on.
-
Per-layer grouping: Within each layer, features are grouped by input table. All features in a layer that read from the same table are candidates for consolidated deployment.
-
Cross-layer data flow: Each layer's output tables become the input tables for the next layer. The
_resolve_input()method resolves input tables with priority:- Cross-feature cascade (upstream feature output)
- Base CDM entity name
- Direct entity name
- Feature name (fallback)
-
Engine count optimization: Consolidated deployment groups features sharing the same input table into a single
createReactiveStateEnginewith N output columns, reducing engine count by approximately 60%.
Consolidated Deployment
When N simple features share the same input table, instead of deploying N separate reactive state engines, a single engine is deployed with N output columns. Each column is named after its feature instance name to prevent collisions.
Expression folding: Intermediate columns in a feature's step DAG are inlined into consumer expressions via regex substitution before engine deployment. Variables are substituted in descending order of name length to avoid partial-match issues. Only terminal outputs become engine metrics — intermediate columns never materialize in stream tables.
Consolidated metrics: When multiple features share one engine, their folded expressions are combined into a single metrics list with feature instance names as column aliases, guaranteeing uniqueness even when YAML output_column values collide.
Lowering Registry for DolphinDB
The DolphinDB backend uses the same @register(Primitive, op, backend="dolphindb") decorator pattern as the Polars backend. Lowering functions in feature_engine/lowering.py register ~60 operations:
TRANSFORM: Binary/unary math, comparison, conditional, array operations — returns DolphinDB DSL strings.
WINDOW: Rolling aggregates mapped to DolphinDB built-ins:
rolling_mean→mavg(col, w)rolling_sum→msum(col, w)rolling_std→ stable two-pass formula (see below)rolling_min→mmin(col, w)rolling_max→mmax(col, w)rolling_corr→mcorr(a, b, w)rolling_skew→mskew(col, w)rolling_kurt→mkurtosis(col, w)diff→deltas(col, n)lag→prev(col, n)
STATE: ema, cumsum, decay_accum, vpin_accum.
Numerically Stable rolling_std
DolphinDB's built-in mstd() suffers from catastrophic cancellation on large-magnitude inputs (e.g., BTC prices at $60,000 where x^2 terms overflow precision). The DDB lowering uses a two-pass stable formula instead:
sqrt(mavg((x - mavg(x, w))^2, w) * w/(w-1))
This computes the mean first, then variance of residuals, avoiding the large intermediate values that cause precision loss. It produces identical results to Polars' rolling_std(ddof=1).
Placeholder Operations
Several complex operations (vpin_accum, half_life, sample_entropy, adx, shannon_entropy, consecutive_count) are registered with placeholder stubs returning 0.0, pending future implementation. The Polars backend implements all of these fully using rolling_map with numpy.
Handler-Based Execution for Array Operations
DolphinDB Community Edition v3.0 cannot perform arithmetic on array vector element access inside createReactiveStateEngine metrics. For features requiring array operations (LOB depth imbalance, book pressure, queue position imbalance), the system deploys DolphinDB handler functions instead:
- A handler subscribes to the input table
- Parses the bids/asks JSON array column once per row via
fromStdJson - Computes the feature row-by-row in a loop
- Appends results to a shared output table via
append!
Consolidated LOB Handler
When multiple LOB features share the same input table, a single handler parses JSON once and computes all features in one loop:
Parse JSON (bids[], asks[]) — once per row
│
├── depth_imbalance = (sum(bids) - sum(asks)) / (sum(bids) + sum(asks) + ε)
├── book_pressure = Σ level_weight × level_imbalance
└── weighted_imbalance = Σ decay_weight × level_imbalance
The queue_position_imbalance feature requires an additional second-pass engine: a createReactiveStateEngine applies rolling_zscore to the intermediate weighted_imbalance column.
Code Generation
The code generator produces complete DolphinDB deployment scripts:
Engine creation: createReactiveStateEngine with metrics list, dummy table (defines schema), output table (shared stream table, 100k capacity), key column specification (symbol, venue).
Subscriptions: subscribeTable wiring the engine's input to the upstream stream table, with batchSize=1000 and throttle=1 for batching control.
Cleanup scripts: Every deployment begins with try/catch cleanup — unsubscribe, drop engine, undefine table — ensuring deployments are idempotent and can be re-run safely after crashes or configuration changes.
Deployment Idempotency
The generated script pattern for every engine:
try/catch: unsubscribe the handler if subscribedtry/catch: drop the engine if it existstry/catch: undefine the shared table if it exists- Create the shared stream table
- Create the engine
- Subscribe the engine to its input table
Deployments can be re-run after crashes, configuration changes, or partial failures without manual cleanup.