
Streaming Execution

The streaming path deploys the entire computation graph into a DolphinDB cluster. Once deployed, all data processing happens inside the DolphinDB server process. Python is used for deployment and monitoring (and, on the optional Python ingestion path, for pushing raw data into landing stream tables), but it never performs computation in the hot data path.

See also: Introduction to DolphinDB (why DolphinDB, key concepts, and how QuantFlow uses it).


Architecture Principle

Every computation stage communicates exclusively through shared DolphinDB stream tables. An upstream stage writes to a stream table; a downstream stage subscribes to that table via DolphinDB's pub/sub. There is no runtime Python coupling between stages. This means:

  • Each stage can be deployed, monitored, and torn down independently.
  • The DolphinDB cluster handles all parallelism, memory management, and fault tolerance.
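  • Python can disconnect after deployment and the pipeline keeps running. The exception is the Python ingestion path, whose ingestor process must stay up to keep feeding data.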

Connection Management

A DolphinDBConnection wrapper manages connectivity to the DolphinDB cluster:

  • Connection pooling with load balancing across cluster nodes, falling back to a single session.
  • Auto-retry with exponential backoff (configurable retries and base delay), with a variant for streaming operations where retry-induced reconnection would break subscriptions.
  • Credential resolution: explicit arguments take priority, then engine config auth, then defaults.
  • Helper methods for script execution, data upload, table insertion (thread-safe), and stream subscription management.
  • Monitoring: query getStreamEngineStat() and getStreamingStat() for health checks.
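The following is a minimal Python sketch of the retry policy described above. It assumes the delay doubles on each attempt (base_delay × 2^attempt) and that ConnectionError and TimeoutError are the transient errors worth retrying. The names with_retry and run_streaming_op are hypothetical. The sleep function is injectable so the policy can be tested without waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


# Hypothetical helper; the real DolphinDBConnection API is not shown here.
def with_retry(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on a transient error wait base_delay * 2**attempt and try again."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("retries must be >= 0")


def run_streaming_op(fn: Callable[[], T]) -> T:
    """Streaming variant: a single attempt, because a retry-induced
    reconnection would break existing subscriptions."""
    return fn()
```

Ordinary script execution would go through with_retry. Subscription calls would go through the single-attempt variant so that a transient failure surfaces instead of silently reconnecting.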

Schema Management

A table manager handles DolphinDB table lifecycle:

  • Maps QuantFlow canonical data types to DolphinDB types (STRING → STRING, DECIMAL → DOUBLE, TIMESTAMP → TIMESTAMP; ARRAY and JSON types stored as STRING since DolphinDB Community Edition has limited support for nested types).
  • Creates shared stream tables using enableTableShareAndPersistence with streamTable(), falling back to share streamTable() if the server lacks persistence.
  • Creates keyed tables and non-streaming shared tables for static data (e.g., symbol mappings).
  • Idempotent table creation — skips if the table already exists.
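The type mapping and table creation can be sketched as a Python function that emits a DolphinDB statement. The sketch makes several assumptions:

- The existence check is done in Python against a set of known table names.
- The caller decides whether the server supports persistence.
- The name stream_table_script and the capacity and cache_size defaults are hypothetical.

The generated DolphinDB text is illustrative and has not been checked against a specific server version.

```python
from typing import Dict, List, Set, Tuple

# Canonical QuantFlow type -> DolphinDB type, as listed above.
CANONICAL_TO_DDB: Dict[str, str] = {
    "STRING": "STRING",
    "DECIMAL": "DOUBLE",
    "TIMESTAMP": "TIMESTAMP",
    "ARRAY": "STRING",  # stored as serialized text (limited nested-type support)
    "JSON": "STRING",
}


def stream_table_script(
    name: str,
    columns: List[Tuple[str, str]],
    existing: Set[str],
    persist: bool = True,
    capacity: int = 10000,
    cache_size: int = 100000,
) -> str:
    """Return a statement creating a shared stream table, or "" if it exists."""
    if name in existing:
        return ""  # idempotent: skip tables that already exist
    col_names = "`" + "`".join(c for c, _ in columns)
    col_types = "[" + ", ".join(CANONICAL_TO_DDB[t] for _, t in columns) + "]"
    ctor = f"streamTable({capacity}:0, {col_names}, {col_types})"
    if persist:
        return (f"enableTableShareAndPersistence(table={ctor}, "
                f"tableName=`{name}, cacheSize={cache_size})")
    # Fallback for servers without persistence: a plain shared stream table.
    return f"share {ctor} as {name}"
```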

Pipeline Stages

The streaming pipeline is orchestrated in three sequential stages, managed by DolphinDBStreamPipeline:

Stage 1 — Ingest: Creates stream tables and starts ingestors that push data into them. Two ingestion paths exist:

  • Python path: Data arrives in Python (e.g., from exchange WebSocket clients) and is pushed via tableInsert into DolphinDB stream tables.
  • Kafka path: DolphinDB scripts call createKafkaConsumer and subscribeKafkaConsumer to pull directly from Kafka into stream tables, with no Python in the data path.

Stage 2 — Process: Maps raw data to CDM, then runs the state engine. Two sub-stages:

Mapping (DolphinDBMappingEngine): Translates landing table columns to CDM columns. For each data type, deploys a handler that applies column mappings, type conversions, optional LOB JSON parsing (L1/L2 aggregation), deduplication, and direct append to CDM tables.

State Engine (DolphinDBStreamStateEngine): Deploys DolphinDB engines for bar generation and trade enrichment:

  • LOB enrichment: Parses raw LOB JSON, computes L1+L2 metrics (best bid/ask, spread, mid, weighted mid, depth imbalance, VWAP) and writes to cdm_lob_l2.
  • Bar engines: 9 bar types deployed via builder classes — TimeBarBuilder, TickBarBuilder, VolumeBarBuilder, DollarBarBuilder, ImbalanceBarBuilder, RunBarBuilder, VolatilityBarBuilder, DollarImbalanceBarBuilder, CusumBarBuilder.
  • Trade enrichment: As-of join of each trade with the latest LOB snapshot to compute effective_spread, trade_direction, ret, log_return, order_book_depth, buy_volume, and sell_volume.
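Below is a pure-Python sketch of the LOB metrics and the as-of join, useful as a reference when checking engine output. The formulas are common textbook definitions and are assumptions, not confirmed QuantFlow definitions:

- weighted mid is the top-of-book size-weighted price;
- depth imbalance is computed over all levels supplied;
- effective spread is 2·|price − mid|;
- trade direction uses the quote rule.

The function names are hypothetical.

```python
from bisect import bisect_right
from typing import Dict, List, Optional, Tuple

Level = Tuple[float, float]  # (price, qty), best level first


def lob_metrics(bids: List[Level], asks: List[Level]) -> Dict[str, float]:
    bid, bid_qty = bids[0]
    ask, ask_qty = asks[0]
    bid_depth = sum(q for _, q in bids)
    ask_depth = sum(q for _, q in asks)
    return {
        "best_bid": bid,
        "best_ask": ask,
        "spread": ask - bid,
        "mid": (bid + ask) / 2,
        # Assumed definition: each side's price weighted by the opposite size.
        "weighted_mid": (bid * ask_qty + ask * bid_qty) / (bid_qty + ask_qty),
        "depth_imbalance": (bid_depth - ask_depth) / (bid_depth + ask_depth),
    }


def enrich_trade(trade_ts: int, price: float,
                 snap_ts: List[int], snap_mid: List[float]) -> Optional[Dict[str, float]]:
    """As-of join: use the latest snapshot whose timestamp is <= trade_ts."""
    i = bisect_right(snap_ts, trade_ts) - 1
    if i < 0:
        return None  # no snapshot yet for this trade
    mid = snap_mid[i]
    direction = (price > mid) - (price < mid)  # quote rule: +1 buy, -1 sell, 0 at mid
    return {"mid": mid, "trade_direction": direction,
            "effective_spread": 2 * abs(price - mid)}
```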

Stage 3 — Feature: Receives topologically sorted FeatureInstance objects, builds IR DAGs, and deploys DolphinDB engines via DolphinDBStreamFeatureEngine.deploy(). Features are grouped by input table and deployed in consolidated engines to minimize engine count. Results flow into feature output stream tables.


Streaming Feature Engine

The DolphinDBStreamFeatureEngine.deploy() method is the core deployment algorithm:

Grouping strategy: Features sharing the same input table are deployed together to minimize engine count. Features are classified by their computation requirements:

  • Simple inline features (single group of steps, no array operations): deployed together as a single createReactiveStateEngine with multiple output columns — consolidated deployment.
  • LOB array features (require parsing bids/asks JSON arrays): deployed via handler functions since DolphinDB Community Edition cannot perform arithmetic on array vector element access inside engine metrics. If two or more LOB features share the same input table, they are further consolidated into a single handler that parses JSON once per row.
  • Multi-group features (split by EVENT ops or different window sizes): each group deployed as a separate engine.
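The grouping strategy can be sketched in Python as a classification step followed by bucketing on (input table, kind). FeatureSpec is a hypothetical stand-in for FeatureInstance. The precedence when a feature is both array-based and multi-group is an assumption.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class FeatureSpec:  # hypothetical stand-in for FeatureInstance
    name: str
    input_table: str
    n_groups: int = 1
    uses_arrays: bool = False


def classify(f: FeatureSpec) -> str:
    if f.uses_arrays:
        return "lob_handler"  # needs JSON parsing of bids/asks
    if f.n_groups > 1:
        return "multi_group"  # split by EVENT ops or window sizes
    return "simple"


def plan_engines(features: List[FeatureSpec]) -> List[Tuple[str, str, List[str]]]:
    """Return one (kind, input_table, names) entry per deployable unit."""
    buckets: Dict[Tuple[str, str], List[FeatureSpec]] = defaultdict(list)
    for f in features:
        buckets[(f.input_table, classify(f))].append(f)
    plan = []
    for (table, kind), group in buckets.items():
        if kind == "multi_group":
            # Each group of each feature gets its own engine.
            for f in group:
                for g in range(f.n_groups):
                    plan.append((kind, table, [f"{f.name}#g{g}"]))
        else:
            # simple: one reactive state engine; lob_handler: one shared handler.
            plan.append((kind, table, [f.name for f in group]))
    return plan
```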

Iterative resolution: Features are deployed in waves. The first wave deploys features whose input tables already exist (from the Process stage). Subsequent waves deploy features that depend on outputs from already-deployed features. This resolves the DAG dependencies correctly even when the topological sort spans multiple input tables.
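The wave loop can be sketched as follows. It assumes each feature is described by a hypothetical (input_table, output_table) pair and that the Process stage's tables are passed in as already available. A wave that makes no progress signals an unresolvable dependency.

```python
from typing import Dict, List, Set, Tuple


def deployment_waves(features: Dict[str, Tuple[str, str]],
                     existing: Set[str]) -> List[List[str]]:
    """features: name -> (input_table, output_table). Returns names per wave."""
    available = set(existing)
    pending = dict(features)
    waves: List[List[str]] = []
    while pending:
        ready = sorted(n for n, (inp, _) in pending.items() if inp in available)
        if not ready:
            raise ValueError(f"unresolvable inputs for: {sorted(pending)}")
        for n in ready:
            available.add(pending.pop(n)[1])  # its output feeds later waves
        waves.append(ready)
    return waves
```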

Consolidated deployment: When N simple features share the same input table, instead of deploying N separate reactive state engines, a single engine is deployed with N output columns. Each column is named after its feature instance name to prevent collisions. This reduces total engine count by approximately 60% compared to one-engine-per-feature.


Expression Compiler (DolphinDB DSL)

The streaming module contains its own expression compiler that translates ComputationStep objects directly into DolphinDB metric expression strings:

Operation mappings:

  • Arithmetic: add/sub/mul/div → DolphinDB operators
  • Math: sqrt → sqrt(), log → log(), abs → abs(), sign → sign()
  • Window: rolling_sum → msum(col, window), rolling_mean → mavg(col, window), rolling_std → mstd(col, window), rolling_corr → mcorr(a, b, window), rolling_min → mmin(col, window)
  • State: decay_accum → ema(col * decay_factor, window), ema → ema(col, window), rolling_zscore → (col - mavg(col, w)) / (mstd(col, w) + eps), sign_consistency → abs(mavg(sign(col), window))
  • Lag: lag → prev(col, n), pct_change → ratios(col, n) - 1.0
  • Array: array_extract → array_column[idx], weighted_sum → inline sum of element-wise products, array_wavg → sum(terms) / totalWeight
  • Conditional: conditional → iif(condition, true_value, false_value), with nested iif for multi-branch logic
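A template table is one simple way to implement mappings like these in Python. The sketch covers a subset of the operations listed above. TEMPLATES, compile_step, and compile_case are hypothetical names, and outer parentheses are added so the fragments nest safely. Multi-branch conditionals fold right-to-left into nested iif calls.

```python
from typing import Dict, List, Tuple

TEMPLATES: Dict[str, str] = {
    # arithmetic
    "add": "({0} + {1})", "sub": "({0} - {1})",
    "mul": "({0} * {1})", "div": "({0} / {1})",
    # math
    "sqrt": "sqrt({0})", "log": "log({0})", "abs": "abs({0})", "sign": "sign({0})",
    # window
    "rolling_sum": "msum({0}, {1})", "rolling_mean": "mavg({0}, {1})",
    "rolling_std": "mstd({0}, {1})", "rolling_corr": "mcorr({0}, {1}, {2})",
    "rolling_min": "mmin({0}, {1})",
    # state
    "ema": "ema({0}, {1})",
    "rolling_zscore": "(({0} - mavg({0}, {1})) / (mstd({0}, {1}) + {2}))",
    "sign_consistency": "abs(mavg(sign({0}), {1}))",
    # conditional
    "conditional": "iif({0}, {1}, {2})",
}


def compile_step(op: str, *args: object) -> str:
    try:
        template = TEMPLATES[op]
    except KeyError:
        raise ValueError(f"no DolphinDB mapping for op {op!r}") from None
    return template.format(*args)


def compile_case(branches: List[Tuple[str, str]], default: str) -> str:
    """[(cond1, v1), (cond2, v2)], d -> iif(cond1, v1, iif(cond2, v2, d))."""
    expr = default
    for cond, value in reversed(branches):
        expr = f"iif({cond}, {value}, {expr})"
    return expr
```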

Expression folding: Intermediate expressions (columns produced by earlier steps in a feature's DAG) are inlined into consumer expressions via regex substitution. This means the final DolphinDB engine only sees the terminal output expressions — intermediate columns never materialize.
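A minimal sketch of regex-based folding, under the following assumptions:

- Steps arrive as a hypothetical column → expression dict in topological order.
- Matches are anchored on word boundaries, so a column such as bid does not match inside best_bid.
- Each inlined expression is parenthesized to preserve precedence.

```python
import re
from typing import Dict, List


def fold(exprs: Dict[str, str], outputs: List[str]) -> Dict[str, str]:
    """Inline intermediate columns; return only the terminal output expressions."""
    resolved: Dict[str, str] = {}
    for col, expr in exprs.items():  # topological order assumed
        for dep, dep_expr in resolved.items():
            # A function replacement avoids backslash escaping in dep_expr.
            expr = re.sub(rf"\b{re.escape(dep)}\b",
                          lambda _m, e=dep_expr: f"({e})", expr)
        resolved[col] = expr
    return {o: resolved[o] for o in outputs}
```

Because every resolved expression is already fully inlined, one pass in topological order is enough. No intermediate name survives into a later substitution.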

Consolidated metrics: When multiple features share one engine, their folded expressions are combined into a single metrics list with feature instance names as column aliases.
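Combining folded expressions into one metrics list might look like the sketch below. It assumes DolphinDB's tuple metacode form <[expr as alias, ...]>. It also validates aliases as plain identifiers, since each column is named after its feature instance. consolidated_metrics is a hypothetical name.

```python
import re
from typing import Dict

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def consolidated_metrics(features: Dict[str, str]) -> str:
    """features: feature instance name -> folded terminal expression."""
    for name in features:
        if not _IDENT.fullmatch(name):
            raise ValueError(f"invalid column alias: {name!r}")
    parts = [f"{expr} as {name}" for name, expr in features.items()]
    return "<[" + ", ".join(parts) + "]>"
```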


Code Generation

The code generator produces complete DolphinDB deployment scripts:

Reactive state engine generation: Produces the full createReactiveStateEngine script with metrics, dummy table (defines schema), output table, and key column specification. The output table is a shared stream table.

Subscription generation: Produces subscribeTable scripts wiring the engine's input to the upstream stream table, with configurable handler name, action name, and throttle settings.
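The engine and subscription scripts can be produced by plain string templates, as in this sketch. Its assumptions:

- Parameter names follow DolphinDB's createReactiveStateEngine and subscribeTable functions.
- The input stream table doubles as the dummy (schema) table.
- The engine is attached through getStreamEngine.
- The Python function names and the example table, engine, and action names are hypothetical.

```python
def reactive_engine_script(engine: str, metrics: str, dummy_table: str,
                           output_table: str, key_column: str) -> str:
    """createReactiveStateEngine call; dummy_table only supplies the input schema."""
    return (f'{engine} = createReactiveStateEngine(name="{engine}", metrics={metrics}, '
            f'dummyTable={dummy_table}, outputTable={output_table}, '
            f'keyColumn="{key_column}")')


def subscribe_script(input_table: str, action: str, engine: str,
                     batch_size: int = 0, throttle: float = 1.0) -> str:
    """Wire the upstream stream table into the engine via pub/sub."""
    return (f'subscribeTable(tableName="{input_table}", actionName="{action}", '
            f'offset=-1, handler=getStreamEngine("{engine}"), msgAsTable=true, '
            f'batchSize={batch_size}, throttle={throttle})')
```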

Cleanup scripts: Every deployment begins with try/catch cleanup — unsubscribe, drop engine, undefine table — ensuring deployments are idempotent and can be re-run safely.


Handler-Based Execution

When array operations are required (e.g., LOB depth imbalance, book pressure), the expression compiler cannot use createReactiveStateEngine metrics because DolphinDB Community Edition v3.0 cannot perform arithmetic on array vector element access inside engine metric expressions. Instead, handler functions are generated:

Array handler pattern: A DolphinDB handler function subscribes to the input table, parses the bids/asks JSON array column once per row (via fromStdJson), computes the feature row-by-row, and appends results to a shared output table.

Consolidated LOB handler: When multiple LOB features (depth_imbalance, book_pressure, weighted_imbalance) share the same input table, a single handler parses JSON once and computes all three features in one loop, outputting to a multi-column table. The queue_position_imbalance feature requires an additional second-pass engine that applies rolling_zscore to the intermediate weighted_imbalance column.

Handler boilerplate: A common pattern wraps the row loop: allocate output arrays for the batch size, iterate over msg.size() rows, parse nested columns, execute the feature-specific loop body, and append the output arrays to the shared table.
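The consolidated handler pattern is sketched below in Python for readability; the deployed version is a DolphinDB script. Its assumptions:

- A message is a column-oriented batch with bids/asks JSON strings of [price, qty] pairs.
- The feature functions are hypothetical examples; the depth_imbalance formula is an assumed definition, not the QuantFlow one.
- append stands in for appending to the shared output table.

Each row's JSON is parsed once and reused by every feature.

```python
import json
from typing import Callable, Dict, List

Book = List[List[float]]
FeatureFn = Callable[[Book, Book], float]


def make_lob_handler(features: Dict[str, FeatureFn],
                     append: Callable[[Dict[str, List[float]]], None]):
    def handler(msg: Dict[str, List[str]]) -> None:
        n = len(msg["bids"])
        out = {name: [0.0] * n for name in features}  # allocate per batch
        for i in range(n):
            bids = json.loads(msg["bids"][i])  # parse once per row...
            asks = json.loads(msg["asks"][i])
            for name, fn in features.items():  # ...shared by every feature
                out[name][i] = fn(bids, asks)
        append(out)  # one append of all output columns
    return handler


def depth_imbalance(bids: Book, asks: Book) -> float:  # assumed definition
    b = sum(q for _, q in bids)
    a = sum(q for _, q in asks)
    return (b - a) / (b + a)


def top_spread(bids: Book, asks: Book) -> float:  # hypothetical second feature
    return asks[0][0] - bids[0][0]
```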


Deployment Idempotency

Every engine deployment is idempotent. The generated script pattern is:

  1. try/catch: unsubscribe the handler if subscribed
  2. try/catch: drop the engine if it exists
  3. try/catch: undefine the shared table if it exists
  4. Create the shared stream table
  5. Create the engine
  6. Subscribe the engine to its input table

This means deployments can be re-run after crashes, configuration changes, or partial failures without manual cleanup.
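A sketch of how the six steps could be assembled into one script. It assumes DolphinDB's unsubscribeTable, dropStreamEngine, and undef(name, SHARED) for cleanup, each wrapped in an empty try/catch so a missing object is not an error. idempotent_script is a hypothetical name, and the create/subscribe statements are passed in as strings.

```python
def idempotent_script(engine: str, action: str, input_table: str,
                      output_table: str, create_output: str,
                      create_engine: str, subscribe: str) -> str:
    cleanup = [
        # 1-3: tear down anything left from a previous run; ignore "not found".
        f'try {{ unsubscribeTable(tableName="{input_table}", actionName="{action}") }} catch(ex) {{}}',
        f'try {{ dropStreamEngine("{engine}") }} catch(ex) {{}}',
        f'try {{ undef("{output_table}", SHARED) }} catch(ex) {{}}',
    ]
    # 4-6: create output table, create engine, subscribe engine to its input.
    return "\n".join(cleanup + [create_output, create_engine, subscribe])
```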


Monitoring and Teardown

  • Health checks: Query DolphinDB's built-in getStreamEngineStat() and getStreamingStat() functions to report engine status, queue depths, and subscription counts.
  • Teardown: Engines are stopped in reverse dependency order (features → state → bridge → ingest) to prevent data loss from downstream engines reading from stopped upstream tables.
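Reverse dependency order is the reverse of a topological deployment order. The sketch below assumes the stage dependencies are given as a hypothetical stage → upstream-stages map, and uses Python's standard graphlib (3.9+).

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def teardown_order(depends_on: Dict[str, Set[str]]) -> List[str]:
    """depends_on maps each stage to the stages it reads from."""
    deploy = list(TopologicalSorter(depends_on).static_order())  # upstream first
    return deploy[::-1]  # stop downstream consumers before their producers
```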

DolphinDBBackend — The Lowering Layer's Streaming Path

The lowering layer (feature_engine/lowering/engines/dolphindb/) contains a formal, IR-based compilation path for DolphinDB.

DolphinDBBackend: Implements the BaseBackend protocol by returning DolphinDB expression strings. col(name) returns the bare column name, add(a, b) returns "(a + b)", conditional_select(cond, t, f) returns "iif(cond, t, f)", array_wavg(array, weights) returns "rowWavg(array, [weights])". Each protocol method produces syntactically correct DolphinDB DSL fragments.
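The string-returning backend described above can be sketched as a class that satisfies a small structural Protocol. ExprBackend is a hypothetical subset of BaseBackend covering only the four methods named in the text, and DolphinDBStringBackend is an illustrative stand-in, not the project's class.

```python
from typing import Protocol, Sequence


class ExprBackend(Protocol):  # hypothetical subset of BaseBackend
    def col(self, name: str) -> str: ...
    def add(self, a: str, b: str) -> str: ...
    def conditional_select(self, cond: str, t: str, f: str) -> str: ...
    def array_wavg(self, array: str, weights: Sequence[float]) -> str: ...


class DolphinDBStringBackend:
    """Each method returns a DolphinDB DSL fragment as a string."""

    def col(self, name: str) -> str:
        return name

    def add(self, a: str, b: str) -> str:
        return f"({a} + {b})"

    def conditional_select(self, cond: str, t: str, f: str) -> str:
        return f"iif({cond}, {t}, {f})"

    def array_wavg(self, array: str, weights: Sequence[float]) -> str:
        return f"rowWavg({array}, [{', '.join(str(w) for w in weights)}])"
```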

DolphinDB-specific lowering functions: Registered in expressions/ modules:

  • Window: diff → deltas(col, n), rolling_sum → msum(col, w), rolling_mean → mavg(col, w), rolling_std → mstd(col, w), pct_change → ratios(col, n) - 1.0, lag → prev(col, n), rolling_corr → mcorr(a, b, w), rolling_skew → mskew(col, w), rolling_kurt → mkurtosis(col, w)
  • State: decay_accum → custom ema-based formula, rolling_zscore → (col - mavg(col,w)) / (mstd(col,w) + eps), sign_consistency → abs(mavg(sign(col), window)), neg_autocorr → (-mcorr(col, prev(col,1), window)).nullFill(0)
  • Transform: array_extract, generate_weights, month_end_days, session_phase
  • Aggregation: sum(col), avg(col), std(col), skew(col), first(col), last(col)

Cross-backend consistency: The lowering layer enables the same (primitive, op) pair to be evaluated against both the PolarsBackend (batch) and the DolphinDBBackend (streaming), using a single @register-based dispatch via resolve_lowering(). Both backends consume the same IR DAG.
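A registry keyed by backend and (primitive, op) is one way to get single-point dispatch. In this sketch, register and resolve_lowering mirror the names in the text, but their signatures are assumptions. A pure-Python "reference" backend stands in for PolarsBackend so the example runs without Polars. The reference rolling mean returns None for the first w − 1 rows.

```python
from typing import Callable, Dict, List, Optional, Tuple

_REGISTRY: Dict[Tuple[str, str, str], Callable[..., object]] = {}


def register(backend: str, primitive: str, op: str):
    def deco(fn: Callable[..., object]) -> Callable[..., object]:
        key = (backend, primitive, op)
        if key in _REGISTRY:
            raise ValueError(f"duplicate lowering for {key}")
        _REGISTRY[key] = fn
        return fn
    return deco


def resolve_lowering(backend: str, primitive: str, op: str) -> Callable[..., object]:
    try:
        return _REGISTRY[(backend, primitive, op)]
    except KeyError:
        raise NotImplementedError(f"no {backend} lowering for ({primitive}, {op})") from None


@register("dolphindb", "window", "rolling_mean")
def _ddb_rolling_mean(col: str, w: int) -> str:
    return f"mavg({col}, {w})"  # streaming: emit a DolphinDB expression


@register("reference", "window", "rolling_mean")
def _ref_rolling_mean(values: List[float], w: int) -> List[Optional[float]]:
    # batch stand-in: evaluate eagerly; None until a full window is available
    return [None if i + 1 < w else sum(values[i + 1 - w:i + 1]) / w
            for i in range(len(values))]
```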

See also: Streaming Feature Engine (modes, layer-based deployment, DDB lowering registry).


Key Design Decisions

Consolidated engine deployment — Multiple features sharing the same input table are deployed as a single DolphinDB engine rather than one engine per feature. This reduces total engine count by ~60%, lowering cluster memory overhead and reducing inter-engine queue latency.

Everything inside DolphinDB: Once deployed, the entire streaming pipeline runs inside the DolphinDB server. Python is used for deployment scripts and, on the optional Python ingestion path, for pushing raw data in; no Python code performs computation in the hot data path. This avoids the GIL, serialization overhead, and network round-trips that would come with Python-in-the-loop.

Idempotent deployment — Every engine deployment script begins with cleanup (unsubscribe, drop, undef) inside try/catch blocks. Deployments can be re-run after crashes or configuration changes without manual intervention. This is essential for production operations where restarts must be safe and automated.

Handler fallback for array operations — DolphinDB Community Edition v3.0 has limited support for array vector arithmetic in engine metric expressions. Rather than requiring Enterprise Edition, the streaming path falls back to handler functions — DolphinDB scripts that iterate rows and compute array features in a loop. The consolidated handler pattern ensures JSON parsing happens once per row regardless of how many features consume the data.

Expression folding — Intermediate columns in a feature's step DAG are inlined into consumer expressions via regex substitution before engine deployment. Only terminal outputs become engine metrics. This reduces the number of columns materialized in stream tables and avoids the need for intermediate stream tables between steps of the same feature.