Skip to content

prefect_dbt.cli.commands

Module containing tasks and flows for interacting with dbt CLI

DbtCoreOperation

Bases: ShellOperation

A block representing a dbt operation, containing multiple dbt and shell commands.

For long-lasting operations, use the trigger method and utilize the block as a context manager for automatic closure of processes when context is exited. If not, manually call the close method to close processes.

For short-lasting operations, use the run method. Context is automatically managed with this method.

Attributes:

Name Type Description
commands

A list of commands to execute sequentially.

stream_output

Whether to stream output.

env

A dictionary of environment variables to set for the shell operation.

working_dir

The working directory context the commands will be executed within.

shell

The shell to use to execute the commands.

extension

The extension to use for the temporary file. if unset defaults to .ps1 on Windows and .sh on other platforms.

profiles_dir Optional[Path]

The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the dbt commands provided. If this is not set, will try using the DBT_PROFILES_DIR environment variable, but if that's also not set, will use the default directory $HOME/.dbt/.

project_dir Optional[Path]

The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.

overwrite_profiles bool

Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.

dbt_cli_profile Optional[DbtCliProfile]

Profiles class containing the profile written to profiles.yml. Note! This is optional and will raise an error if profiles.yml already exists under profiles_dir and overwrite_profiles is set to False.

Examples:

Load a configured block.

from prefect_dbt import DbtCoreOperation

dbt_op = DbtCoreOperation.load("BLOCK_NAME")

Execute short-lasting dbt debug and list with a custom DbtCliProfile.

from prefect_dbt import DbtCoreOperation, DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake import SnowflakeConnector

snowflake_connector = await SnowflakeConnector.load("snowflake-connector")
target_configs = SnowflakeTargetConfigs(connector=snowflake_connector)
dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop",
    target="dev",
    target_configs=target_configs,
)
dbt_init = DbtCoreOperation(
    commands=["dbt debug", "dbt list"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True
)
dbt_init.run()

Execute a longer-lasting dbt run as a context manager.

with DbtCoreOperation(commands=["dbt run"]) as dbt_run:
    dbt_process = dbt_run.trigger()
    # do other things
    dbt_process.wait_for_completion()
    dbt_output = dbt_process.fetch_result()

Source code in prefect_dbt/cli/commands.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
class DbtCoreOperation(ShellOperation):
    """
    A block representing a dbt operation, containing multiple dbt and shell commands.

    For long-lasting operations, use the trigger method and utilize the block as a
    context manager for automatic closure of processes when context is exited.
    If not, manually call the close method to close processes.

    For short-lasting operations, use the run method. Context is automatically managed
    with this method.

    Attributes:
        commands: A list of commands to execute sequentially.
        stream_output: Whether to stream output.
        env: A dictionary of environment variables to set for the shell operation.
        working_dir: The working directory context the commands will be executed within.
        shell: The shell to use to execute the commands.
        extension: The extension to use for the temporary file.
            if unset defaults to `.ps1` on Windows and `.sh` on other platforms.
        profiles_dir: The directory to search for the profiles.yml file.
            Setting this appends the `--profiles-dir` option to the dbt commands
            provided. If this is not set, will try using the DBT_PROFILES_DIR
            environment variable, but if that's also not
            set, will use the default directory `$HOME/.dbt/`.
        project_dir: The directory to search for the dbt_project.yml file.
            Default is the current working directory and its parents.
        overwrite_profiles: Whether the existing profiles.yml file under profiles_dir
            should be overwritten with a new profile.
        dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
            Note! This is optional and will raise an error if profiles.yml already
            exists under profiles_dir and overwrite_profiles is set to False.

    Examples:
        Load a configured block.
        ```python
        from prefect_dbt import DbtCoreOperation

        dbt_op = DbtCoreOperation.load("BLOCK_NAME")
        ```

        Execute short-lasting dbt debug and list with a custom DbtCliProfile.
        ```python
        from prefect_dbt import DbtCoreOperation, DbtCliProfile
        from prefect_dbt.cli.configs import SnowflakeTargetConfigs
        from prefect_snowflake import SnowflakeConnector

        snowflake_connector = await SnowflakeConnector.load("snowflake-connector")
        target_configs = SnowflakeTargetConfigs(connector=snowflake_connector)
        dbt_cli_profile = DbtCliProfile(
            name="jaffle_shop",
            target="dev",
            target_configs=target_configs,
        )
        dbt_init = DbtCoreOperation(
            commands=["dbt debug", "dbt list"],
            dbt_cli_profile=dbt_cli_profile,
            overwrite_profiles=True
        )
        dbt_init.run()
        ```

        Execute a longer-lasting dbt run as a context manager.
        ```python
        with DbtCoreOperation(commands=["dbt run"]) as dbt_run:
            dbt_process = dbt_run.trigger()
            # do other things
            dbt_process.wait_for_completion()
            dbt_output = dbt_process.fetch_result()
        ```
    """

    _block_type_name = "dbt Core Operation"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250"  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-dbt/cli/commands/#prefect_dbt.cli.commands.DbtCoreOperation"  # noqa

    profiles_dir: Optional[Path] = Field(
        default=None,
        description=(
            "The directory to search for the profiles.yml file. "
            "Setting this appends the `--profiles-dir` option to the dbt commands "
            "provided. If this is not set, will try using the DBT_PROFILES_DIR "
            "environment variable, but if that's also not "
            "set, will use the default directory `$HOME/.dbt/`."
        ),
    )
    project_dir: Optional[Path] = Field(
        default=None,
        description=(
            "The directory to search for the dbt_project.yml file. "
            "Default is the current working directory and its parents."
        ),
    )
    overwrite_profiles: bool = Field(
        default=False,
        description=(
            "Whether the existing profiles.yml file under profiles_dir "
            "should be overwritten with a new profile."
        ),
    )
    dbt_cli_profile: Optional[DbtCliProfile] = Field(
        default=None,
        description=(
            "Profiles class containing the profile written to profiles.yml. "
            "Note! This is optional and will raise an error if profiles.yml already "
            "exists under profile_dir and overwrite_profiles is set to False."
        ),
    )

    @validator("commands", always=True)
    def _has_a_dbt_command(cls, commands):
        """
        Check that at least one of the commands contains the substring "dbt ".

        Raises:
            ValueError: If no command contains "dbt ".
        """
        if not any("dbt " in command for command in commands):
            raise ValueError(
                "None of the commands are a valid dbt sub-command; see dbt --help, "
                "or use prefect_shell.ShellOperation for non-dbt related "
                "commands instead"
            )
        return commands

    def _find_valid_profiles_dir(self) -> Path:
        """
        Ensure that there is a profiles.yml available for use.

        The directory is resolved in this order: the `profiles_dir` field, the
        `DBT_PROFILES_DIR` entry of `self.env`, the `DBT_PROFILES_DIR` process
        environment variable, and finally `~/.dbt`.

        Returns:
            The resolved profiles directory.

        Raises:
            ValueError: If a profile must be written (profiles.yml is missing or
                `overwrite_profiles` is True) but `dbt_cli_profile` is not set, or
                if profiles.yml exists, `overwrite_profiles` is False and a
                `dbt_cli_profile` was nevertheless provided.
        """
        profiles_dir = self.profiles_dir
        if profiles_dir is None:
            if self.env.get("DBT_PROFILES_DIR") is not None:
                # get DBT_PROFILES_DIR from the user input env
                profiles_dir = self.env["DBT_PROFILES_DIR"]
            else:
                # get DBT_PROFILES_DIR from the system env, or default to ~/.dbt
                profiles_dir = os.getenv("DBT_PROFILES_DIR", Path.home() / ".dbt")
        profiles_dir = relative_path_to_current_platform(
            Path(profiles_dir).expanduser()
        )

        # https://docs.getdbt.com/dbt-cli/configure-your-profile
        # Note that the file always needs to be called profiles.yml,
        # regardless of which directory it is in.
        profiles_path = profiles_dir / "profiles.yml"
        dbt_cli_profile = self.dbt_cli_profile
        if not profiles_path.exists() or self.overwrite_profiles:
            if dbt_cli_profile is None:
                raise ValueError(
                    "Since overwrite_profiles is True or profiles_path is empty, "
                    "need `dbt_cli_profile` to write a profile"
                )
            profile = dbt_cli_profile.get_profile()
            # parents=True so that a nested, not-yet-existing profiles directory
            # can be created instead of failing with FileNotFoundError.
            profiles_dir.mkdir(parents=True, exist_ok=True)
            with open(profiles_path, "w+") as f:
                yaml.dump(profile, f, default_flow_style=False)
        elif dbt_cli_profile is not None:
            raise ValueError(
                f"Since overwrite_profiles is False and profiles_path {profiles_path} "
                "already exists, the profile within dbt_cli_profile couldn't be used; "
                "if the existing profile is satisfactory, do not set dbt_cli_profile"
            )
        return profiles_dir

    def _append_dirs_to_commands(self, profiles_dir) -> List[str]:
        """
        Append profiles_dir and project_dir options to the configured commands.

        Args:
            profiles_dir: Directory passed through the `--profiles-dir` option.

        Returns:
            A new list with the options appended to each command; `self.commands`
            is left untouched.
        """
        # The paths are wrapped in double quotes so directories containing spaces
        # are passed to dbt as a single argument by the shell.
        options = f' --profiles-dir "{profiles_dir}"'
        if self.project_dir is not None:
            project_dir = Path(self.project_dir).expanduser()
            options += f' --project-dir "{project_dir}"'

        # NOTE: the options are appended to every entry of `self.commands`,
        # without filtering for entries that invoke dbt.
        return [command + options for command in self.commands]

    def _compile_kwargs(self, **open_kwargs: Any) -> Dict[str, Any]:
        """
        Helper method to compile the kwargs for `open_process` so it's not repeated
        across the run and trigger methods.
        """
        profiles_dir = self._find_valid_profiles_dir()
        commands = self._append_dirs_to_commands(profiles_dir=profiles_dir)

        # _compile_kwargs is called within trigger() and run(), prior to execution.
        # The parent implementation reads self.commands, but the commands were
        # modified here without saving back to self.commands, so a copy carrying
        # the modified commands is handed to the parent implementation instead.
        # Env vars were also considered, but DBT_PROJECT_DIR is not supported yet.
        modified_self = self.copy()
        modified_self.commands = commands
        # Name the class explicitly: super(type(self), ...) would resolve back to
        # this very method for any subclass instance and recurse without end.
        return super(DbtCoreOperation, modified_self)._compile_kwargs(**open_kwargs)

create_summary_markdown

Creates a Prefect task artifact summarizing the results of the above predefined prefect-dbt task.

Source code in prefect_dbt/cli/commands.py
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
def create_summary_markdown(run_results: dict, command: str) -> str:
    """
    Build the markdown text of a Prefect task artifact that summarizes
    the results of a predefined prefect-dbt task.

    Args:
        run_results: Mapping read under the keys "Error", "Fail", "Skipped",
            "Warn" and "Success". Entries under "Success" are accessed
            as `entry.node.name`.
        command: Name of the dbt command, interpolated into the heading.

    Returns:
        The assembled markdown string.
    """
    markdown = f"# dbt {command} Task Summary\n" + _create_node_summary_table_md(
        run_results=run_results
    )

    # Any non-empty bucket other than "Success" triggers the unsuccessful section.
    problem_keys = ("Error", "Fail", "Skipped", "Warn")
    if any(run_results[key] != [] for key in problem_keys):
        markdown += "\n\n ## Unsuccessful Nodes ❌\n\n"
        markdown += _create_unsuccessful_markdown(run_results=run_results)

    if run_results["Success"] != []:
        bullet_lines = []
        for result in run_results["Success"]:
            bullet_lines.append(f"* {result.node.name}")
        successful_runs_str = "\n".join(bullet_lines)
        markdown += f"\n## Successful Nodes ✅\n\n{successful_runs_str}\n\n"

    return markdown

run_dbt_build async

Executes the 'dbt build' command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt build results.

Parameters:

Name Type Description Default
profiles_dir Optional[Union[Path, str]]

The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that's also not set, will use the default directory $HOME/.dbt/.

None
project_dir Optional[Union[Path, str]]

The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.

None
overwrite_profiles bool

Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.

False
dbt_cli_profile Optional[DbtCliProfile]

Profiles class containing the profile written to profiles.yml. Note! This is optional and will raise an error if profiles.yml already exists under profile_dir and overwrite_profiles is set to False.

None
create_summary_artifact bool

If True, creates a Prefect artifact on the task run with the dbt build results using the specified artifact key. Defaults to False.

False
summary_artifact_key str

The key under which to store the dbt build results artifact in Prefect. Defaults to 'dbt-build-task-summary'.

'dbt-build-task-summary'
extra_command_args Optional[List[str]]

Additional command arguments to pass to the dbt build command.

None
stream_output bool

Whether to stream output.

True
    from prefect import flow
    from prefect_dbt.cli.tasks import dbt_build_task

    @flow
    def dbt_test_flow():
        dbt_build_task(
            project_dir="/Users/test/my_dbt_project_dir",
            extra_command_args=["--model", "foo_model"]
        )

Raises:

Type Description
ValueError

If required dbt_cli_profile is not provided when needed for profile writing.

RuntimeError

If the dbt build fails for any reason, it will be indicated by the exception raised.

Source code in prefect_dbt/cli/commands.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
@task
async def run_dbt_build(
    profiles_dir: Optional[Union[Path, str]] = None,
    project_dir: Optional[Union[Path, str]] = None,
    overwrite_profiles: bool = False,
    dbt_cli_profile: Optional[DbtCliProfile] = None,
    create_summary_artifact: bool = False,
    summary_artifact_key: str = "dbt-build-task-summary",
    extra_command_args: Optional[List[str]] = None,
    stream_output: bool = True,
):
    """
    Executes the 'dbt build' command within a Prefect task,
    and optionally creates a Prefect artifact summarizing the dbt build results.

    Args:
        profiles_dir: The directory to search for the profiles.yml file. Setting this
            appends the `--profiles-dir` option to the command provided.
            If this is not set, will try using the DBT_PROFILES_DIR env variable,
            but if that's also not set, will use the default directory `$HOME/.dbt/`.
        project_dir: The directory to search for the dbt_project.yml file.
            Default is the current working directory and its parents.
        overwrite_profiles: Whether the existing profiles.yml file under profiles_dir
            should be overwritten with a new profile.
        dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
            Note! This is optional and will raise an error
            if profiles.yml already exists under profiles_dir
            and overwrite_profiles is set to False.
        create_summary_artifact: If True, creates a Prefect artifact on the task run
            with the dbt build results using the specified artifact key.
            Defaults to False.
        summary_artifact_key: The key under which to store
            the dbt build results artifact in Prefect.
            Defaults to 'dbt-build-task-summary'.
        extra_command_args: Additional command arguments to pass to the dbt build command.
        stream_output: Whether to stream output.

    Returns:
        The value returned by `trigger_dbt_cli_command` for the `build` command.

    Example:
    ```python
        from prefect import flow
        from prefect_dbt.cli.commands import run_dbt_build

        @flow
        def dbt_test_flow():
            run_dbt_build(
                project_dir="/Users/test/my_dbt_project_dir",
                extra_command_args=["--model", "foo_model"]
            )
    ```

    Raises:
        ValueError: If required dbt_cli_profile is not provided
                    when needed for profile writing.
        RuntimeError: If the dbt build fails for any reason,
                    it will be indicated by the exception raised.
    """

    # Delegate to the generic CLI task's underlying function with the
    # sub-command fixed to "build"; every argument is forwarded unchanged.
    return await trigger_dbt_cli_command.fn(
        command="build",
        profiles_dir=profiles_dir,
        project_dir=project_dir,
        overwrite_profiles=overwrite_profiles,
        dbt_cli_profile=dbt_cli_profile,
        create_summary_artifact=create_summary_artifact,
        summary_artifact_key=summary_artifact_key,
        extra_command_args=extra_command_args,
        stream_output=stream_output,
    )

run_dbt_model async

Executes the 'dbt run' command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt run results.

Parameters:

Name Type Description Default
profiles_dir Optional[Union[Path, str]]

The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that's also not set, will use the default directory $HOME/.dbt/.

None
project_dir Optional[Union[Path, str]]

The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.

None
overwrite_profiles bool

Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.

False
dbt_cli_profile Optional[DbtCliProfile]

Profiles class containing the profile written to profiles.yml. Note! This is optional and will raise an error if profiles.yml already exists under profile_dir and overwrite_profiles is set to False.

None
create_summary_artifact bool

If True, creates a Prefect artifact on the task run with the dbt run results using the specified artifact key. Defaults to False.

False
summary_artifact_key str

The key under which to store the dbt run results artifact in Prefect. Defaults to 'dbt-run-task-summary'.

'dbt-run-task-summary'
extra_command_args Optional[List[str]]

Additional command arguments to pass to the dbt run command.

None
stream_output bool

Whether to stream output.

True
    from prefect import flow
    from prefect_dbt.cli.tasks import dbt_run_task

    @flow
    def dbt_test_flow():
        dbt_run_task(
            project_dir="/Users/test/my_dbt_project_dir",
            extra_command_args=["--model", "foo_model"]
        )

Raises:

Type Description
ValueError

If required dbt_cli_profile is not provided when needed for profile writing.

RuntimeError

If the dbt run fails for any reason, it will be indicated by the exception raised.

Source code in prefect_dbt/cli/commands.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
@task
async def run_dbt_model(
    profiles_dir: Optional[Union[Path, str]] = None,
    project_dir: Optional[Union[Path, str]] = None,
    overwrite_profiles: bool = False,
    dbt_cli_profile: Optional[DbtCliProfile] = None,
    create_summary_artifact: bool = False,
    summary_artifact_key: str = "dbt-run-task-summary",
    extra_command_args: Optional[List[str]] = None,
    stream_output: bool = True,
):
    """
    Executes the 'dbt run' command within a Prefect task,
    and optionally creates a Prefect artifact summarizing the dbt run results.

    Args:
        profiles_dir: The directory to search for the profiles.yml file. Setting this
            appends the `--profiles-dir` option to the command provided.
            If this is not set, will try using the DBT_PROFILES_DIR env variable,
            but if that's also not set, will use the default directory `$HOME/.dbt/`.
        project_dir: The directory to search for the dbt_project.yml file.
            Default is the current working directory and its parents.
        overwrite_profiles: Whether the existing profiles.yml file under profiles_dir
            should be overwritten with a new profile.
        dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
            Note! This is optional and will raise an error
            if profiles.yml already exists under profiles_dir
            and overwrite_profiles is set to False.
        create_summary_artifact: If True, creates a Prefect artifact on the task run
            with the dbt run results using the specified artifact key.
            Defaults to False.
        summary_artifact_key: The key under which to store
            the dbt run results artifact in Prefect.
            Defaults to 'dbt-run-task-summary'.
        extra_command_args: Additional command arguments to pass to the dbt run command.
        stream_output: Whether to stream output.

    Returns:
        The value returned by `trigger_dbt_cli_command` for the `run` command.

    Example:
    ```python
        from prefect import flow
        from prefect_dbt.cli.commands import run_dbt_model

        @flow
        def dbt_test_flow():
            run_dbt_model(
                project_dir="/Users/test/my_dbt_project_dir",
                extra_command_args=["--model", "foo_model"]
            )
    ```

    Raises:
        ValueError: If required dbt_cli_profile is not provided
                    when needed for profile writing.
        RuntimeError: If the dbt run fails for any reason,
                    it will be indicated by the exception raised.
    """

    # Delegate to the generic CLI task's underlying function with the
    # sub-command fixed to "run"; every argument is forwarded unchanged.
    return await trigger_dbt_cli_command.fn(
        command="run",
        profiles_dir=profiles_dir,
        project_dir=project_dir,
        overwrite_profiles=overwrite_profiles,
        dbt_cli_profile=dbt_cli_profile,
        create_summary_artifact=create_summary_artifact,
        summary_artifact_key=summary_artifact_key,
        extra_command_args=extra_command_args,
        stream_output=stream_output,
    )

run_dbt_seed async

Executes the 'dbt seed' command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt seed results.

Parameters:

Name Type Description Default
profiles_dir Optional[Union[Path, str]]

The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that's also not set, will use the default directory $HOME/.dbt/.

None
project_dir Optional[Union[Path, str]]

The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.

None
overwrite_profiles bool

Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.

False
dbt_cli_profile Optional[DbtCliProfile]

Profiles class containing the profile written to profiles.yml. Note! This is optional and will raise an error if profiles.yml already exists under profile_dir and overwrite_profiles is set to False.

None
create_summary_artifact bool

If True, creates a Prefect artifact on the task run with the dbt seed results using the specified artifact key. Defaults to False.

False
summary_artifact_key str

The key under which to store the dbt seed results artifact in Prefect. Defaults to 'dbt-seed-task-summary'.

'dbt-seed-task-summary'
extra_command_args Optional[List[str]]

Additional command arguments to pass to the dbt seed command.

None
stream_output bool

Whether to stream output.

True
    from prefect import flow
    from prefect_dbt.cli.tasks import dbt_seed_task

    @flow
    def dbt_test_flow():
        dbt_seed_task(
            project_dir="/Users/test/my_dbt_project_dir",
            extra_command_args=["--fail-fast"]
        )

Raises:

Type Description
ValueError

If required dbt_cli_profile is not provided when needed for profile writing.

RuntimeError

If the dbt seed fails for any reason, it will be indicated by the exception raised.

Source code in prefect_dbt/cli/commands.py
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
@task
async def run_dbt_seed(
    profiles_dir: Optional[Union[Path, str]] = None,
    project_dir: Optional[Union[Path, str]] = None,
    overwrite_profiles: bool = False,
    dbt_cli_profile: Optional[DbtCliProfile] = None,
    create_summary_artifact: bool = False,
    summary_artifact_key: str = "dbt-seed-task-summary",
    extra_command_args: Optional[List[str]] = None,
    stream_output: bool = True,
):
    """
    Executes the 'dbt seed' command within a Prefect task,
    and optionally creates a Prefect artifact summarizing the dbt seed results.

    Args:
        profiles_dir: The directory to search for the profiles.yml file. Setting this
            appends the `--profiles-dir` option to the command provided.
            If this is not set, will try using the DBT_PROFILES_DIR env variable,
            but if that's also not set, will use the default directory `$HOME/.dbt/`.
        project_dir: The directory to search for the dbt_project.yml file.
            Default is the current working directory and its parents.
        overwrite_profiles: Whether the existing profiles.yml file under profiles_dir
            should be overwritten with a new profile.
        dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
            Note! This is optional and will raise an error
            if profiles.yml already exists under profiles_dir
            and overwrite_profiles is set to False.
        create_summary_artifact: If True, creates a Prefect artifact on the task run
            with the dbt seed results using the specified artifact key.
            Defaults to False.
        summary_artifact_key: The key under which to store
            the dbt seed results artifact in Prefect.
            Defaults to 'dbt-seed-task-summary'.
        extra_command_args: Additional command arguments to pass to the dbt seed command.
        stream_output: Whether to stream output.

    Returns:
        The value returned by `trigger_dbt_cli_command` for the `seed` command.

    Example:
    ```python
        from prefect import flow
        from prefect_dbt.cli.commands import run_dbt_seed

        @flow
        def dbt_test_flow():
            run_dbt_seed(
                project_dir="/Users/test/my_dbt_project_dir",
                extra_command_args=["--fail-fast"]
            )
    ```

    Raises:
        ValueError: If required dbt_cli_profile is not provided
                    when needed for profile writing.
        RuntimeError: If the dbt seed fails for any reason,
                    it will be indicated by the exception raised.
    """

    # Delegate to the generic CLI task's underlying function with the
    # sub-command fixed to "seed"; every argument is forwarded unchanged.
    return await trigger_dbt_cli_command.fn(
        command="seed",
        profiles_dir=profiles_dir,
        project_dir=project_dir,
        overwrite_profiles=overwrite_profiles,
        dbt_cli_profile=dbt_cli_profile,
        create_summary_artifact=create_summary_artifact,
        summary_artifact_key=summary_artifact_key,
        extra_command_args=extra_command_args,
        stream_output=stream_output,
    )

run_dbt_snapshot async

Executes the 'dbt snapshot' command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt snapshot results.

Parameters:

Name Type Description Default
profiles_dir Optional[Union[Path, str]]

The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that's also not set, will use the default directory $HOME/.dbt/.

None
project_dir Optional[Union[Path, str]]

The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.

None
overwrite_profiles bool

Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.

False
dbt_cli_profile Optional[DbtCliProfile]

Profiles class containing the profile written to profiles.yml. Note! This is optional and will raise an error if profiles.yml already exists under profile_dir and overwrite_profiles is set to False.

None
create_summary_artifact bool

If True, creates a Prefect artifact on the task run with the dbt snapshot results using the specified artifact key. Defaults to False.

False
summary_artifact_key str

The key under which to store the dbt snapshot results artifact in Prefect. Defaults to 'dbt-snapshot-task-summary'.

'dbt-snapshot-task-summary'
extra_command_args Optional[List[str]]

Additional command arguments to pass to the dbt snapshot command.

None
stream_output bool

Whether to stream output.

True
    from prefect import flow
    from prefect_dbt.cli.tasks import dbt_snapshot_task

    @flow
    def dbt_test_flow():
        dbt_snapshot_task(
            project_dir="/Users/test/my_dbt_project_dir",
            extra_command_args=["--fail-fast"]
        )

Raises:

Type Description
ValueError

If required dbt_cli_profile is not provided when needed for profile writing.

RuntimeError

If the dbt snapshot fails for any reason, it will be indicated by the exception raised.

Source code in prefect_dbt/cli/commands.py
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
@task
async def run_dbt_snapshot(
    profiles_dir: Optional[Union[Path, str]] = None,
    project_dir: Optional[Union[Path, str]] = None,
    overwrite_profiles: bool = False,
    dbt_cli_profile: Optional[DbtCliProfile] = None,
    create_summary_artifact: bool = False,
    summary_artifact_key: str = "dbt-snapshot-task-summary",
    extra_command_args: Optional[List[str]] = None,
    stream_output: bool = True,
):
    """
    Run `dbt snapshot` inside a Prefect task.

    This is a thin convenience wrapper: it calls the undecorated
    `trigger_dbt_cli_command` function with `command="snapshot"` and forwards
    every other argument unchanged.

    Args:
        profiles_dir: The directory to search for the profiles.yml file. Setting this
            appends the `--profiles-dir` option to the command provided.
            If this is not set, will try using the DBT_PROFILES_DIR env variable,
            but if that's also not set, will use the default directory `$HOME/.dbt/`.
        project_dir: The directory to search for the dbt_project.yml file.
            Default is the current working directory and its parents.
        overwrite_profiles: Whether the existing profiles.yml file under profiles_dir
            should be overwritten with a new profile.
        dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
            Note! This is optional and will raise an error
            if profiles.yml already exists under profiles_dir
            and overwrite_profiles is set to False.
        create_summary_artifact: If True, creates a Prefect markdown artifact
            summarizing the dbt snapshot results under `summary_artifact_key`.
            Defaults to False.
        summary_artifact_key: The key under which to store the summary artifact
            in Prefect. Defaults to 'dbt-snapshot-task-summary'.
        extra_command_args: Additional command arguments to pass to the
            dbt snapshot command.
        stream_output: Whether to stream output.

    Returns:
        Whatever `trigger_dbt_cli_command` returns for the snapshot invocation.

    Example:
    ```python
        from prefect import flow
        from prefect_dbt.cli.commands import run_dbt_snapshot

        @flow
        def dbt_snapshot_flow():
            run_dbt_snapshot(
                project_dir="/Users/test/my_dbt_project_dir",
                extra_command_args=["--fail-fast"]
            )
    ```

    Raises:
        ValueError: Propagated from `trigger_dbt_cli_command` when a profile has
            to be written but `dbt_cli_profile` was not provided, or when
            `dbt_cli_profile` is provided although an existing profiles.yml
            is not being overwritten.
        Exception: Any exception reported by the dbt invocation is re-raised.
    """
    # Only the dbt sub-command is fixed here; everything else is passed through.
    forwarded_options = dict(
        profiles_dir=profiles_dir,
        project_dir=project_dir,
        overwrite_profiles=overwrite_profiles,
        dbt_cli_profile=dbt_cli_profile,
        create_summary_artifact=create_summary_artifact,
        summary_artifact_key=summary_artifact_key,
        extra_command_args=extra_command_args,
        stream_output=stream_output,
    )
    return await trigger_dbt_cli_command.fn(command="snapshot", **forwarded_options)

run_dbt_test async

Executes the 'dbt test' command within a Prefect task, and optionally creates a Prefect artifact summarizing the dbt test results.

Parameters:

Name Type Description Default
profiles_dir Optional[Union[Path, str]]

The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR env variable, but if that's also not set, will use the default directory $HOME/.dbt/.

None
project_dir Optional[Union[Path, str]]

The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.

None
overwrite_profiles bool

Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.

False
dbt_cli_profile Optional[DbtCliProfile]

Profiles class containing the profile written to profiles.yml. Note! This is optional and will raise an error if profiles.yml already exists under profile_dir and overwrite_profiles is set to False.

None
create_summary_artifact bool

If True, creates a Prefect artifact on the task run with the dbt test results using the specified artifact key. Defaults to False.

False
summary_artifact_key str

The key under which to store the dbt test results artifact in Prefect. Defaults to 'dbt-test-task-summary'.

'dbt-test-task-summary'
extra_command_args Optional[List[str]]

Additional command arguments to pass to the dbt test command.

None
stream_output bool

Whether to stream output.

True
    from prefect import flow
    from prefect_dbt.cli.tasks import dbt_test_task

    @flow
    def dbt_test_flow():
        dbt_test_task(
            project_dir="/Users/test/my_dbt_project_dir",
            extra_command_args=["--model", "foo_model"]
        )

Raises:

Type Description
ValueError

If required dbt_cli_profile is not provided when needed for profile writing.

RuntimeError

If the dbt build fails for any reason, it will be indicated by the exception raised.

Source code in prefect_dbt/cli/commands.py
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
@task
async def run_dbt_test(
    profiles_dir: Optional[Union[Path, str]] = None,
    project_dir: Optional[Union[Path, str]] = None,
    overwrite_profiles: bool = False,
    dbt_cli_profile: Optional[DbtCliProfile] = None,
    create_summary_artifact: bool = False,
    summary_artifact_key: str = "dbt-test-task-summary",
    extra_command_args: Optional[List[str]] = None,
    stream_output: bool = True,
):
    """
    Run `dbt test` inside a Prefect task.

    This is a thin convenience wrapper: it calls the undecorated
    `trigger_dbt_cli_command` function with `command="test"` and forwards
    every other argument unchanged.

    Args:
        profiles_dir: The directory to search for the profiles.yml file. Setting this
            appends the `--profiles-dir` option to the command provided.
            If this is not set, will try using the DBT_PROFILES_DIR env variable,
            but if that's also not set, will use the default directory `$HOME/.dbt/`.
        project_dir: The directory to search for the dbt_project.yml file.
            Default is the current working directory and its parents.
        overwrite_profiles: Whether the existing profiles.yml file under profiles_dir
            should be overwritten with a new profile.
        dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
            Note! This is optional and will raise an error
            if profiles.yml already exists under profiles_dir
            and overwrite_profiles is set to False.
        create_summary_artifact: If True, creates a Prefect markdown artifact
            summarizing the dbt test results under `summary_artifact_key`.
            Defaults to False.
        summary_artifact_key: The key under which to store the summary artifact
            in Prefect. Defaults to 'dbt-test-task-summary'.
        extra_command_args: Additional command arguments to pass to the
            dbt test command.
        stream_output: Whether to stream output.

    Returns:
        Whatever `trigger_dbt_cli_command` returns for the test invocation.

    Example:
    ```python
        from prefect import flow
        from prefect_dbt.cli.commands import run_dbt_test

        @flow
        def dbt_test_flow():
            run_dbt_test(
                project_dir="/Users/test/my_dbt_project_dir",
                extra_command_args=["--model", "foo_model"]
            )
    ```

    Raises:
        ValueError: Propagated from `trigger_dbt_cli_command` when a profile has
            to be written but `dbt_cli_profile` was not provided, or when
            `dbt_cli_profile` is provided although an existing profiles.yml
            is not being overwritten.
        Exception: Any exception reported by the dbt invocation is re-raised.
    """
    # Only the dbt sub-command is fixed here; everything else is passed through.
    forwarded_options = dict(
        profiles_dir=profiles_dir,
        project_dir=project_dir,
        overwrite_profiles=overwrite_profiles,
        dbt_cli_profile=dbt_cli_profile,
        create_summary_artifact=create_summary_artifact,
        summary_artifact_key=summary_artifact_key,
        extra_command_args=extra_command_args,
        stream_output=stream_output,
    )
    return await trigger_dbt_cli_command.fn(command="test", **forwarded_options)

trigger_dbt_cli_command async

Task for running dbt commands.

If no profiles.yml file is found or if overwrite_profiles flag is set to True, this will first generate a profiles.yml file in the profiles_dir directory. Then run the dbt CLI shell command.

Parameters:

Name Type Description Default
command str

The dbt command to be executed.

required
profiles_dir Optional[Union[Path, str]]

The directory to search for the profiles.yml file. Setting this appends the --profiles-dir option to the command provided. If this is not set, will try using the DBT_PROFILES_DIR environment variable, but if that's also not set, will use the default directory $HOME/.dbt/.

None
project_dir Optional[Union[Path, str]]

The directory to search for the dbt_project.yml file. Default is the current working directory and its parents.

None
overwrite_profiles bool

Whether the existing profiles.yml file under profiles_dir should be overwritten with a new profile.

False
dbt_cli_profile Optional[DbtCliProfile]

Profiles class containing the profile written to profiles.yml. Note! This is optional and will raise an error if profiles.yml already exists under profile_dir and overwrite_profiles is set to False.

None
create_summary_artifact bool

If True, creates a Prefect artifact on the task run with the dbt results using the specified artifact key. Defaults to False.

False
summary_artifact_key Optional[str]

The key under which to store the dbt results artifact in Prefect. Defaults to 'dbt-cli-command-summary'.

'dbt-cli-command-summary'
extra_command_args Optional[List[str]]

Additional command arguments to pass to the dbt command. These arguments get appended to the command that gets passed to the dbtRunner client. Example: extra_command_args=["--model", "foo_model"]

None
stream_output bool

Whether to stream output.

True

Returns:

Name Type Description
result Optional[dbtRunnerResult]

The dbtRunnerResult produced by the dbtRunner invocation. If the command produced an ExecutionResult but was not successful, a Failed state is returned instead.

Examples:

Execute dbt debug with a pre-populated profiles.yml.

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_command_flow():
    result = trigger_dbt_cli_command("dbt debug")
    return result

trigger_dbt_cli_command_flow()

Execute dbt run without a pre-populated profiles.yml.

from prefect import flow
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials

@flow
def trigger_dbt_cli_command_flow():
    credentials = SnowflakeCredentials(
        user="user",
        password="password",
        account="account.region.aws",
        role="role",
    )
    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=credentials,
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector
    )
    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=target_configs,
    )
    result = trigger_dbt_cli_command(
        "dbt run",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        extra_command_args=["--model", "foo_model"]
    )
    return result

trigger_dbt_cli_command_flow()

Source code in prefect_dbt/cli/commands.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@task
async def trigger_dbt_cli_command(
    command: str,
    profiles_dir: Optional[Union[Path, str]] = None,
    project_dir: Optional[Union[Path, str]] = None,
    overwrite_profiles: bool = False,
    dbt_cli_profile: Optional[DbtCliProfile] = None,
    create_summary_artifact: bool = False,
    summary_artifact_key: Optional[str] = "dbt-cli-command-summary",
    extra_command_args: Optional[List[str]] = None,
    stream_output: bool = True,
) -> Optional[dbtRunnerResult]:
    """
    Task for running dbt commands.

    If no profiles.yml file is found or if overwrite_profiles flag is set to True, this
    will first generate a profiles.yml file in the profiles_dir directory. Then the
    dbt command is invoked through `dbtRunner`.

    Args:
        command: The dbt command to be executed, with or without the leading
            `dbt` (both `"dbt run"` and `"run"` are accepted).
        profiles_dir: The directory to search for the profiles.yml file. Setting this
            appends the `--profiles-dir` option to the command provided. If this is not set,
            will try using the DBT_PROFILES_DIR environment variable, but if that's also not
            set, will use the default directory `$HOME/.dbt/`.
        project_dir: The directory to search for the dbt_project.yml file.
            Default is the current working directory and its parents.
        overwrite_profiles: Whether the existing profiles.yml file under profiles_dir
            should be overwritten with a new profile.
        dbt_cli_profile: Profiles class containing the profile written to profiles.yml.
            Note! This is optional and will raise an error if profiles.yml already exists
            under profiles_dir and overwrite_profiles is set to False.
        create_summary_artifact: If True, creates a Prefect markdown artifact
            summarizing the dbt results under `summary_artifact_key`. An artifact
            is only created when dbt returns an `ExecutionResult`.
            Defaults to False.
        summary_artifact_key: The key under which to store the summary artifact
            in Prefect. Defaults to 'dbt-cli-command-summary'.
        extra_command_args: Additional command arguments to pass to the dbt command.
            These arguments get appended to the command that gets passed to the dbtRunner client.
            Example: extra_command_args=["--model", "foo_model"]
        stream_output: Whether to forward dbt's non-debug log events to the
            Prefect run logger.

    Returns:
        The `dbtRunnerResult` of the invocation. If dbt returned an
        `ExecutionResult` but reported the run as unsuccessful, a `Failed`
        state is returned instead.

    Raises:
        ValueError: If `command` contains no dbt command, if a profile has to be
            written but `dbt_cli_profile` was not provided, or if `dbt_cli_profile`
            is provided while an existing profiles.yml is not being overwritten.
        Exception: Any exception reported on the `dbtRunnerResult` is re-raised.

    Examples:
        Execute `dbt debug` with a pre-populated profiles.yml.
        ```python
        from prefect import flow
        from prefect_dbt.cli.commands import trigger_dbt_cli_command

        @flow
        def trigger_dbt_cli_command_flow():
            result = trigger_dbt_cli_command("dbt debug")
            return result

        trigger_dbt_cli_command_flow()
        ```

        Execute `dbt run` without a pre-populated profiles.yml.
        ```python
        from prefect import flow
        from prefect_dbt.cli.credentials import DbtCliProfile
        from prefect_dbt.cli.commands import trigger_dbt_cli_command
        from prefect_dbt.cli.configs import SnowflakeTargetConfigs
        from prefect_snowflake.credentials import SnowflakeCredentials
        from prefect_snowflake.database import SnowflakeConnector

        @flow
        def trigger_dbt_cli_command_flow():
            credentials = SnowflakeCredentials(
                user="user",
                password="password",
                account="account.region.aws",
                role="role",
            )
            connector = SnowflakeConnector(
                schema="public",
                database="database",
                warehouse="warehouse",
                credentials=credentials,
            )
            target_configs = SnowflakeTargetConfigs(
                connector=connector
            )
            dbt_cli_profile = DbtCliProfile(
                name="jaffle_shop",
                target="dev",
                target_configs=target_configs,
            )
            result = trigger_dbt_cli_command(
                "dbt run",
                overwrite_profiles=True,
                dbt_cli_profile=dbt_cli_profile,
                extra_command_args=["--model", "foo_model"]
            )
            return result

        trigger_dbt_cli_command_flow()
        ```
    """
    logger = get_run_logger()

    if profiles_dir is None:
        profiles_dir = os.getenv("DBT_PROFILES_DIR", str(Path.home() / ".dbt"))
    # `profiles_dir` may arrive as str or Path. Normalise and expand "~" once so
    # the existence check, the file write and the CLI flag all use the same path.
    profiles_dir = Path(profiles_dir).expanduser()

    # Accept both "dbt run ..." and "run ...". Splitting on any whitespace
    # avoids empty arguments when the command contains repeated spaces.
    command_tokens = command.split()
    if command_tokens and command_tokens[0] == "dbt":
        command_tokens = command_tokens[1:]
    if not command_tokens:
        raise ValueError("Provide a dbt command to run, e.g. `run` or `dbt run`")
    command = " ".join(command_tokens)

    # https://docs.getdbt.com/dbt-cli/configure-your-profile
    # Note that the file always needs to be called profiles.yml,
    # regardless of which directory it is in.
    profiles_path = profiles_dir / "profiles.yml"
    logger.debug(f"Using this profiles path: {profiles_path}")

    # write the profile if overwrite or no profiles exist
    if overwrite_profiles or not profiles_path.exists():
        if dbt_cli_profile is None:
            raise ValueError("Provide `dbt_cli_profile` keyword for writing profiles")
        profile = dbt_cli_profile.get_profile()
        profiles_dir.mkdir(parents=True, exist_ok=True)
        with open(profiles_path, "w") as f:
            yaml.dump(profile, f, default_flow_style=False)
        logger.info(f"Wrote profile to {profiles_path}")
    elif dbt_cli_profile is not None:
        raise ValueError(
            f"Since overwrite_profiles is False and profiles_path ({profiles_path}) "
            f"already exists, the profile within dbt_cli_profile could not be used; "
            f"if the existing profile is satisfactory, do not pass dbt_cli_profile"
        )

    # Build the argument list for dbtRunner; every entry must be a string.
    cli_args = [*command_tokens, "--profiles-dir", str(profiles_dir)]
    if project_dir is not None:
        cli_args.extend(["--project-dir", str(Path(project_dir).expanduser())])
    if extra_command_args:
        cli_args.extend(extra_command_args)

    # Add the dbt event log callback if enabled
    callbacks = []
    if stream_output:

        def _stream_output(event):
            if event.info.level != "debug":
                logger.info(event.info.msg)

        callbacks.append(_stream_output)

    dbt_runner_client = dbtRunner(callbacks=callbacks)
    logger.info(f"Running dbt command: {cli_args}")
    result: dbtRunnerResult = dbt_runner_client.invoke(cli_args)

    if result.exception is not None:
        logger.error(f"dbt task failed with exception: {result.exception}")
        raise result.exception

    # Creating the dbt Summary Markdown if enabled
    if create_summary_artifact and isinstance(result.result, ExecutionResult):
        run_results = consolidate_run_results(result)
        markdown = create_summary_markdown(run_results, command)
        artifact_id = await create_markdown_artifact(
            markdown=markdown,
            key=summary_artifact_key,
        )
        if not artifact_id:
            logger.error(f"Summary Artifact was not created for dbt {command} task")
        else:
            logger.info(
                f"dbt {command} task completed successfully with artifact {artifact_id}"
            )
    else:
        logger.debug(
            f"Artifacts were not created for this dbt {command} task "
            "due to create_summary_artifact=False or the dbt command did not "
            "return any ExecutionResult. "
            "See https://docs.getdbt.com/reference/programmatic-invocations "
            "for more details on dbtRunnerResult."
        )
    # NOTE: result.exception is always None here because a non-None exception
    # was re-raised above; the message text is kept unchanged for callers.
    if isinstance(result.result, ExecutionResult) and not result.success:
        return Failed(
            message=f"dbt task result unsuccessful with exception: {result.exception}"
        )
    return result