Skip to content

prefect_dbt.cloud.clients

Module containing clients for interacting with the dbt Cloud API

DbtCloudAdministrativeClient

Client for interacting with the dbt Cloud Administrative API.

Parameters:

Name Type Description Default
api_key str

API key to authenticate with the dbt Cloud administrative API.

required
account_id int

ID of dbt Cloud account with which to interact.

required
domain str

Domain at which the dbt Cloud API is hosted.

'cloud.getdbt.com'
Source code in prefect_dbt/cloud/clients.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class DbtCloudAdministrativeClient:
    """
    Client for interacting with the dbt Cloud Administrative API.

    The client can be used as an async context manager. It may be entered at
    most once and cannot be entered again after it has been closed.

    Args:
        api_key: API key to authenticate with the dbt Cloud administrative API.
        account_id: ID of dbt Cloud account with which to interact.
        domain: Domain at which the dbt Cloud API is hosted.
    """

    def __init__(self, api_key: str, account_id: int, domain: str = "cloud.getdbt.com"):
        # Lifecycle flags consulted by __aenter__ and updated by __aexit__.
        self._closed = False
        self._started = False

        # Every request made through this client is relative to the
        # account-scoped base URL and carries the same auth/identification
        # headers.
        self._admin_client = AsyncClient(
            headers={
                "Authorization": f"Bearer {api_key}",
                "user-agent": f"prefect-{prefect.__version__}",
                "x-dbt-partner-source": "prefect",
            },
            base_url=f"https://{domain}/api/v2/accounts/{account_id}",
        )

    async def call_endpoint(
        self,
        http_method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Response:
        """
        Call an endpoint in the dbt Cloud API.

        Args:
            http_method: HTTP method to call on the endpoint.
            path: The partial path for the request (e.g. /projects/). Will be appended
                onto the base URL as determined by the client configuration.
            params: Query parameters to include in the request.
            json: JSON serializable body to send in the request.

        Returns:
            The response from the dbt Cloud administrative API.

        Raises:
            Whatever `raise_for_status()` raises on the response when the
            request was not successful.
        """
        response = await self._admin_client.request(
            method=http_method, url=path, params=params, json=json
        )

        # Surface error responses as exceptions instead of returning them.
        response.raise_for_status()

        return response

    async def get_job(
        self,
        job_id: int,
        order_by: Optional[str] = None,
    ) -> Response:
        """
        Return job details for a job on an account.

        Args:
            job_id: Numeric ID of the job.
            order_by: Field to order the result by. Use - to indicate reverse order.

        Returns:
            The response from the dbt Cloud administrative API.
        """  # noqa
        # Only send the query parameter when a non-empty value was supplied.
        params = {"order_by": order_by} if order_by else None
        return await self.call_endpoint(
            path=f"/jobs/{job_id}/", http_method="GET", params=params
        )

    async def trigger_job_run(
        self, job_id: int, options: Optional[TriggerJobRunOptions] = None
    ) -> Response:
        """
        Sends a request to the [trigger job run endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Jobs/operation/triggerRun)
        to initiate a job run.

        Args:
            job_id: The ID of the job to trigger.
            options: An optional TriggerJobRunOptions instance to specify overrides for the triggered job run.

        Returns:
            The response from the dbt Cloud administrative API.
        """  # noqa
        # Fall back to a default-constructed options object so a request body
        # is always sent.
        if options is None:
            options = TriggerJobRunOptions()

        return await self.call_endpoint(
            path=f"/jobs/{job_id}/run/",
            http_method="POST",
            # Unset (None) option fields are left out of the request body.
            json=options.dict(exclude_none=True),
        )

    async def get_run(
        self,
        run_id: int,
        include_related: Optional[
            List[Literal["trigger", "job", "debug_logs", "run_steps"]]
        ] = None,
    ) -> Response:
        """
        Sends a request to the [get run endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Runs/operation/getRunById)
        to get details about a job run.

        Args:
            run_id: The ID of the run to get details for.
            include_related: List of related fields to pull with the run.
                Valid values are "trigger", "job", "debug_logs", and "run_steps".
                If "debug_logs" is not provided in a request, then the included debug
                logs will be truncated to the last 1,000 lines of the debug log output file.

        Returns:
            The response from the dbt Cloud administrative API.
        """  # noqa
        # An empty or missing list results in no query parameters being sent.
        params = {"include_related": include_related} if include_related else None
        return await self.call_endpoint(
            path=f"/runs/{run_id}/", http_method="GET", params=params
        )

    async def list_run_artifacts(
        self, run_id: int, step: Optional[int] = None
    ) -> Response:
        """
        Sends a request to the [list run artifacts endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Runs/operation/listArtifactsByRunId)
        to fetch a list of paths of artifacts generated for a completed run.

        Args:
            run_id: The ID of the run to list run artifacts for.
            step: The index of the step in the run to query for artifacts. The
                first step in the run has the index 1. If the step parameter is
                omitted, then this method will return the artifacts compiled
                for the last step in the run.

        Returns:
            The response from the dbt Cloud administrative API.
        """  # noqa
        # A falsy step (None or 0) is treated as "not provided".
        params = {"step": step} if step else None
        return await self.call_endpoint(
            path=f"/runs/{run_id}/artifacts/", http_method="GET", params=params
        )

    async def get_run_artifact(
        self, run_id: int, path: str, step: Optional[int] = None
    ) -> Response:
        """
        Sends a request to the [get run artifact endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Runs/operation/getArtifactsByRunId)
        to fetch an artifact generated for a completed run.

        Args:
            run_id: The ID of the run to fetch the artifact for.
            path: The relative path to the run artifact (e.g. manifest.json, catalog.json,
                run_results.json)
            step: The index of the step in the run to query for artifacts. The
                first step in the run has the index 1. If the step parameter is
                omitted, then this method will return the artifacts compiled
                for the last step in the run.

        Returns:
            The response from the dbt Cloud administrative API.
        """  # noqa
        # A falsy step (None or 0) is treated as "not provided".
        params = {"step": step} if step else None
        return await self.call_endpoint(
            path=f"/runs/{run_id}/artifacts/{path}", http_method="GET", params=params
        )

    async def __aenter__(self):
        """
        Mark the client as started and return it.

        Raises:
            RuntimeError: If the client was already closed or already started.
        """
        if self._closed:
            raise RuntimeError(
                "The client cannot be started again after it has been closed."
            )
        if self._started:
            raise RuntimeError("The client cannot be started more than once.")

        self._started = True

        return self

    async def __aexit__(self, *exc):
        """
        Mark the client as closed and exit the underlying HTTP client.

        Args:
            *exc: Exception details supplied by the `async with` statement.
        """
        self._closed = True
        # Forward the exception details rather than dropping them, so the
        # wrapped client sees the same context-manager exit it would get if it
        # had been used directly.
        await self._admin_client.__aexit__(*exc)

call_endpoint async

Call an endpoint in the dbt Cloud API.

Parameters:

Name Type Description Default
path str

The partial path for the request (e.g. /projects/). Will be appended onto the base URL as determined by the client configuration.

required
http_method str

HTTP method to call on the endpoint.

required
params Optional[Dict[str, Any]]

Query parameters to include in the request.

None
json Optional[Dict[str, Any]]

JSON serializable body to send in the request.

None

Returns:

Type Description
Response

The response from the dbt Cloud administrative API.

Source code in prefect_dbt/cloud/clients.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
async def call_endpoint(
    self,
    http_method: str,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,
) -> Response:
    """
    Issue a single request against the dbt Cloud API and return the response.

    Args:
        http_method: HTTP method to call on the endpoint.
        path: The partial path for the request (e.g. /projects/). It is resolved
            against the base URL configured on the client.
        params: Query parameters to include in the request.
        json: JSON serializable body to send in the request.

    Returns:
        The response from the dbt Cloud administrative API.

    Raises:
        Whatever `raise_for_status()` raises on the response when the
        request was not successful.
    """
    request_kwargs = {
        "method": http_method,
        "url": path,
        "params": params,
        "json": json,
    }
    api_response = await self._admin_client.request(**request_kwargs)

    # Turn error responses into exceptions before handing the response back.
    api_response.raise_for_status()
    return api_response

get_job async

Return job details for a job on an account.

Parameters:

Name Type Description Default
job_id int

Numeric ID of the job.

required
order_by Optional[str]

Field to order the result by. Use - to indicate reverse order.

None

Returns:

Type Description
Response

The response from the dbt Cloud administrative API.

Source code in prefect_dbt/cloud/clients.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
async def get_job(
    self,
    job_id: int,
    order_by: Optional[str] = None,
) -> Response:
    """
    Fetch the details of a single job on the account.

    Args:
        job_id: Numeric ID of the job.
        order_by: Field to order the result by. Use - to indicate reverse order.

    Returns:
        The response from the dbt Cloud administrative API.
    """
    # The query string is only populated when a non-empty ordering is given.
    if order_by:
        query_params = {"order_by": order_by}
    else:
        query_params = None

    job_path = f"/jobs/{job_id}/"
    return await self.call_endpoint(
        http_method="GET", path=job_path, params=query_params
    )

get_run async

Sends a request to the get run endpoint to get details about a job run.

Parameters:

Name Type Description Default
run_id int

The ID of the run to get details for.

required
include_related Optional[List[Literal['trigger', 'job', 'debug_logs', 'run_steps']]]

List of related fields to pull with the run. Valid values are "trigger", "job", "debug_logs", and "run_steps". If "debug_logs" is not provided in a request, then the included debug logs will be truncated to the last 1,000 lines of the debug log output file.

None

Returns:

Type Description
Response

The response from the dbt Cloud administrative API.

Source code in prefect_dbt/cloud/clients.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def get_run(
    self,
    run_id: int,
    include_related: Optional[
        List[Literal["trigger", "job", "debug_logs", "run_steps"]]
    ] = None,
) -> Response:
    """
    Fetch details about a job run from the [get run endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Runs/operation/getRunById).

    Args:
        run_id: The ID of the run to get details for.
        include_related: List of related fields to pull with the run.
            Valid values are "trigger", "job", "debug_logs", and "run_steps".
            If "debug_logs" is not provided in a request, then the included debug
            logs will be truncated to the last 1,000 lines of the debug log output file.

    Returns:
        The response from the dbt Cloud administrative API.
    """  # noqa
    # A missing or empty list means no query parameters are sent at all.
    query_params = None
    if include_related:
        query_params = {"include_related": include_related}

    run_path = f"/runs/{run_id}/"
    return await self.call_endpoint(
        http_method="GET", path=run_path, params=query_params
    )

get_run_artifact async

Sends a request to the get run artifact endpoint to fetch an artifact generated for a completed run.

Parameters:

Name Type Description Default
run_id int

The ID of the run to fetch the artifact for.

required
path str

The relative path to the run artifact (e.g. manifest.json, catalog.json, run_results.json)

required
step Optional[int]

The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run.

None

Returns:

Type Description
Response

The response from the dbt Cloud administrative API.

Source code in prefect_dbt/cloud/clients.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
async def get_run_artifact(
    self, run_id: int, path: str, step: Optional[int] = None
) -> Response:
    """
    Fetch one artifact generated for a completed run from the [get run artifact endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Runs/operation/getArtifactsByRunId).

    Args:
        run_id: The ID of the run to fetch the artifact for.
        path: The relative path to the run artifact (e.g. manifest.json, catalog.json,
            run_results.json)
        step: The index of the step in the run to query for artifacts. The
            first step in the run has the index 1. If the step parameter is
            omitted, then this method will return the artifacts compiled
            for the last step in the run.

    Returns:
        The response from the dbt Cloud administrative API.
    """  # noqa
    # A falsy step (None or 0) is treated as "not provided".
    query_params = None
    if step:
        query_params = {"step": step}

    artifact_path = f"/runs/{run_id}/artifacts/{path}"
    return await self.call_endpoint(
        http_method="GET", path=artifact_path, params=query_params
    )

list_run_artifacts async

Sends a request to the list run artifacts endpoint to fetch a list of paths of artifacts generated for a completed run.

Parameters:

Name Type Description Default
run_id int

The ID of the run to list run artifacts for.

required
step Optional[int]

The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run.

None

Returns:

Type Description
Response

The response from the dbt Cloud administrative API.

Source code in prefect_dbt/cloud/clients.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
async def list_run_artifacts(
    self, run_id: int, step: Optional[int] = None
) -> Response:
    """
    List the paths of artifacts generated for a completed run via the [list run artifacts endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Runs/operation/listArtifactsByRunId).

    Args:
        run_id: The ID of the run to list run artifacts for.
        step: The index of the step in the run to query for artifacts. The
            first step in the run has the index 1. If the step parameter is
            omitted, then this method will return the artifacts compiled
            for the last step in the run.

    Returns:
        The response from the dbt Cloud administrative API.
    """  # noqa
    # A falsy step (None or 0) is treated as "not provided".
    query_params = None
    if step:
        query_params = {"step": step}

    artifacts_path = f"/runs/{run_id}/artifacts/"
    return await self.call_endpoint(
        http_method="GET", path=artifacts_path, params=query_params
    )

trigger_job_run async

Sends a request to the trigger job run endpoint to initiate a job run.

Parameters:

Name Type Description Default
job_id int

The ID of the job to trigger.

required
options Optional[TriggerJobRunOptions]

An optional TriggerJobRunOptions instance to specify overrides for the triggered job run.

None

Returns:

Type Description
Response

The response from the dbt Cloud administrative API.

Source code in prefect_dbt/cloud/clients.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
async def trigger_job_run(
    self, job_id: int, options: Optional[TriggerJobRunOptions] = None
) -> Response:
    """
    Initiate a job run through the [trigger job run endpoint](https://docs.getdbt.com/dbt-cloud/api-v2#tag/Jobs/operation/triggerRun).

    Args:
        job_id: The ID of the job to trigger.
        options: An optional TriggerJobRunOptions instance to specify overrides for the triggered job run.

    Returns:
        The response from the dbt Cloud administrative API.
    """  # noqa
    # With no overrides supplied, a default-constructed options object is used.
    run_options = TriggerJobRunOptions() if options is None else options

    # Fields left as None are omitted from the request body.
    payload = run_options.dict(exclude_none=True)

    return await self.call_endpoint(
        http_method="POST",
        path=f"/jobs/{job_id}/run/",
        json=payload,
    )

DbtCloudMetadataClient

Client for interacting with the dbt Cloud Metadata API.

Parameters:

Name Type Description Default
api_key str

API key to authenticate with the dbt Cloud metadata API.

required
domain str

Domain at which the dbt Cloud API is hosted.

'metadata.cloud.getdbt.com'
Source code in prefect_dbt/cloud/clients.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class DbtCloudMetadataClient:
    """
    Client for interacting with the dbt Cloud Metadata API.

    Queries are sent as GraphQL requests to the `/graphql` endpoint on the
    configured domain.

    Args:
        api_key: API key to authenticate with the dbt Cloud metadata API.
        domain: Domain at which the dbt Cloud metadata API is hosted.
    """

    def __init__(self, api_key: str, domain: str = "metadata.cloud.getdbt.com"):
        # A single endpoint object is built once and reused for every query;
        # it carries the auth and identification headers.
        self._http_endpoint = HTTPEndpoint(
            base_headers={
                "Authorization": f"Bearer {api_key}",
                "user-agent": f"prefect-{prefect.__version__}",
                "x-dbt-partner-source": "prefect",
                "content-type": "application/json",
            },
            url=f"https://{domain}/graphql",
        )

    def query(
        self,
        query: str,
        variables: Optional[Dict] = None,
        operation_name: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Run a GraphQL query against the dbt Cloud metadata API.

        Args:
            query: The GraphQL query to run.
            variables: The values of any variables defined in the GraphQL query.
            operation_name: The name of the operation to run if multiple operations
                are defined in the provided query.

        Returns:
            The result of the GraphQL query.
        """
        # The endpoint object is callable; the arguments are passed through
        # unchanged.
        return self._http_endpoint(
            query=query, variables=variables, operation_name=operation_name
        )

query

Run a GraphQL query against the dbt Cloud metadata API.

Parameters:

Name Type Description Default
query str

The GraphQL query to run.

required
variables Optional[Dict]

The values of any variables defined in the GraphQL query.

None
operation_name Optional[str]

The name of the operation to run if multiple operations are defined in the provided query.

None

Returns:

Type Description
Dict[str, Any]

The result of the GraphQL query.

Source code in prefect_dbt/cloud/clients.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def query(
    self,
    query: str,
    variables: Optional[Dict] = None,
    operation_name: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Execute a GraphQL query against the dbt Cloud metadata API.

    Args:
        query: The GraphQL query to run.
        variables: The values of any variables defined in the GraphQL query.
        operation_name: The name of the operation to run if multiple operations
            are defined in the provided query.

    Returns:
        The result of the GraphQL query.
    """
    # Collect the call arguments first, then hand them to the endpoint
    # unchanged.
    graphql_request = {
        "query": query,
        "variables": variables,
        "operation_name": operation_name,
    }
    return self._http_endpoint(**graphql_request)