prefect_dask.utils
Utils to use alongside prefect-dask.
get_async_dask_client
async
Yields a temporary asynchronous dask client; this is useful for parallelizing operations on dask collections, such as a dask.DataFrame or dask.Bag.
Without invoking this, workers do not automatically get a client to connect to the full cluster. The work is therefore attempted serially within the worker itself, potentially overwhelming that single worker.
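To make the "temporary client" idea concrete, the following is a minimal sketch of the async context-manager pattern using only the standard library. `StandInClient` and `temporary_async_client` are hypothetical names chosen for illustration; they are not part of prefect-dask or dask, and the real function's internals may differ.

```python
import asyncio
from contextlib import asynccontextmanager


class StandInClient:
    """Hypothetical stand-in for a dask client; not the real distributed.Client."""

    def __init__(self) -> None:
        self.closed = False

    async def compute(self, value: int) -> int:
        # Pretend to do remote work; a real client would send this to the cluster.
        await asyncio.sleep(0)
        return value * 2

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def temporary_async_client():
    """Yield a client that only lives for the duration of the `async with` block."""
    client = StandInClient()
    try:
        yield client
    finally:
        # Cleanup runs when the block exits, whether normally or with an error.
        await client.close()


async def main():
    async with temporary_async_client() as client:
        result = await client.compute(21)
    # The block has exited, so the client has been closed.
    return result, client.closed


result, closed = asyncio.run(main())
```

The point of the pattern is that the caller never manages the client's lifetime directly: the client is created on entry and released on exit.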
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | Optional[Union[int, float, str, timedelta]] | Timeout after which to error out; has no effect in flow run contexts because the client has already started; Defaults to the | None |
client_kwargs | Dict[str, Any] | Additional keyword arguments to pass to | {} |
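The `timeout` parameter accepts several types, and the examples on this page pass the string `"120s"`. The sketch below shows one way such a value could be normalized to seconds. `timeout_to_seconds` and its suffix table are assumptions made for illustration; the source does not describe how prefect-dask or dask actually parses the value.

```python
from datetime import timedelta
from typing import Optional, Union

TimeoutLike = Optional[Union[int, float, str, timedelta]]

# Unit suffixes assumed for this sketch only.
_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0}


def timeout_to_seconds(timeout: TimeoutLike) -> Optional[float]:
    """Hypothetical helper: normalize a timeout to seconds (None passes through)."""
    if timeout is None:
        return None
    if isinstance(timeout, timedelta):
        return timeout.total_seconds()
    if isinstance(timeout, (int, float)):
        return float(timeout)
    text = timeout.strip().lower()
    for suffix, factor in _UNIT_SECONDS.items():
        if text.endswith(suffix):
            # Strip the unit and scale the numeric part.
            return float(text[: -len(suffix)]) * factor
    # A bare numeric string is interpreted as seconds.
    return float(text)


examples = [
    timeout_to_seconds("120s"),
    timeout_to_seconds("2m"),
    timeout_to_seconds(timedelta(minutes=2)),
    timeout_to_seconds(120),
]
```

Under these assumptions, all four example inputs describe the same two-minute timeout.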
Yields:
Type | Description |
---|---|
AsyncGenerator[Client, None] | A temporary asynchronous dask client. |
Examples:
Use get_async_dask_client to distribute work across workers.
import asyncio

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client


@task
async def compute_task():
    # Open a temporary async client for the duration of this block.
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
    return summary_df


@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return await prefect_future.result()


asyncio.run(dask_flow())
Source code in prefect_dask/utils.py
get_dask_client
Yields a temporary synchronous dask client; this is useful for parallelizing operations on dask collections, such as a dask.DataFrame or dask.Bag.
Without invoking this, workers do not automatically get a client to connect to the full cluster. The work is therefore attempted serially within the worker itself, potentially overwhelming that single worker.
When in an async context, we recommend using get_async_dask_client instead.
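For the synchronous case, the sketch below illustrates why a context manager suits a temporary client: cleanup still happens when the task body fails. `StandInClient` and `temporary_client` are hypothetical stand-ins written for this illustration, not prefect-dask or dask APIs, and the real implementation may behave differently.

```python
from contextlib import contextmanager


class StandInClient:
    """Hypothetical stand-in for a synchronous dask client."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


@contextmanager
def temporary_client():
    """Yield a client and always close it when the `with` block ends."""
    client = StandInClient()
    try:
        yield client
    finally:
        # Runs on normal exit and when the body raises.
        client.close()


captured = []
try:
    with temporary_client() as client:
        captured.append(client)
        raise RuntimeError("simulated failure inside the task")
except RuntimeError:
    pass

closed_after_error = captured[0].closed
```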
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | Optional[Union[int, float, str, timedelta]] | Timeout after which to error out; has no effect in flow run contexts because the client has already started; Defaults to the | None |
client_kwargs | Dict[str, Any] | Additional keyword arguments to pass to | {} |
Yields:
Type | Description |
---|---|
Client | A temporary synchronous dask client. |
Examples:
Use get_dask_client to distribute work across workers.
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
    return summary_df


@flow(task_runner=DaskTaskRunner())
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()


dask_flow()
Source code in prefect_dask/utils.py