Skip to content

prefect_databricks.rest

This is a module containing generic REST tasks.

HTTPMethod

Bases: Enum

Available HTTP request methods.

Source code in prefect_databricks/rest.py
33
34
35
36
37
38
39
40
41
42
class HTTPMethod(Enum):
    """
    HTTP verbs supported by the generic REST task in this module.

    Each member's value is the lowercase verb name; `execute_endpoint`
    uses that value to look up the matching request method on the client.
    """

    GET = "get"
    POST = "post"
    PUT = "put"
    DELETE = "delete"
    PATCH = "patch"

execute_endpoint async

Generic function for executing REST endpoints.

Parameters:

Name Type Description Default
endpoint str

The endpoint route.

required
databricks_credentials DatabricksCredentials

Credentials to use for authentication with Databricks.

required
http_method HTTPMethod

Either GET, POST, PUT, DELETE, or PATCH.

GET
params Dict[str, Any]

URL query parameters in the request. Entries whose value is None are dropped before the request is sent.

None
json Dict[str, Any]

JSON serializable object to include in the body of the request.

None
**kwargs Dict[str, Any]

Additional keyword arguments to pass to the HTTP client's request method.

{}

Returns:

Type Description
Response

The httpx.Response from interacting with the endpoint.

Examples:

Lists jobs on the Databricks instance.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.rest import execute_endpoint
@flow
def example_execute_endpoint_flow():
    """List jobs on the Databricks instance and return the decoded JSON body."""
    endpoint = "/2.1/jobs/list"
    # Load the credentials block stored under the name "my-block".
    databricks_credentials = DatabricksCredentials.load("my-block")
    params = {
        "limit": 5,
        # None values are removed by execute_endpoint before the request is sent.
        "offset": None,
        "expand_tasks": True,
    }
    # http_method is omitted, so the default (HTTPMethod.GET) is used.
    response = execute_endpoint(
        endpoint,
        databricks_credentials,
        params=params
    )
    return response.json()

Source code in prefect_databricks/rest.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@task
async def execute_endpoint(
    endpoint: str,
    databricks_credentials: "DatabricksCredentials",
    http_method: HTTPMethod = HTTPMethod.GET,
    params: Dict[str, Any] = None,
    json: Dict[str, Any] = None,
    **kwargs: Dict[str, Any],
) -> httpx.Response:
    """
    Send one request to a REST endpoint and return the raw response.

    Args:
        endpoint: The endpoint route.
        databricks_credentials: Credentials to use for authentication with
            Databricks; its `get_client()` supplies the client for the request.
        http_method: Either GET, POST, PUT, DELETE, or PATCH.
        params: URL query parameters in the request. They are run through
            `strip_kwargs` first, so entries whose value is None are dropped.
        json: JSON serializable object to include in the body of the request.
            It is run through `strip_kwargs` as well.
        **kwargs: Additional keyword arguments forwarded to the client's
            request method.

    Returns:
        The httpx.Response from interacting with the endpoint.

    Examples:
        Lists jobs on the Databricks instance.
        ```python
        from prefect import flow
        from prefect_databricks import DatabricksCredentials
        from prefect_databricks.rest import execute_endpoint
        @flow
        def example_execute_endpoint_flow():
            endpoint = "/2.1/jobs/list"
            databricks_credentials = DatabricksCredentials.load("my-block")
            params = {
                "limit": 5,
                "offset": None,
                "expand_tasks": True,
            }
            response = execute_endpoint(
                endpoint,
                databricks_credentials,
                params=params
            )
            return response.json()
        ```
    """
    # Enum members are unwrapped to their value; any other input is used
    # unchanged as the name of the client method to call.
    method_name = (
        http_method.value if isinstance(http_method, HTTPMethod) else http_method
    )

    query = None if params is None else strip_kwargs(**params)

    # The request body travels with the remaining keyword arguments.
    if json is not None:
        kwargs["json"] = strip_kwargs(**json)

    async with databricks_credentials.get_client() as client:
        send_request = getattr(client, method_name)
        response = await send_request(endpoint, params=query, **kwargs)

    return response

serialize_model

Recursively serializes pydantic.BaseModel into JSON; returns original obj if not a BaseModel.

Parameters:

Name Type Description Default
obj Any

Input object to serialize.

required

Returns:

Type Description
Any

Serialized version of object.

Source code in prefect_databricks/rest.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def serialize_model(obj: Any) -> Any:
    """
    Convert an object into a JSON-friendly form, descending into containers.

    Lists and dicts are rebuilt with every element / value serialized.
    Instances of `MODELS` are dumped in JSON mode, enum members are
    replaced by their value, and anything else is handed back untouched.

    Args:
        obj: Input object to serialize.

    Returns:
        Serialized version of object.
    """
    # Containers first: recurse into their contents.
    if isinstance(obj, list):
        return [serialize_model(item) for item in obj]
    if isinstance(obj, Dict):
        return {key: serialize_model(value) for key, value in obj.items()}

    # Leaves: models, then enums, then everything else as-is.
    if isinstance(obj, MODELS):
        return model_dump(obj, mode="json")
    if isinstance(obj, Enum):
        return obj.value
    return obj

strip_kwargs

Recursively drops keyword arguments if value is None, and serializes any pydantic.BaseModel types.

Parameters:

Name Type Description Default
**kwargs Dict

Input keyword arguments.

{}

Returns:

Type Description
Dict

Stripped version of kwargs.

Source code in prefect_databricks/rest.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def _strip_none_values(mapping: Dict) -> Dict:
    """Return a copy of `mapping` without None values, recursing into nested dicts."""
    stripped_dict = {}
    for k, v in mapping.items():
        v = serialize_model(v)
        if isinstance(v, dict):
            # Recurse on the mapping itself rather than re-unpacking it with
            # `**`, which would raise TypeError for non-string keys.
            v = _strip_none_values(v)
        if v is not None:
            stripped_dict[k] = v
    return stripped_dict


def strip_kwargs(**kwargs: Dict) -> Dict:
    """
    Recursively drops keyword arguments if value is None,
    and serializes any `pydantic.BaseModel` types.

    Every value is first passed through `serialize_model`. Nested dicts are
    stripped the same way, and may use keys of any type. Dicts nested inside
    lists are serialized but not stripped of None values.

    Args:
        **kwargs: Input keyword arguments.

    Returns:
        Stripped version of kwargs; an empty dict if nothing remains.
    """
    return _strip_none_values(kwargs)