Python API
Programmatic usage for custom workflows beyond the CLI.
1. Loading a Project
from quantflow.metadata import load_metadata
meta = load_metadata(project_dir="/path/to/project")
project = meta.project
load_metadata() loads quantflow_project.yml, .local_config.yml, and all definitions (feed providers, feature types, CDM entities) into a MetadataManager object. Access project config via meta.project.
2. IO Engines
from quantflow.io import DBRegistry
# Create an engine from an explicit config dict
engine = DBRegistry.create("duckdb", {
"database": "path/to/db.duckdb",
})
# Read CDM data
arrow_table = engine.read_arrow(
"cdm.cdm_trade_enriched",
)
# Execute SQL
result = engine.read_sql("SELECT count(*) FROM cdm.cdm_trade_enriched")
# Write data
engine.write_arrow("my_table", arrow_table)
Five engines are registered: duckdb, openlakehouse, bigquery, snowflake, and databricks. The aliases s3_iceberg and trino also resolve to openlakehouse for backward compatibility, giving seven registered names in total.
To extract engine config from metadata:
from quantflow.io import engine_config_from_metadata
config = engine_config_from_metadata(meta, "duckdb")
engine = DBRegistry.create("duckdb", config)
3. Feature Engine
from quantflow.feature_engine import FeatureEngine
engine = FeatureEngine(metadata=meta)
# Compute a specific feature set
table, feature_names = engine.run(
symbols=["BTCUSDT"],
pack="microstructure_breakout",
)
# Compute and write to database
engine.run_and_write(
symbols=["BTCUSDT"],
pack="microstructure_breakout",
)
4. Label Engine
from quantflow.label_engine import LabelEngine
label_engine = LabelEngine(
config=meta.project.label_engine,
engine=engine,
schema_name="cdm",
)
# Compute labels for a date range
results = label_engine.run(
symbols=["BTCUSDT"],
start="2026-01-01",
end="2026-01-15",
)
To join features and labels for ML training, query both tables from DuckDB and join on symbol and timestamp. Features are stored in {project}_feature.features (or per-pack tables for bar-mode features); labels are in cdm.cdm_labels.
5. State Engine
from quantflow.state_engine import StateEngine
from quantflow.state_engine.batcher import SourceBatcher
# Create engine from project metadata
engine = StateEngine(metadata=meta)
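# With real data from a database source
# (db_engine is not defined in this snippet; presumably an IO engine such as one returned by DBRegistry.create)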
batcher = SourceBatcher(
engine=db_engine,
symbol="BTCUSDT",
venue="binance_spot",
start_time="2026-01-01",
end_time="2026-01-31",
source_trades_table="demo_2_cdm.cdm_trades",
source_lob_table="demo_2_cdm.cdm_lob_incremental",
engine_type="duckdb",
)
for batch in batcher:
result = engine.process(batch)
# result contains DataFrames: quotes, trades_enriched, snapshots, bars
For testing with synthetic data:
from quantflow.state_engine.batcher import MockBatcher
batcher = MockBatcher(num_batches=10, batch_size=100000)
for batch in batcher:
result = engine.process(batch)
6. Pipeline Runner
from quantflow.pipeline import create_runner, PipelineMode
# Research mode (batch)
runner = create_runner(meta, PipelineMode.RESEARCH)
result = runner.run(
stage="all", # or ingest, dbt, state_engine, label_engine, feature_engine
symbols=["BTCUSDT"],
start_date="2026-01-01",
end_date="2026-01-31",
)
# Trade mode (streaming)
runner = create_runner(meta, PipelineMode.TRADE)
result = runner.run(stage="all", symbols=["BTCUSDT"])
7. Streaming Deployment
Streaming pipelines are managed through the pipeline runner:
from quantflow.pipeline import create_runner, PipelineMode
runner = create_runner(meta, PipelineMode.TRADE)
# Deploy end-to-end
runner.run(stage="all")
# Or individual stages
runner.run(stage="ingest")
runner.run(stage="process")
runner.run(stage="feature")
Monitor via CLI: qf pipeline status
8. Data Ingestion
from quantflow.ingest import IngestPipeline
pipeline = IngestPipeline(project_dir="/path/to/project")
result = pipeline.run(
start_date="2026-01-01",
end_date="2026-01-31",
symbols=["BTCUSDT"],
force_download=False,
write_mode="append",
)