States¶
Overview¶
States are rich objects that contain information about the status of a particular task run or flow run. While you don't need to know the details of states to use Prefect, you can give your workflows superpowers by taking advantage of them.
At any moment, you can learn anything you need to know about a task or flow by examining its current state or the history of its states. For example, a state could tell you that a task:
- is scheduled to make a third run attempt in an hour
- succeeded and what data it produced
- was scheduled to run, but later cancelled
- used the cached result of a previous run instead of re-running
- failed because it timed out
By manipulating a relatively small number of task states, Prefect flows can harness the complexity that emerges in workflows.
Only runs have states
Though we often refer to the "state" of a flow or a task, what we really mean is the state of a flow run or a task run. Flows and tasks are templates that describe what a system does; only when we run the system does it also take on a state. So while we might refer to a task as "running" or being "successful", we really mean that a specific instance of the task is in that state.
State Types¶
States have names and types. State types are canonical, with specific orchestration rules that apply to transitions into and out of each state type. A state's name is often, but not always, synonymous with its type. For example, a task run that is running for the first time has a state with the name Running and the type RUNNING. However, if the task retries, that same task run will have the name Retrying and the type RUNNING. Each time the task run transitions into the RUNNING state, the same orchestration rules are applied.
There are four terminal state types, from which there are no orchestrated transitions to any other state type:
- COMPLETED
- CANCELLED
- FAILED
- CRASHED
The full complement of states and state types includes:
Name | Type | Terminal? | Description |
---|---|---|---|
Scheduled | SCHEDULED | No | The run will begin at a particular time in the future. |
Late | SCHEDULED | No | The run's scheduled start time has passed, but it has not yet transitioned to PENDING (a run is considered late after 15 seconds by default). |
AwaitingRetry | SCHEDULED | No | The run did not complete successfully because of a code issue and had remaining retry attempts. |
Pending | PENDING | No | The run has been submitted to run, but is waiting on necessary preconditions to be satisfied. |
Running | RUNNING | No | The run code is currently executing. |
Retrying | RUNNING | No | The run code is currently executing after previously not completing successfully. |
Paused | PAUSED | No | The run code has stopped executing until it receives manual approval to proceed. |
Cancelling | CANCELLING | No | The infrastructure on which the code was running is being cleaned up. |
Cancelled | CANCELLED | Yes | The run did not complete because a user determined that it should not. |
Completed | COMPLETED | Yes | The run completed successfully. |
Failed | FAILED | Yes | The run did not complete because of a code issue and had no remaining retry attempts. |
Crashed | CRASHED | Yes | The run did not complete because of an infrastructure issue. |
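The relationship between state names and state types can be made concrete with a small standalone sketch. It does not use Prefect's own classes: StateType, STATE_NAME_TO_TYPE, TERMINAL_TYPES, and is_terminal are hypothetical names that simply mirror the table above.

```python
from enum import Enum


class StateType(Enum):
    """Hypothetical stand-in for the canonical state types in the table above."""

    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    CANCELLING = "CANCELLING"
    CANCELLED = "CANCELLED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"


# Several state names can share one type (for example, Running and Retrying).
STATE_NAME_TO_TYPE = {
    "Scheduled": StateType.SCHEDULED,
    "Late": StateType.SCHEDULED,
    "AwaitingRetry": StateType.SCHEDULED,
    "Pending": StateType.PENDING,
    "Running": StateType.RUNNING,
    "Retrying": StateType.RUNNING,
    "Paused": StateType.PAUSED,
    "Cancelling": StateType.CANCELLING,
    "Cancelled": StateType.CANCELLED,
    "Completed": StateType.COMPLETED,
    "Failed": StateType.FAILED,
    "Crashed": StateType.CRASHED,
}

# The four terminal state types listed earlier in this section.
TERMINAL_TYPES = {
    StateType.COMPLETED,
    StateType.CANCELLED,
    StateType.FAILED,
    StateType.CRASHED,
}


def is_terminal(state_name: str) -> bool:
    """Return True if the named state maps to a terminal state type."""
    return STATE_NAME_TO_TYPE[state_name] in TERMINAL_TYPES
```

Because orchestration rules attach to the type, Running and Retrying are treated identically in this model, while AwaitingRetry is non-terminal even though the previous attempt did not succeed.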
Returned values¶
When calling a task or a flow, there are three types of returned values:
- Data: A Python object (such as int, str, dict, list, and so on).
- State: A Prefect object indicating the state of a flow or task run.
- PrefectFuture: A Prefect object that contains both data and State.
Returning data is the default behavior any time you call your_task().
Returning a Prefect State occurs any time you call your task or flow with the argument return_state=True.
Returning a PrefectFuture is achieved by calling your_task.submit().
Return Data¶
By default, running a task will return data:
from prefect import flow, task

@task
def add_one(x):
    return x + 1

@flow
def my_flow():
    result = add_one(1)  # returns int
The same rule applies for a subflow:
@flow
def subflow():
    return 42

@flow
def my_flow():
    result = subflow()  # returns data
Return Prefect State¶
To return a State instead, add return_state=True as a parameter of your task call.
@flow
def my_flow():
    state = add_one(1, return_state=True)  # returns State
To get data from a State, call .result().
@flow
def my_flow():
    state = add_one(1, return_state=True)  # returns State
    result = state.result()  # returns int
The same rule applies for a subflow:
@flow
def subflow():
    return 42

@flow
def my_flow():
    state = subflow(return_state=True)  # returns State
    result = state.result()  # returns int
Return a PrefectFuture¶
To get a PrefectFuture, add .submit() to your task call.
@flow
def my_flow():
    future = add_one.submit(1)  # returns PrefectFuture
To get data from a PrefectFuture, call .result().
@flow
def my_flow():
    future = add_one.submit(1)  # returns PrefectFuture
    result = future.result()  # returns data
To get a State from a PrefectFuture, call .wait().
@flow
def my_flow():
    future = add_one.submit(1)  # returns PrefectFuture
    state = future.wait()  # returns State
Final state determination¶
The final state of a flow is determined by its return value. The following rules apply:
- If an exception is raised directly in the flow function, the flow run is marked as FAILED.
- If the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and subflows within it.
  - If any task run or subflow run failed and none were cancelled, then the final flow run state is marked as FAILED.
  - If any task run or subflow run was cancelled, then the final flow run state is marked as CANCELLED.
- If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state.
- If the flow run returns any other object, then it is marked as successfully completed.
See the Final state determination section of the Flows documentation for further details and examples.
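The rules above can be read as a small decision procedure. The following is a minimal standalone sketch of that procedure, not Prefect's actual implementation: ManualState and final_flow_state are hypothetical names, state types are represented as plain strings, and the fallback to COMPLETED when nothing failed or was cancelled is an assumption that the rules above do not state explicitly.

```python
from typing import Any, Iterable


class ManualState:
    """Hypothetical stand-in for a manually created state object."""

    def __init__(self, state_type: str):
        self.type = state_type


def final_flow_state(
    raised: bool,
    return_value: Any,
    child_state_types: Iterable[str],
) -> str:
    """Apply the final-state rules in the order they are listed above."""
    children = list(child_state_types)

    # Rule 1: an exception raised directly in the flow function.
    if raised:
        return "FAILED"

    # Rule 2: no return value, so the task and subflow run states decide.
    if return_value is None:
        if "CANCELLED" in children:
            return "CANCELLED"
        if "FAILED" in children:
            return "FAILED"
        # Assumption: with no failed or cancelled children, treat as completed.
        return "COMPLETED"

    # Rule 3: a manually created state is used as-is.
    if isinstance(return_value, ManualState):
        return return_value.type

    # Rule 4: any other returned object means successful completion.
    return "COMPLETED"
```

Note that under rule 4 a flow that returns ordinary data is marked as completed in this sketch even if one of its task runs failed, which is why returning a value is a way to tolerate task failures.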
State Change Hooks¶
State change hooks execute code in response to changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.
A simple example¶
from prefect import flow

def my_success_hook(flow, flow_run, state):
    print("Flow run succeeded!")

@flow(on_completion=[my_success_hook])
def my_flow():
    return 42

my_flow()
Create and use hooks¶
Available state change hooks¶
Type | Flow | Task | Description |
---|---|---|---|
on_completion | ✓ | ✓ | Executes when a flow or task run enters a Completed state. |
on_failure | ✓ | ✓ | Executes when a flow or task run enters a Failed state. |
on_cancellation | ✓ | - | Executes when a flow run enters a Cancelling state. |
on_crashed | ✓ | - | Executes when a flow run enters a Crashed state. |
on_running | ✓ | - | Executes when a flow run enters a Running state. |
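To illustrate how the table maps hook parameters to state transitions, here is a minimal standalone sketch of a dispatcher. It is not Prefect's engine code: HookDispatcher, HOOK_TRIGGERS, and enter_state are hypothetical names, and a plain state name string stands in for the state object that a real hook receives.

```python
from collections import defaultdict

# Hook parameter -> state name that triggers it, copied from the table above.
HOOK_TRIGGERS = {
    "on_completion": "Completed",
    "on_failure": "Failed",
    "on_cancellation": "Cancelling",
    "on_crashed": "Crashed",
    "on_running": "Running",
}


class HookDispatcher:
    """Hypothetical dispatcher that calls hooks when a run enters a state."""

    def __init__(self, **hooks):
        self._hooks_by_state = defaultdict(list)
        for parameter, callables in hooks.items():
            self._hooks_by_state[HOOK_TRIGGERS[parameter]].extend(callables)

    def enter_state(self, flow, flow_run, state_name):
        """Call every hook registered for the state being entered, in order."""
        for hook in self._hooks_by_state.get(state_name, []):
            hook(flow, flow_run, state_name)


events = []


def log_running(flow, flow_run, state):
    events.append(("running", state))


def log_completed(flow, flow_run, state):
    events.append(("completed", state))


dispatcher = HookDispatcher(on_running=[log_running], on_completion=[log_completed])

# Simulate a run moving through three states; Pending has no hooks registered.
for name in ["Pending", "Running", "Completed"]:
    dispatcher.enter_state("my_flow", "my_flow_run", name)
```

Each hook receives the same three positional arguments regardless of which transition fired it, which is why one function can be registered under several parameters.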
Create flow run state change hooks¶
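from prefect import Flow, flow
# Import path assumes a recent Prefect release; adjust it for your version.
from prefect.client.schemas.objects import FlowRun, State

def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State):
    """This is the required signature for a flow run state
    change hook. This hook can only be passed into flows.
    """

# pass hook as a list of callables
@flow(on_completion=[my_flow_hook])
def my_flow():
    return 42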
Create task run state change hooks¶
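from prefect import Task, task
# Import path assumes a recent Prefect release; adjust it for your version.
from prefect.client.schemas.objects import State, TaskRun

def my_task_hook(task: Task, task_run: TaskRun, state: State):
    """This is the required signature for a task run state change
    hook. This hook can only be passed into tasks.
    """

# pass hook as a list of callables
@task(on_failure=[my_task_hook])
def my_task():
    return 42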
Use multiple state change hooks¶
State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition, or to use the same state change hook for different transitions:
from prefect import task

def my_success_hook(task, task_run, state):
    print("Task run succeeded!")

def my_failure_hook(task, task_run, state):
    print("Task run failed!")

def my_succeed_or_fail_hook(task, task_run, state):
    print("If the task run succeeds or fails, this hook runs.")

@task(
    on_completion=[my_success_hook, my_succeed_or_fail_hook],
    on_failure=[my_failure_hook, my_succeed_or_fail_hook],
)
def my_task():  # placeholder task so the decorator has something to wrap
    return 42
Pass kwargs to your hooks¶
The Prefect engine will call your hooks for you upon the state change, passing in the flow, flow run, and state objects.
However, you can define your hook to have additional default arguments:
from prefect import flow

data = {}

def my_hook(flow, flow_run, state, my_arg="custom_value"):
    data.update(my_arg=my_arg, state=state)

@flow(on_completion=[my_hook])
def lazy_flow():
    pass

state = lazy_flow(return_state=True)
assert data == {"my_arg": "custom_value", "state": state}
... or define your hook to accept arbitrary keyword arguments:
from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, **dict(x=x, y=y))]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()
assert data == {"x": "foo", "y": 42, "state": task_run_state}