Building a Config-Driven DLT Framework — Part 1: The SQL + YAML Pattern

Every DLT pipeline I’ve seen at work follows the same pattern: a Python file that registers tables using decorators, hard-coded SQL inside Python strings, and a databricks.yml that needs updating every time you add a table. It works, but it doesn’t scale — adding a new dimension table means touching three files and hoping you don’t break the import order.

I wanted something simpler: drop a .sql and .yaml file into a folder, and the pipeline picks it up automatically.

This is Part 1 of a 4-part series on building a config-driven DLT framework with AI-powered monitoring. In this post, I’ll cover the core architecture and the runtime discovery pattern.

The Problem with Standard DLT Pipelines

A typical DLT pipeline looks like this:

import dlt

@dlt.table
def dim_customer():
    return spark.sql("""
        SELECT customer_id, name, email
        FROM silver.customers
    """)

@dlt.table
def dim_orders():
    return spark.sql("""
        SELECT order_id, customer_id, amount
        FROM silver.orders
    """)

This works fine for 2-3 tables. But when you’re building a gold layer with 20+ dimensions, each with SCD tracking and data quality expectations, you end up with a massive Python file that’s hard to review, hard to test, and easy to break.

The other issue: SQL lives inside Python strings, so you lose syntax highlighting, linting, and the ability to run queries independently.

The SQL + YAML Convention

My framework uses a file-pair convention. For each DLT table, you create two files with the same stem:

pipelines/
  customers.sql    # The SELECT query
  customers.yaml   # Metadata: table type, keys, expectations
  orders.sql
  orders.yaml
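The convention is nothing more than stem matching on the filesystem. A minimal sketch using only the standard library (the temp directory here is purely for illustration):

```python
from pathlib import Path
import tempfile

# Minimal sketch of the file-pair convention: a table exists only when
# both <stem>.sql and <stem>.yaml are present; orphans are ignored.
root = Path(tempfile.mkdtemp())
(root / "customers.sql").write_text("SELECT 1")
(root / "customers.yaml").write_text("table_type: mv")
(root / "orders.sql").write_text("SELECT 1")  # no orders.yaml -> skipped

tables = [y.stem for y in sorted(root.glob("*.yaml"))
          if y.with_suffix(".sql").exists()]
print(tables)  # ['customers']
```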

The SQL file is pure SQL — just the SELECT statement:

SELECT
    customer_id,
    name,
    email,
    created_at
FROM catalog.silver.customers

The YAML file declares the table type, primary keys (for SCD tables), and data quality expectations:

table_type: scd2
primary_key:
  - customer_id
sequence_by: created_at
expectations:
  - name: valid_email
    rule: "email IS NOT NULL"
    on_violation: warn

That’s it. No Python. No decorators. No databricks.yml changes.
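For clarity, here is what that YAML parses into once `yaml.safe_load` has run, along with a deliberately naive sanity check (`check_definition` is a hypothetical helper for illustration; Part 2 replaces this kind of check with proper Pydantic models):

```python
# What the customers.yaml above parses into: a plain dict.
defn = {
    "table_type": "scd2",
    "primary_key": ["customer_id"],
    "sequence_by": "created_at",
    "expectations": [
        {"name": "valid_email", "rule": "email IS NOT NULL", "on_violation": "warn"},
    ],
}

# Hypothetical sanity check on required keys per table type.
def check_definition(d):
    if d.get("table_type") in ("scd1", "scd2") and not d.get("primary_key"):
        raise ValueError("SCD tables require primary_key")
    if d.get("table_type") == "scd2" and not d.get("sequence_by"):
        raise ValueError("SCD2 tables require sequence_by")
    return d

check_definition(defn)  # passes silently
```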

The Pipeline Factory

The core of the framework is pipeline_factory.py — a single file deployed to Databricks that discovers and registers all pipeline definitions at runtime.

The discovery loop runs at DLT pipeline startup:

import yaml  # PyYAML, available on Databricks runtimes
from pathlib import Path

# Directory holding the .sql / .yaml pairs, alongside pipeline_factory.py
PIPELINES_DIR = Path(__file__).parent / "pipelines"

for _yaml_file in sorted(PIPELINES_DIR.glob("*.yaml")):
    _sql_file = _yaml_file.with_suffix(".sql")
    if not _sql_file.exists():
        continue

    with open(_yaml_file, encoding="utf-8") as _f:
        _defn = yaml.safe_load(_f)

    _sql_text = _sql_file.read_text(encoding="utf-8").strip()
    _name = _yaml_file.stem

    if _defn.get("table_type") == "mv":
        _register_mv(_name, _sql_text, _defn.get("expectations", []))
    else:
        _register_scd(_name, _sql_text, _defn)

It globs all YAML files, finds matching SQL files, and routes to the appropriate registration function based on table_type.

Handling Table Types

The framework supports three table types:

Materialized Views (table_type: mv) — simple @dp.materialized_view registration. Good for aggregations or denormalized views that don’t need change tracking.

SCD Type 1 (table_type: scd1) — overwrites existing rows on key match using dp.create_auto_cdc_flow(stored_as_scd_type=1).

SCD Type 2 (table_type: scd2) — maintains history with __START_AT / __END_AT columns via stored_as_scd_type=2. Requires a sequence_by column.

For SCD tables, the factory creates a temporary view from the SQL, then wires it into a CDC flow:

def _register_scd(table_name, sql, definition):
    view_name = f"_src_{table_name}"
    scd_type = 1 if definition["table_type"] == "scd1" else 2

    @dp.temporary_view(name=view_name)
    def _source():
        return spark.sql(sql)

    dp.create_streaming_table(table_name)
    dp.create_auto_cdc_flow(
        target=table_name,
        source=view_name,
        keys=definition["primary_key"],
        sequence_by=definition.get("sequence_by"),
        stored_as_scd_type=scd_type,
    )

The Closure Trap

One subtle thing: factory functions are essential here. If you register tables inside a loop using closures, Python captures the loop variable by reference — every table ends up using the last iteration’s SQL. The factory pattern freezes table_name and sql per-iteration by passing them as function arguments.
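The same trap, reduced to a runnable sketch outside DLT:

```python
# The trap: closures see the loop variable itself,
# not its value at definition time.
broken = []
for name in ["dim_customer", "dim_orders"]:
    broken.append(lambda: name)

print([f() for f in broken])  # ['dim_orders', 'dim_orders']

# The fix: a factory function whose argument freezes each iteration's value.
def make_getter(frozen):
    return lambda: frozen

fixed = [make_getter(n) for n in ["dim_customer", "dim_orders"]]
print([f() for f in fixed])  # ['dim_customer', 'dim_orders']
```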

DBR Version Compatibility

Databricks recently moved the DLT API from import dlt to from pyspark import pipelines. The factory handles this with an import shim:

try:
    from pyspark import pipelines as dp
except ImportError:
    import dlt as dp

This keeps the framework working across DBR 14.3+ and older runtimes: newer releases resolve pyspark.pipelines, while older ones fall back to dlt.

What’s Next

In Part 2, I’ll cover the validation layer — Pydantic models that catch misconfigurations before deployment, SQL linting with SQLFluff, and the data quality expectations system.