
Migrating to dbt components

Dagster supports two ways to integrate with dbt: the dbt component (recommended) and the Pythonic integration library. If you built your Dagster and dbt project with the Pythonic integration, you can migrate to the dbt component and get the same result.

1. Scaffold the dbt component

The first step is to scaffold a dbt component definition. This will generate the defs.yaml configuration file with a path to your dbt project:

my_project/defs/dbt_ingest/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
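
Before relying on the generated file, it can help to confirm that the directory named in the project attribute really is a dbt project. The following is a minimal sketch, assuming that {{ project_root }} resolves to the root of your Dagster project and that the dbt project lives in a dbt subdirectory, as in the example above. The helper is_dbt_project is hypothetical and is not part of Dagster or dbt.

```python
from pathlib import Path


def is_dbt_project(project_root: Path, relative_dir: str = "dbt") -> bool:
    """Return True if <project_root>/<relative_dir> contains a dbt_project.yml file."""
    # dbt identifies a project by the dbt_project.yml file at its root.
    return (project_root / relative_dir / "dbt_project.yml").is_file()


if __name__ == "__main__":
    # Assumption: the script is run from the root of the Dagster project.
    print(is_dbt_project(Path.cwd()))
```

If this prints False, the path in defs.yaml probably needs adjusting before the component can load the project.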

2. Remove Pythonic definitions

Since the component handles the creation of any dbt assets in your Dagster project, as well as the configuration of the underlying resource, you can remove the explicit dbt resource creation code:

my_project/defs/resources.py
# These lines can be removed
from pathlib import Path

from dagster_dbt import DbtCliResource, DbtProject

import dagster as dg

dbt_project_directory = Path(__file__).absolute().parent / "dbt"
dbt_project = DbtProject(project_dir=dbt_project_directory)

dbt_resource = DbtCliResource(project_dir=dbt_project)


@dg.definitions
def resources():
    return dg.Definitions(
        resources={
            "dbt": dbt_resource,
        }
    )


You can also remove any @dbt_assets assets from your code:

my_project/defs/assets.py
# These lines can be removed
from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

from .resources import dbt_project


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


To ensure that the dbt assets have been replaced correctly, you can execute:

dg check defs

If there are still dbt assets defined via the Pythonic API, or the dbt resource is still present, you will receive a validation error due to duplication of definitions.
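
If the check reports duplicates, a quick text search can help locate the leftover code. The following is a rough sketch using only the standard library. The helper find_legacy_dbt_code and its pattern are hypothetical, and the search is purely textual, so it will also flag matches inside comments or strings.

```python
import re
from pathlib import Path

# Pythonic-integration usages that should be gone after migrating:
# the @dbt_assets decorator and any DbtCliResource(...) instantiation.
LEGACY_PATTERN = re.compile(r"@dbt_assets\b|\bDbtCliResource\(")


def find_legacy_dbt_code(defs_dir: Path) -> list[tuple[str, int, str]]:
    """Return (file, line number, line) for each leftover Pythonic dbt usage."""
    hits = []
    for path in sorted(defs_dir.rglob("*.py")):
        for number, line in enumerate(path.read_text().splitlines(), start=1):
            if LEGACY_PATTERN.search(line):
                hits.append((str(path), number, line.strip()))
    return hits


if __name__ == "__main__":
    # Assumption: definitions live under my_project/defs, as in this guide.
    for file_name, number, line in find_legacy_dbt_code(Path("my_project/defs")):
        print(f"{file_name}:{number}: {line}")
```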

Assuming the check passes, you can also execute:

dg list defs

This will list all the assets in your project and allow you to see that the expected dbt assets are present.

3. Migrating translators (Optional)

If you had defined a custom DagsterDbtTranslator for your dbt project, that logic can be moved into the defs.yaml that was generated from scaffolding the component. For example, the custom translator:

my_project/defs/assets.py
from collections.abc import Mapping
from typing import Any

import dagster as dg
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> dg.AssetKey:
        # Add a prefix to the default asset key
        asset_key = super().get_asset_key(dbt_resource_props)
        return asset_key.with_prefix("my_prefix_")

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> str:
        # Customize group names
        return "my_dbt_group"

The same customization can be expressed in the defs.yaml as follows:

my_project/defs/dbt_ingest/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  translation:
    group_name: my_dbt_group
    key: 'my_prefix_{{ node.name }}'
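
To make the key template concrete, the sketch below shows the string it would produce for a given dbt node. It is only an illustration: render_key is a hypothetical stand-in that uses a regular expression, not the templating that Dagster itself performs, and the node is represented as a plain dictionary.

```python
import re

KEY_TEMPLATE = "my_prefix_{{ node.name }}"


def render_key(template: str, node: dict) -> str:
    """Substitute {{ node.<field> }} placeholders with values from a node dict."""
    return re.sub(
        r"\{\{\s*node\.(\w+)\s*\}\}",
        lambda match: str(node[match.group(1)]),
        template,
    )


print(render_key(KEY_TEMPLATE, {"name": "customers"}))  # my_prefix_customers
```

If your existing keys were built with with_prefix, compare the keys reported by dg list defs before and after migrating, since a prefix added as a separate key component may not match a prefix embedded in the model name.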

4. Migrating incremental models (Optional)

If you had incremental models defined in your dbt project, this logic can be moved into the defs.yaml that was generated from scaffolding the component. For example, the partition:

my_project/defs/assets.py
INCREMENTAL_SELECTOR = "config.materialized:incremental"

daily_partition = dg.DailyPartitionsDefinition(start_date="2023-01-01")

This partition is applied to @dbt_assets:

my_project/defs/assets.py
import json


@dbt_assets(
    manifest=dbt_project.manifest_path,
    select=INCREMENTAL_SELECTOR,
    partitions_def=daily_partition,
)
def incremental_dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # Pass the partition's time window to dbt as vars
    time_window = context.partition_time_window
    dbt_vars = {
        "start_date": time_window.start.strftime("%Y-%m-%d"),
        "end_date": time_window.end.strftime("%Y-%m-%d"),
    }

    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()

The same partitioning can be configured on the component. First, add a new template var to your component. It provides the partitions definition used to partition the assets:

my_project/defs/dbt_ingest/template_vars.py
import dagster as dg


@dg.template_var
def daily_partitions_def() -> dg.DailyPartitionsDefinition:
    return dg.DailyPartitionsDefinition(start_date="2023-01-01")

This template var takes the place of the dg.DailyPartitionsDefinition that was defined in assets.py.

Next, apply the partitions definition from the new template var in the defs.yaml using the post_processing field. You will also need to add configuration to the cli_args field so that dbt runs with the time window of the partition being executed:

my_project/defs/dbt_ingest/defs.yaml
type: dagster_dbt.DbtProjectComponent

template_vars_module: .template_vars
attributes:
  project: '{{ project_root }}/dbt'
  select: "customers"
  translation:
    group_name: dbt_models
    description: "Transforms data using dbt model {{ node.name }}"
  cli_args:
    - build
    - --vars:
        start_date: "{{ context.partition_time_window.start.strftime('%Y-%m-%d') }}"
        end_date: "{{ context.partition_time_window.end.strftime('%Y-%m-%d') }}"
post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
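
To see what the two cli_args templates evaluate to, the sketch below reproduces the same date formatting for a single daily partition using only the standard library. It assumes a daily partition with no custom offsets, whose time window runs from midnight of the partition date to midnight of the next day. The function dbt_vars_for_daily_partition is hypothetical and is not part of Dagster.

```python
import json
from datetime import datetime, timedelta


def dbt_vars_for_daily_partition(partition_key: str) -> dict[str, str]:
    """Build the start_date/end_date vars for a daily partition key (YYYY-MM-DD)."""
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    # Assumption: a daily partition's time window ends at the start of the next day.
    end = start + timedelta(days=1)
    return {
        "start_date": start.strftime("%Y-%m-%d"),
        "end_date": end.strftime("%Y-%m-%d"),
    }


# Mirrors the argument list built in the original @dbt_assets function.
dbt_vars = dbt_vars_for_daily_partition("2023-01-01")
print(["build", "--vars", json.dumps(dbt_vars)])
```

Because end_date is the start of the following day, incremental models that filter on these vars would typically treat the end of the window as exclusive.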