State Change Hooks¶
State change hooks execute code in response to changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow. This guide provides examples of real-world use cases.
Example use cases¶
Send a notification when a flow run fails¶
State change hooks enable you to customize messages sent when tasks transition between states, such as sending notifications containing sensitive information when tasks enter a Failed
state. Let's run a client-side hook upon a flow run entering a Failed
state.
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL
def notify_slack(flow, flow_run, state):
slack_webhook_block = Block.load(
"slack-webhook/my-slack-webhook"
)
slack_webhook_block.notify(
(
f"Your job {flow_run.name} entered {state.name} "
f"with message:\n\n"
f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
f"Tags: {flow_run.tags}\n\n"
f"Scheduled start: {flow_run.expected_start_time}"
)
)
@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
raise ValueError("oops!")
if __name__ == "__main__":
failing_flow()
Note that because we've configured retries in this example, the on_failure
hook will not run until all retries
have completed, when the flow run enters a Failed
state.
Delete a Cloud Run job when a flow run crashes¶
State change hooks can aid in managing infrastructure cleanup in scenarios where tasks spin up individual infrastructure resources independently of Prefect.
When a flow run crashes, tasks may exit abruptly, resulting in the potential omission of cleanup logic within the tasks.
State change hooks can be used to ensure infrastructure is properly cleaned up even when a flow run enters a Crashed
state!
Let's create a hook that deletes a Cloud Run job if the flow run crashes.
import os
from prefect import flow, task
from prefect.blocks.system import String
from prefect.client import get_client
import prefect.runtime
async def delete_cloud_run_job(flow, flow_run, state):
"""Flow run state change hook that deletes a Cloud Run Job if
the flow run crashes."""
# retrieve Cloud Run job name
cloud_run_job_name = await String.load(
name="crashing-flow-cloud-run-job"
)
# delete Cloud Run job
delete_job_command = f"yes | gcloud beta run jobs delete
{cloud_run_job_name.value} --region us-central1"
os.system(delete_job_command)
# clean up the Cloud Run job string block as well
async with get_client() as client:
block_document = await client.read_block_document_by_name(
"crashing-flow-cloud-run-job", block_type_slug="string"
)
await client.delete_block_document(block_document.id)
@task
def my_task_that_crashes():
raise SystemExit("Crashing on purpose!")
@flow(on_crashed=[delete_cloud_run_job])
def crashing_flow():
"""Save the flow run name (i.e. Cloud Run job name) as a
String block. It then executes a task that ends up crashing."""
flow_run_name = prefect.runtime.flow_run.name
cloud_run_job_name = String(value=flow_run_name)
cloud_run_job_name.save(
name="crashing-flow-cloud-run-job", overwrite=True
)
my_task_that_crashes()
if __name__ == "__main__":
crashing_flow()