prefect.flow_runs
wait_for_flow_run (async)
Waits for a Prefect flow run to finish and returns the finished FlowRun.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_run_id | UUID | The flow run ID for the flow run to wait for. | required |
timeout | Optional[int] | The wait timeout in seconds. Defaults to 10800 (3 hours). | 10800 |
poll_interval | int | The poll interval in seconds. Defaults to 5. | 5 |
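To illustrate how `timeout` and `poll_interval` typically interact in a wait loop of this kind, here is a minimal sketch using only the standard library. It is not Prefect's implementation: `wait_until_final`, `is_final`, and `WaitTimeout` are hypothetical names invented for this example, and treating `timeout=None` as "wait indefinitely" is an assumption suggested by the `Optional[int]` type rather than something the table states.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class WaitTimeout(Exception):
    """Hypothetical stand-in for the FlowWaitTimeout named in this reference."""


async def wait_until_final(
    is_final: Callable[[], Awaitable[bool]],
    timeout: Optional[float] = 10800,
    poll_interval: float = 5,
) -> int:
    """Poll is_final() until it returns True; return how many polls were made."""
    loop = asyncio.get_running_loop()
    # Assumption: a timeout of None means there is no deadline at all.
    deadline = None if timeout is None else loop.time() + timeout
    polls = 0
    while True:
        polls += 1
        if await is_final():
            return polls
        # Give up once the deadline has passed without reaching a final state.
        if deadline is not None and loop.time() >= deadline:
            raise WaitTimeout(f"not finished after {timeout} seconds")
        # Sleep between checks so the backend is not queried in a tight loop.
        await asyncio.sleep(poll_interval)
```

In this sketch a smaller `poll_interval` notices completion sooner at the cost of more status checks, while `timeout` bounds the total time spent waiting.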
Returns:
Name | Type | Description |
---|---|---|
FlowRun | FlowRun | The finished flow run. |
Raises:
Type | Description |
---|---|
FlowWaitTimeout | If the flow run does not finish within the timeout. |
Examples:
Create a flow run for a deployment and wait for it to finish:
    import asyncio

    from prefect import get_client
    from prefect.flow_runs import wait_for_flow_run


    async def main():
        async with get_client() as client:
            # "my-deployment-id" is a placeholder; substitute the ID of a real deployment.
            flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
            # Block until the run finishes, then inspect its final state.
            flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
            print(flow_run.state)


    if __name__ == "__main__":
        asyncio.run(main())
Trigger multiple flow runs and wait for them to finish:
    import asyncio

    from prefect import get_client
    from prefect.flow_runs import wait_for_flow_run


    async def main(num_runs: int):
        async with get_client() as client:
            # Create the flow runs one after another; "my-deployment-id" is a placeholder.
            flow_runs = [
                await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
                for _ in range(num_runs)
            ]
            # Build one waiter per run and await them all concurrently.
            coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
            finished_flow_runs = await asyncio.gather(*coros)
            print([flow_run.state for flow_run in finished_flow_runs])


    if __name__ == "__main__":
        asyncio.run(main(num_runs=10))
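By default, `asyncio.gather` propagates the first exception raised by any awaited coroutine, so a single timed-out wait would hide the results of the other runs. One possible way to collect a per-run outcome is `return_exceptions=True`. The sketch below shows the pattern with the standard library only: `fake_wait`, `wait_for_all`, and `WaitTimeout` are hypothetical stand-ins for illustration, not part of Prefect.

```python
import asyncio
from typing import List, Tuple, Union


class WaitTimeout(Exception):
    """Hypothetical stand-in for the FlowWaitTimeout named in this reference."""


async def fake_wait(run_id: str, duration: float, timeout: float) -> str:
    """Stand-in for wait_for_flow_run: 'finishes' after `duration` seconds."""
    try:
        await asyncio.wait_for(asyncio.sleep(duration), timeout=timeout)
    except asyncio.TimeoutError:
        raise WaitTimeout(f"{run_id} exceeded {timeout}s") from None
    return f"{run_id}: Completed"


async def wait_for_all(
    runs: List[Tuple[str, float]], timeout: float
) -> List[Union[str, BaseException]]:
    """Wait for every run concurrently and return one outcome per run."""
    coros = [fake_wait(run_id, duration, timeout) for run_id, duration in runs]
    # return_exceptions=True returns raised exceptions as values, in input
    # order, instead of propagating the first failure to the caller.
    return await asyncio.gather(*coros, return_exceptions=True)
```

The caller can then separate successes from timeouts with `isinstance(result, WaitTimeout)` and decide per run whether to retry, cancel, or report.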
Source code in prefect/flow_runs.py