
Custom dbt Transformations

Extend the auto-generated dbt project with custom models, macros, QFSQL functions, and tests.


Overview

QuantFlow auto-generates a complete dbt project from metadata. You can extend it with custom models, macros, and tests without losing the auto-generation benefits. The generator preserves custom files and merges changes on re-generation.


Custom dbt Models

Adding Custom Staging Models

Place custom SQL models in dbt_project/models/custom/. They won't be overwritten by the generator:

dbt_project/
├── models/
│   ├── staging/          # Auto-generated (overwritten)
│   ├── cdm/              # Auto-generated (overwritten)
│   └── custom/           # Your models (preserved)
│       ├── custom_metrics.sql
│       └── schema.yml

-- models/custom/custom_metrics.sql
SELECT
    symbol,
    DATE_TRUNC('hour', event_time) AS hour,
    AVG(price) AS avg_price,
    SUM(size) AS total_volume,
    COUNT(*) AS trade_count
FROM {{ ref('cdm_trades') }}
GROUP BY symbol, DATE_TRUNC('hour', event_time)

Overriding Auto-Generated Models

To override a staging model while keeping the auto-generated version as reference:

  1. Copy the auto-generated model from models/staging/ to models/custom/
  2. Modify the copy
  3. Keep the same model name and configure model-paths in dbt_project.yml so that only the custom version is loaded; dbt raises an error when two enabled models share a name

Alternatively, use dbt's +enabled config to disable the auto-generated version and enable your custom one.
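
Before choosing either approach, it can help to list which custom models share a name with an auto-generated one. The sketch below is a hypothetical helper (find_shadowed_models is not part of QuantFlow or dbt) and assumes the models/staging/ and models/custom/ layout shown earlier, with one .sql file per model:

```python
from pathlib import Path


def find_shadowed_models(project_dir, generated="staging", custom="custom"):
    """Return names of models defined in both the generated and custom folders.

    Hypothetical helper: assumes each model is a .sql file whose stem is the
    model name, as in a standard dbt project layout.
    """
    models_dir = Path(project_dir) / "models"
    generated_names = {p.stem for p in (models_dir / generated).rglob("*.sql")}
    custom_names = {p.stem for p in (models_dir / custom).rglob("*.sql")}
    # Names present in both folders are the overrides that need attention.
    return sorted(generated_names & custom_names)
```

Every name it returns is a model whose auto-generated version has to be disabled or excluded before the custom copy can replace it.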


Custom dbt Macros

Adding Project-Specific Macros

Place custom macros in dbt_project/macros/custom/:

dbt_project/
└── macros/
    ├── quantflow/        # Auto-generated (overwritten)
    └── custom/           # Your macros (preserved)
        └── custom_helpers.sql

-- macros/custom/custom_helpers.sql
{% macro rolling_zscore(column, window) %}
    ({{ column }} - AVG({{ column }}) OVER (
        ORDER BY event_time ROWS BETWEEN {{ window }} PRECEDING AND CURRENT ROW
    )) / NULLIF(STDDEV({{ column }}) OVER (
        ORDER BY event_time ROWS BETWEEN {{ window }} PRECEDING AND CURRENT ROW
    ), 0)
{% endmacro %}
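
To check what rolling_zscore should return for a given input, a plain-Python equivalent can serve as a reference. This is a sketch, not QuantFlow code: it assumes the engine's STDDEV is the sample standard deviation, so a one-row frame yields NULL (None here), and that rows arrive already ordered by event_time:

```python
from statistics import mean, stdev


def rolling_zscore(values, window):
    """Z-score of each value against `window` preceding rows plus itself.

    Mirrors ROWS BETWEEN window PRECEDING AND CURRENT ROW. One-row frames
    and frames with zero spread yield None, matching SQL NULL and the
    NULLIF(..., 0) guard in the macro.
    """
    scores = []
    for i, value in enumerate(values):
        frame = values[max(0, i - window): i + 1]
        if len(frame) < 2:
            scores.append(None)  # sample stddev is undefined for one row
            continue
        spread = stdev(frame)
        scores.append(None if spread == 0 else (value - mean(frame)) / spread)
    return scores
```

Note that the macro as written has no PARTITION BY, so its frame spans all rows ordered by event_time; the sketch likewise treats the input as a single series.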

Engine-Specific Macro Overrides

QuantFlow uses dbt's dispatch pattern. Override a built-in macro for a specific engine:

-- macros/custom/override_bigquery.sql
{# Custom: daily partition granularity instead of hourly #}
{% macro bigquery__cdm_incremental_config(target_cdm, partition_field, cluster_fields) %}
    {{ config(
        materialized='incremental',
        unique_key='cdm_id',
        incremental_strategy='insert_overwrite',
        tags=['quantflow', 'cdm'],
        on_schema_change='append_new_columns',
        partition_by={
            "field": partition_field,
            "data_type": "timestamp",
            "granularity": "day"
        },
        cluster_by=cluster_fields
    ) }}
{% endmacro %}
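
The lookup order behind the dispatch pattern can be sketched in a few lines of Python. This is only an illustration of the naming convention (an engine-prefixed macro first, then the default__ one); the registry dict and the returned strings are hypothetical and stand in for the macros dbt has loaded:

```python
def dispatch(macro_name, engine, registry):
    """Pick the engine-specific implementation, falling back to default__.

    Sketch of the lookup order only: `registry` is a hypothetical dict that
    stands in for the set of loaded macros.
    """
    for candidate in (f"{engine}__{macro_name}", f"default__{macro_name}"):
        if candidate in registry:
            return registry[candidate]
    raise LookupError(f"no implementation of {macro_name!r} for {engine!r}")


# Illustrative implementations; the real macros emit a config() call.
registry = {
    "default__cdm_incremental_config": lambda: "granularity=hour",
    "bigquery__cdm_incremental_config": lambda: "granularity=day",
}
```

With this registry, a BigQuery target resolves to the override while any other engine falls through to the default implementation.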

Custom QFSQL Functions

Adding a New QFSQL Function

QFSQL functions are defined per-engine in adapter classes. To add a custom function, subclass the adapter and add a new rule:

from quantflow.dbt import DBTConfigAdapter
from quantflow.dbt.adapters.base import SQLEngineAdapter


class CustomBigQueryAdapter(SQLEngineAdapter):
    engine_name = "bigquery_custom"

    @property
    def qfsql_rules(self):
        # Copy so the parent's rule table is not mutated in place
        rules = dict(super().qfsql_rules)
        # Add your custom function
        rules["vwap"] = lambda args: (
            f"SAFE_DIVIDE(SUM({args[0]} * {args[1]}), SUM({args[1]}))"
        )
        return rules


# Register with the DBTConfigAdapter
DBTConfigAdapter.register_engine_adapter("bigquery_custom", CustomBigQueryAdapter)

Then use it in field mappings:

field_mappings:
  - target: vwap_price
    transformation: "vwap(price, size)"
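
To make the path from a transformation string to engine SQL concrete, here is a minimal sketch of how a rule table might be applied. The render function and its regex are hypothetical stand-ins for the real QFSQL parser, and they handle only flat calls with no nesting and no commas inside arguments:

```python
import re

# Rule table: function name -> callable that receives a list of string args.
RULES = {
    "abs": lambda args: f"ABS({args[0]})",
    "round": lambda args: (
        f"ROUND({args[0]}, {args[1]})" if len(args) >= 2 else f"ROUND({args[0]})"
    ),
    "vwap": lambda args: f"SAFE_DIVIDE(SUM({args[0]} * {args[1]}), SUM({args[1]}))",
}

# Matches a single flat call such as name(arg1, arg2).
CALL = re.compile(r"^\s*(\w+)\s*\((.*)\)\s*$")


def render(expression, rules=RULES):
    """Translate a flat call such as "vwap(price, size)" into engine SQL."""
    match = CALL.match(expression)
    if match is None:
        raise ValueError(f"not a function call: {expression!r}")
    name, raw_args = match.groups()
    if name not in rules:
        raise KeyError(f"unknown QFSQL function: {name}")
    # Split on commas and trim whitespace; empty pieces are dropped.
    args = [arg.strip() for arg in raw_args.split(",") if arg.strip()]
    return rules[name](args)
```

Under these assumptions, render("vwap(price, size)") produces the SAFE_DIVIDE expression defined by the custom rule.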

Function Signature Rules

Each QFSQL function receives a list of string arguments (already parsed from the function call). The lambda must return a valid SQL string for the target engine:

# Simple: one argument
"abs": lambda args: f"ABS({args[0]})"

# Multiple arguments
"str_concat": lambda args: f"CONCAT({', '.join(args)})"

# Optional arguments — check len(args)
"round": lambda args: f"ROUND({args[0]}, {args[1]})" if len(args) >= 2 else f"ROUND({args[0]})"

Custom dbt Tests

Project-Specific Tests

Add custom test SQL files in dbt_project/tests/custom/:

-- tests/custom/assert_positive_spread.sql
SELECT *
FROM {{ ref('cdm_lob_l2') }}
WHERE best_ask_price <= best_bid_price

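With dbt's default test-paths, singular tests under tests/ run as part of dbt test. The equivalent check can also be declared in your schema.yml:

tests:
  table_tests:
    - dbt_utils.expression_is_true:
        expression: "best_ask_price > best_bid_price"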

Custom Generic Tests

Define reusable test macros:

-- macros/custom/generic/test_positive.sql
{% test positive(model, column_name) %}
SELECT *
FROM {{ model }}
WHERE {{ column_name }} <= 0
{% endtest %}

Use in column tests:

schema:
  price:
    dtype: decimal
    tests:
      - positive

dbt Generator Hooks

The dbt generator lifecycle exposes hooks for pre- and post-generation customization.

Pre-Generation Hook

Run before the generator creates files — use for cleaning, validation, or custom setup:

from quantflow.dbt.generator import DBTProjectGenerator


class CustomGenerator(DBTProjectGenerator):
    def pre_generate(self):
        # Custom validation: raise explicitly, since asserts are stripped under python -O
        if not self.metadata.project.symbols:
            raise ValueError("metadata.project.symbols must not be empty")
        # Custom setup
        self.ensure_custom_directories()

Post-Generation Hook

Run after generation — use for adding custom configs, running linters, or triggering downstream steps:

import subprocess

from quantflow.dbt.generator import DBTProjectGenerator


class CustomGenerator(DBTProjectGenerator):
    def post_generate(self):
        # Add custom dbt_project.yml settings
        self.update_dbt_project_config({
            "models": {
                "custom": {
                    "+materialized": "table"
                }
            }
        })
        # Run sqlfluff on the generated models
        subprocess.run(["sqlfluff", "fix", str(self.project_dir / "models")])
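
How the two hooks fit around generation can be sketched with a stand-in class. SketchGenerator below is hypothetical and is not the real DBTProjectGenerator; the call order (pre hook, file writing, post hook) is an assumption drawn from the hook descriptions in this section:

```python
class SketchGenerator:
    """Hypothetical stand-in for DBTProjectGenerator; not the real class."""

    def __init__(self):
        self.log = []

    def pre_generate(self):
        """Hook: runs before any file is written. No-op by default."""

    def post_generate(self):
        """Hook: runs after all files are written. No-op by default."""

    def _write_files(self):
        # Placeholder for the actual file generation step.
        self.log.append("write")

    def generate(self):
        # Assumed call order: pre hook, file generation, post hook.
        self.pre_generate()
        self._write_files()
        self.post_generate()
        return self.log


class LoggingGenerator(SketchGenerator):
    def pre_generate(self):
        self.log.append("pre")

    def post_generate(self):
        self.log.append("post")
```

In this sketch an exception raised in pre_generate stops the run before any file is written, which is why validation is a natural fit for the pre-generation hook.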
