Dagster & dbt (Pythonic)
If you are just getting started with the dbt integration, we recommend using the new dbt component.
Dagster orchestrates dbt alongside other technologies, so you can schedule dbt with Spark, Python, etc. in a single data pipeline. Dagster's asset-oriented approach allows Dagster to understand dbt at the level of individual dbt models.
1. Prepare a Dagster project
To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:
create-dagster project my-project && cd my-project
Activate the project virtual environment:
source .venv/bin/activate
Then, add the dagster-dbt library to the project, along with the dbt DuckDB adapter:
- uv
- pip
uv add dagster-dbt dbt-duckdb
pip install dagster-dbt dbt-duckdb
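To confirm the install, you can check that both packages import cleanly in the activated environment. A minimal sanity check (assuming current dagster-dbt and dbt-duckdb releases, which expose these module paths):
# Quick sanity check that both integrations are importable.
import dagster_dbt
import dbt.adapters.duckdb  # provided by the dbt-duckdb package

print(dagster_dbt.__version__)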
2. Set up a dbt project
For this tutorial, we'll use the jaffle_shop dbt project as an example. Clone it into your Dagster project:
git clone --depth=1 https://github.com/dbt-labs/jaffle_shop.git dbt && rm -rf dbt/.git
We will create a profiles.yml file in the dbt directory to configure the project to use DuckDB:
jaffle_shop:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: tutorial.duckdb
      threads: 24
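By default, dbt looks for profiles.yml in the project directory first, then falls back to ~/.dbt. If you keep profiles elsewhere, the DbtCliResource introduced in the next step accepts an explicit profiles directory; a minimal sketch, where the path is a placeholder:
from dagster_dbt import DbtCliResource

# Hypothetical: profiles.yml lives outside the dbt project directory.
dbt_resource_with_profiles = DbtCliResource(
    project_dir="dbt",
    profiles_dir="path/to/profiles",  # placeholder path
)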
3. Initialize the dbt assets
First, create a dbt resource (for example, in a resources.py file) that points to the dbt project directory within the Dagster project directory:
from pathlib import Path

from dagster_dbt import DbtCliResource, DbtProject

import dagster as dg

dbt_project_directory = Path(__file__).absolute().parent / "dbt"
dbt_project = DbtProject(project_dir=dbt_project_directory)
# Compile the dbt manifest during local development so that
# dbt_project.manifest_path exists when the assets are defined.
dbt_project.prepare_if_dev()
dbt_resource = DbtCliResource(project_dir=dbt_project)


@dg.definitions
def resources():
    return dg.Definitions(
        resources={
            "dbt": dbt_resource,
        }
    )
With the dbt resource defined, you can use the dbt project to generate the dbt assets (for example, in an assets.py file alongside resources.py):
from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

from .resources import dbt_project


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
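If you also want these models to run on a cron cadence, dagster-dbt provides build_schedule_from_dbt_selection, which builds a schedule and job from a dbt selection string. A minimal sketch; the job name and cron string are placeholders:
from dagster_dbt import build_schedule_from_dbt_selection

# Hypothetical daily schedule covering every model in the project.
daily_dbt_models_schedule = build_schedule_from_dbt_selection(
    [dbt_models],
    job_name="daily_dbt_models",  # placeholder job name
    cron_schedule="0 0 * * *",
    dbt_select="fqn:*",  # dbt selection: all models
)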
4. Run your dbt models
To execute your dbt models, you can use the dg launch command to kick off a run through the CLI:
dg launch --assets '*'
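Once the run completes, you can spot-check the results directly in DuckDB. A minimal sketch, assuming dbt resolved the tutorial.duckdb path relative to the dbt directory (where DbtCliResource invokes dbt):
import duckdb

# Open the warehouse file read-only and inspect the built models.
con = duckdb.connect("dbt/tutorial.duckdb", read_only=True)
print(con.sql("SHOW TABLES"))
print(con.sql("SELECT count(*) FROM customers"))  # customers is a jaffle_shop model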
5. Customize dbt assets
You can customize the properties of the assets emitted by each dbt model using a DagsterDbtTranslator. This allows you to modify asset metadata such as asset keys, group names, descriptions, and other properties. To create a custom translator, define a new class that inherits from DagsterDbtTranslator, then override methods such as get_asset_key and get_group_name to supply custom logic for those metadata attributes. The translator is then applied to the @dbt_assets assets:
from collections.abc import Mapping
from typing import Any

from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

import dagster as dg

from .resources import dbt_project


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> dg.AssetKey:
        asset_key = super().get_asset_key(dbt_resource_props)
        return asset_key.with_prefix("my_prefix_")

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> str:
        # Customize group names
        return "my_dbt_group"


@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
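Other translator methods can be overridden in the same way. For example, a sketch that surfaces each model's dbt description (dbt_resource_props is the model's node dictionary from the dbt manifest, so fields like name and description are available):
from collections.abc import Mapping
from typing import Any

from dagster_dbt import DagsterDbtTranslator


class DescriptiveDagsterDbtTranslator(DagsterDbtTranslator):
    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        # Fall back to the model name when no description is set in dbt.
        return dbt_resource_props.get("description") or dbt_resource_props["name"]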
6. Handling incremental models
If you have incremental models in your dbt project, you can model them as partitioned assets and update the command used to run the dbt models to pass --vars based on the range of partitions being processed.
To enable partitioning for incremental dbt models in Dagster, we start by creating a new @dbt_assets definition. First, we add a selector (INCREMENTAL_SELECTOR) to identify incremental models and configure them with a daily_partition. This allows Dagster to determine which partition is running and pass the correct time window (start_date, end_date) into dbt using the vars argument. With this setup, dbt builds only the records within the active partition.
Next, we separate our dbt executions: one @dbt_assets function (dbt_analytics) excludes incremental models, while another (incremental_dbt_models) handles only the incremental assets with partitions. Inside incremental_dbt_models, we fetch the partition time window from the context, format it as variables, and pass them into dbt's CLI command. This ensures dbt materializes only the partition-specific slice of data while keeping dependencies clear and maintainable.
import json

from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

from .resources import dbt_project

INCREMENTAL_SELECTOR = "config.materialized:incremental"
daily_partition = dg.DailyPartitionsDefinition(start_date="2023-01-01")


@dbt_assets(
    manifest=dbt_project.manifest_path,
    exclude=INCREMENTAL_SELECTOR,
)
def dbt_analytics(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    dbt_build_invocation = dbt.cli(["build"], context=context)

    yield from dbt_build_invocation.stream()

    run_results_json = dbt_build_invocation.get_artifact("run_results.json")
    for result in run_results_json["results"]:
        context.log.debug(result["compiled_code"])


@dbt_assets(
    manifest=dbt_project.manifest_path,
    select=INCREMENTAL_SELECTOR,
    partitions_def=daily_partition,
)
def incremental_dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    time_window = context.partition_time_window
    dbt_vars = {
        "start_date": time_window.start.strftime("%Y-%m-%d"),
        "end_date": time_window.end.strftime("%Y-%m-%d"),
    }
    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()
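To test a single partition locally, one option is dg.materialize, which accepts a partition_key; a minimal sketch (the date below is an arbitrary partition within the defined range):
import dagster as dg

from .resources import dbt_resource

# Materialize one daily partition of the incremental models in-process.
result = dg.materialize(
    [incremental_dbt_models],
    partition_key="2023-01-02",
    resources={"dbt": dbt_resource},
)
assert result.success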