Migrating to dbt components
Dagster supports two ways to integrate with dbt: the dbt component (recommended) and the Pythonic integration library. If you built your Dagster and dbt project with the Pythonic integration, you can migrate to the dbt component and get the same result.
1. Scaffold the dbt component
The first step is to scaffold a dbt component definition. This will generate a `defs.yaml` configuration file with a path to your dbt project.
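A minimal sketch of the scaffold command, assuming a recent `dg` CLI and a hypothetical `dbt_ingest` location for the component (substitute whatever path fits your project):

```bash
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_ingest
```

The generated `defs.yaml` looks like the following: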
```yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
```
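If your `dg` version includes it, you can also validate the component YAML itself before going further (run from your project root; this step is optional):

```bash
dg check yaml
```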
2. Remove Pythonic definitions
Since the component handles the creation of any dbt assets in your Dagster project, as well as the configuration of the underlying resource, you can remove the explicit dbt resource creation code:
```python
# These lines can be removed
from pathlib import Path

from dagster_dbt import DbtCliResource, DbtProject

import dagster as dg

dbt_project_directory = Path(__file__).absolute().parent / "dbt"
dbt_project = DbtProject(project_dir=dbt_project_directory)
dbt_resource = DbtCliResource(project_dir=dbt_project)


@dg.definitions
def resources():
    return dg.Definitions(
        resources={
            "dbt": dbt_resource,
        }
    )
```
You can also remove any assets defined with `@dbt_assets` from your code:
```python
# These lines can be removed
from dagster_dbt import DbtCliResource, dbt_assets

import dagster as dg

from .resources import dbt_project


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```
To ensure that the dbt assets have been replaced correctly, you can execute:
```bash
dg check defs
```
If there are still dbt assets defined via the Pythonic API, or the dbt resource is still present, the check will fail with a validation error about duplicate definitions.
Assuming the check passes, you can also execute:
```bash
dg list defs
```
This will list all the assets in your project and allow you to see that the expected dbt assets are present.
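If your project contains many definitions, you can narrow the listing with standard shell filtering; a rough sketch, using a placeholder model name:

```bash
dg list defs | grep -i customers
```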
3. Migrate translators (optional)
If you had defined a custom `DagsterDbtTranslator` for your dbt project, that logic can be moved into the `defs.yaml` that was generated when you scaffolded the component. For example, the custom translator:
```python
from typing import Any, Mapping

import dagster as dg
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> dg.AssetKey:
        asset_key = super().get_asset_key(dbt_resource_props)
        return asset_key.with_prefix("my_prefix_")

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> str:
        # Customize group names
        return "my_dbt_group"
```
Can be applied to the `defs.yaml` in the following way:
```yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  translation:
    group_name: my_dbt_group
    key: 'my_prefix_{{ node.name }}'
```
4. Migrate incremental models (optional)
If your dbt project includes incremental models that you ran with partitioned `@dbt_assets`, that partitioning logic can be moved into the `defs.yaml` that was generated when you scaffolded the component. For example, the incremental selector and partition definition:
```python
import dagster as dg

INCREMENTAL_SELECTOR = "config.materialized:incremental"

daily_partition = dg.DailyPartitionsDefinition(start_date="2023-01-01")
```
Applied to `@dbt_assets`:
```python
import json

import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

from .resources import dbt_project


# INCREMENTAL_SELECTOR and daily_partition are defined in the snippet above
@dbt_assets(
    manifest=dbt_project.manifest_path,
    select=INCREMENTAL_SELECTOR,
    partitions_def=daily_partition,
)
def incremental_dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    time_window = context.partition_time_window
    dbt_vars = {
        "start_date": time_window.start.strftime("%Y-%m-%d"),
        "end_date": time_window.end.strftime("%Y-%m-%d"),
    }
    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()
```
This can be migrated to the component as follows. The first step is to add a new template var to your component (for example, in a `template_vars.py` module next to the `defs.yaml`); it defines the partitions definition used to partition the dbt assets:
```python
import dagster as dg


@dg.template_var
def daily_partitions_def() -> dg.DailyPartitionsDefinition:
    return dg.DailyPartitionsDefinition(start_date="2023-01-01")
```
This template var takes the place of the standalone `dg.DailyPartitionsDefinition` definition.
Next, apply the partition from the new template var to the `defs.yaml` using the `post_processing` field. You will also need to configure the `cli_args` field so dbt can execute using the partition:
```yaml
type: dagster_dbt.DbtProjectComponent

template_vars_module: .template_vars

attributes:
  project: '{{ project_root }}/dbt'
  select: "config.materialized:incremental"
  translation:
    group_name: dbt_models
    description: "Transforms data using dbt model {{ node.name }}"
  cli_args:
    - build
    - --vars:
        start_date: "{{ context.partition_time_window.start.strftime('%Y-%m-%d') }}"
        end_date: "{{ context.partition_time_window.end.strftime('%Y-%m-%d') }}"

post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
```
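After updating the `defs.yaml`, re-run the checks from step 2 to confirm that the partitioned dbt assets load as expected:

```bash
dg check defs
dg list defs
```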