Skip to content

prefect_azure.ml_datastore

Tasks for interacting with Azure ML Datastore

ml_get_datastore async

Gets the Datastore within the Workspace.

Parameters:

Name Type Description Default
ml_credentials AzureMlCredentials

Credentials to use for authentication with Azure.

required
datastore_name str

The name of the Datastore. If None, then the default Datastore of the Workspace is returned.

None
Example

Get Datastore object

from prefect import flow
from prefect_azure import AzureMlCredentials
from prefect_azure.ml_datastore import ml_get_datastore

@flow
def example_ml_get_datastore_flow():
    ml_credentials = AzureMlCredentials(
        tenant_id="tenant_id",
        service_principal_id="service_principal_id",
        service_principal_password="service_principal_password",
        subscription_id="subscription_id",
        resource_group="resource_group",
        workspace_name="workspace_name",
    )
    results = ml_get_datastore(ml_credentials, datastore_name="datastore_name")
    return results

Source code in prefect_azure/ml_datastore.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@task
async def ml_get_datastore(
    ml_credentials: "AzureMlCredentials", datastore_name: str = None
) -> Datastore:
    """
    Fetches a single Datastore registered in the Azure ML Workspace.

    Args:
        ml_credentials: Credentials to use for authentication with Azure.
        datastore_name: The name of the Datastore to look up. When `None`,
            the default Datastore of the Workspace is returned instead.

    Returns:
        The Datastore resolved by `_get_datastore` for the given name.

    Example:
        Get Datastore object
        ```python
        from prefect import flow
        from prefect_azure import AzureMlCredentials
        from prefect_azure.ml_datastore import ml_get_datastore

        @flow
        def example_ml_get_datastore_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            results = ml_get_datastore(ml_credentials, datastore_name="datastore_name")
            return results
        ```
    """
    get_run_logger().info("Getting datastore %s", datastore_name)

    # The lookup itself is delegated to the module's shared helper.
    return await _get_datastore(ml_credentials, datastore_name)

ml_list_datastores

Lists the Datastores in the Workspace.

Parameters:

Name Type Description Default
ml_credentials AzureMlCredentials

Credentials to use for authentication with Azure.

required
Example

List Datastore objects

from prefect import flow
from prefect_azure import AzureMlCredentials
from prefect_azure.ml_datastore import ml_list_datastores

@flow
def example_ml_list_datastores_flow():
    ml_credentials = AzureMlCredentials(
        tenant_id="tenant_id",
        service_principal_id="service_principal_id",
        service_principal_password="service_principal_password",
        subscription_id="subscription_id",
        resource_group="resource_group",
        workspace_name="workspace_name",
    )
    results = ml_list_datastores(ml_credentials)
    return results

Source code in prefect_azure/ml_datastore.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@task
def ml_list_datastores(ml_credentials: "AzureMlCredentials") -> Dict:
    """
    Retrieves every Datastore registered in the Workspace.

    Args:
        ml_credentials: Credentials to use for authentication with Azure.

    Returns:
        The `datastores` attribute of the Workspace obtained from
        `ml_credentials.get_workspace()`.

    Example:
        List Datastore objects
        ```python
        from prefect import flow
        from prefect_azure import AzureMlCredentials
        from prefect_azure.ml_datastore import ml_list_datastores

        @flow
        def example_ml_list_datastores_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            results = ml_list_datastores(ml_credentials)
            return results
        ```
    """
    get_run_logger().info("Listing datastores")
    return ml_credentials.get_workspace().datastores

ml_register_datastore_blob_container async

Registers an Azure Blob Storage container as a Datastore in an Azure ML service Workspace.

Parameters:

Name Type Description Default
container_name str

The name of the container.

required
ml_credentials AzureMlCredentials

Credentials to use for authentication with Azure ML.

required
blob_storage_credentials AzureBlobStorageCredentials

Credentials to use for authentication with Azure Blob Storage.

required
datastore_name str

The name of the datastore. If not defined, the container name will be used.

None
create_container_if_not_exists bool

Create a container, if one does not exist with the given name.

False
overwrite bool

Overwrite an existing datastore. If the datastore does not exist, it will be created.

False
set_as_default bool

Set the created Datastore as the default datastore for the Workspace.

False
Example

Register a blob container as a Datastore

from prefect import flow
from prefect_azure import AzureBlobStorageCredentials, AzureMlCredentials
from prefect_azure.ml_datastore import ml_register_datastore_blob_container

@flow
def example_ml_register_datastore_blob_container_flow():
    ml_credentials = AzureMlCredentials(
        tenant_id="tenant_id",
        service_principal_id="service_principal_id",
        service_principal_password="service_principal_password",
        subscription_id="subscription_id",
        resource_group="resource_group",
        workspace_name="workspace_name",
    )
    blob_storage_credentials = AzureBlobStorageCredentials("connection_string")
    result = ml_register_datastore_blob_container(
        "container",
        ml_credentials,
        blob_storage_credentials,
        datastore_name="datastore_name"
    )
    return result

Source code in prefect_azure/ml_datastore.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@task
async def ml_register_datastore_blob_container(
    container_name: str,
    ml_credentials: "AzureMlCredentials",
    blob_storage_credentials: "AzureBlobStorageCredentials",
    datastore_name: str = None,
    create_container_if_not_exists: bool = False,
    overwrite: bool = False,
    set_as_default: bool = False,
) -> "AzureBlobDatastore":
    """
    Registers an Azure Blob Storage container as a
    Datastore in an Azure ML service Workspace.

    Args:
        container_name: The name of the container.
        ml_credentials: Credentials to use for authentication with Azure ML.
        blob_storage_credentials: Credentials to use for authentication
            with Azure Blob Storage. The client's credential must expose
            `account_name` and `account_key` attributes.
        datastore_name: The name of the datastore. If not defined, the
            container name will be used.
        create_container_if_not_exists: Create a container, if one does not
            exist with the given name.
        overwrite: Overwrite an existing datastore. If
            the datastore does not exist, it will be created.
        set_as_default: Set the created Datastore as the default datastore
            for the Workspace.

    Returns:
        The Datastore returned by `Datastore.register_azure_blob_container`.

    Example:
        Register a blob container as a Datastore
        ```python
        from prefect import flow
        from prefect_azure import AzureBlobStorageCredentials, AzureMlCredentials
        from prefect_azure.ml_datastore import ml_register_datastore_blob_container

        @flow
        def example_ml_register_datastore_blob_container_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            blob_storage_credentials = AzureBlobStorageCredentials("connection_string")
            result = ml_register_datastore_blob_container(
                "container",
                ml_credentials,
                blob_storage_credentials,
                datastore_name="datastore_name"
            )
            return result
        ```
    """
    logger = get_run_logger()

    # Fall back to the container name so the datastore always has a name.
    if datastore_name is None:
        datastore_name = container_name

    logger.info(
        "Registering %s container into %s datastore", container_name, datastore_name
    )

    workspace = ml_credentials.get_workspace()

    # The blob client is opened only to read the account name/key off its
    # credential; those values are copied out before the client is closed.
    async with blob_storage_credentials.get_client() as blob_service_client:
        credential = blob_service_client.credential
        account_name = credential.account_name
        account_key = credential.account_key

    partial_register = partial(
        Datastore.register_azure_blob_container,
        workspace=workspace,
        datastore_name=datastore_name,
        container_name=container_name,
        account_name=account_name,
        account_key=account_key,
        overwrite=overwrite,
        create_if_not_exists=create_container_if_not_exists,
    )
    # The Azure ML SDK call is synchronous, so run it in a worker thread
    # rather than on the event loop.
    result = await to_thread.run_sync(partial_register)

    if set_as_default:
        # `set_as_default` is a synchronous SDK call as well; keep it off the
        # event loop for the same reason as the registration above.
        await to_thread.run_sync(result.set_as_default)

    return result

ml_upload_datastore async

Uploads local files to a Datastore.

Parameters:

Name Type Description Default
path Union[str, Path, List[Union[str, Path]]]

The path to a single file, a single directory, or a list of paths to files to be uploaded.

required
ml_credentials AzureMlCredentials

Credentials to use for authentication with Azure.

required
target_path Union[str, Path]

The location in the blob container to upload to. If None, then upload to root.

None
relative_root Union[str, Path]

The root used to determine the path of the files in the blob. For example, if we upload /path/to/file.txt and set relative_root to /path, then file.txt will have the path /to/file.txt once uploaded to blob storage.

None
datastore_name str

The name of the Datastore. If None, then the default Datastore of the Workspace is used.

None
overwrite bool

Overwrite existing file(s).

False
Example

Upload Datastore object

from prefect import flow
from prefect_azure import AzureMlCredentials
from prefect_azure.ml_datastore import ml_upload_datastore

@flow
def example_ml_upload_datastore_flow():
    ml_credentials = AzureMlCredentials(
        tenant_id="tenant_id",
        service_principal_id="service_principal_id",
        service_principal_password="service_principal_password",
        subscription_id="subscription_id",
        resource_group="resource_group",
        workspace_name="workspace_name",
    )
    result = ml_upload_datastore(
        "path/to/dir/or/file",
        ml_credentials,
        datastore_name="datastore_name"
    )
    return result

Source code in prefect_azure/ml_datastore.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@task
async def ml_upload_datastore(
    path: Union[str, Path, List[Union[str, Path]]],
    ml_credentials: "AzureMlCredentials",
    target_path: Union[str, Path] = None,
    relative_root: Union[str, Path] = None,
    datastore_name: str = None,
    overwrite: bool = False,
) -> "DataReference":
    """
    Uploads local files to a Datastore.

    Args:
        path: The path to a single file, a single directory,
            or a list of paths to files to be uploaded.
        ml_credentials: Credentials to use for authentication with Azure.
        target_path: The location in the blob container to upload to. If
            None, then upload to root.
        relative_root: The root used to determine the path of
            the files in the blob. For example, if we upload /path/to/file.txt,
            and we define relative_root to be /path, when file.txt is uploaded
            to the blob storage, it will have the path of /to/file.txt.
            Not forwarded when `path` is a single directory.
        datastore_name: The name of the Datastore. If `None`, then the
            default Datastore of the Workspace is used.
        overwrite: Overwrite existing file(s).

    Returns:
        The value returned by the Datastore's `upload` (directory) or
        `upload_files` (file or list of files) call.

    Example:
        Upload files to a Datastore
        ```python
        from prefect import flow
        from prefect_azure import AzureMlCredentials
        from prefect_azure.ml_datastore import ml_upload_datastore

        @flow
        def example_ml_upload_datastore_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            result = ml_upload_datastore(
                "path/to/dir/or/file",
                ml_credentials,
                datastore_name="datastore_name"
            )
            return result
        ```
    """
    logger = get_run_logger()
    logger.info("Uploading %s into %s datastore", path, datastore_name)

    datastore = await _get_datastore(ml_credentials, datastore_name)

    # Hand plain strings to the SDK. Each list entry is converted on its own
    # so that empty lists and lists mixing `str` and `Path` are both handled
    # (inspecting only the first element would miss later `Path` entries and
    # fail on an empty list).
    if isinstance(path, list):
        path = [str(p) if isinstance(p, Path) else p for p in path]
    elif isinstance(path, Path):
        path = str(path)

    if isinstance(target_path, Path):
        target_path = str(target_path)

    if isinstance(relative_root, Path):
        relative_root = str(relative_root)

    if isinstance(path, str) and os.path.isdir(path):
        # A single directory is uploaded as a whole; `relative_root` is not
        # part of this call.
        partial_upload = partial(
            datastore.upload,
            src_dir=path,
            target_path=target_path,
            overwrite=overwrite,
            show_progress=False,
        )
    else:
        # A single file is wrapped in a list so both cases share one call.
        partial_upload = partial(
            datastore.upload_files,
            files=path if isinstance(path, list) else [path],
            relative_root=relative_root,
            target_path=target_path,
            overwrite=overwrite,
            show_progress=False,
        )

    # The SDK upload is synchronous, so run it in a worker thread rather than
    # on the event loop.
    result = await to_thread.run_sync(partial_upload)
    return result