Creating Interactive Workflows

Experimental

The wait_for_input parameter used in the pause_flow_run or suspend_flow_run functions is an experimental feature. The interface or behavior of this feature may change without warning in future releases.

If you encounter any issues, please let us know in Slack or with a GitHub issue.

When a flow run is paused or suspended, you can receive input from the user. This is useful when you need to ask the user for additional information or feedback before resuming the flow run.

Waiting for input

To receive input you must use the wait_for_input parameter in the pause_flow_run or suspend_flow_run functions. This parameter accepts a subclass of prefect.input.RunInput. RunInput is a subclass of pydantic.BaseModel and can be used to define the input that you want to receive:

from prefect import flow, get_run_logger, pause_flow_run
from prefect.input import RunInput

class UserNameInput(RunInput):
    name: str

In this case we are defining a UserNameInput class that will receive a name string from the user. You can then use this class in the wait_for_input parameter:

@flow
async def greet_user():
    logger = get_run_logger()

    user_input = await pause_flow_run(
        wait_for_input=UserNameInput
    )

    logger.info(f"Hello, {user_input.name}!")

When the flow run is paused, the user will be prompted to enter a name. If the user does not enter a name, the flow run will not resume. If the user enters a name, the flow run will resume and the user_input variable will contain the name that the user entered.

Providing initial data

Use the with_initial_data method to set default values for the fields in your own RunInput subclasses.

Expanding on the example above, you could default the name field to "anonymous":

@flow
async def greet_user():
    logger = get_run_logger()

    user_input = await pause_flow_run(
        wait_for_input=UserNameInput.with_initial_data(name="anonymous")
    )

    if user_input.name == "anonymous":
        logger.info("Hello, stranger!")
    else:
        logger.info(f"Hello, {user_input.name}!")

Handling custom validation

Prefect uses the fields and type hints on your RunInput subclass to validate the general structure of input your flow run receives, but you might require more complex validation. If you do, you can use Pydantic validators.

Custom validation runs after the flow run resumes

Prefect transforms the type annotations in your RunInput class into a JSON schema and uses that schema in the UI to do client-side validation. However, custom validation requires running logic defined in your RunInput class. This happens after the flow run resumes, so you'll probably want to handle it explicitly in your flow. Read on for an example of how to do this.
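To make the distinction concrete, it can help to look at what a schema derived only from type annotations is able to express. The following is a simplified, standard-library-only sketch and not Prefect's or Pydantic's actual schema generation; annotations_to_schema, _JSON_TYPES, and OrderFields are hypothetical names used for illustration.

```python
from typing import Literal, get_args, get_origin, get_type_hints

# Hypothetical mapping from simple Python types to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


class OrderFields:
    """Plain class with per-field annotations (illustration only)."""

    size: Literal["small", "medium", "large", "xlarge"]
    color: Literal["red", "green", "black"]


def annotations_to_schema(cls) -> dict:
    """Build a simplified JSON-Schema-like dict from a class's type hints."""
    properties = {}
    for name, hint in get_type_hints(cls).items():
        if get_origin(hint) is Literal:
            # A Literal annotation becomes an enum of its allowed values.
            properties[name] = {"enum": list(get_args(hint))}
        else:
            properties[name] = {"type": _JSON_TYPES.get(hint, "object")}
    return {"type": "object", "properties": properties, "required": list(properties)}


schema = annotations_to_schema(OrderFields)
print(schema["properties"])
```

Each property carries only its own constraints, so a rule that relates two fields (for example, one that depends on both size and color) has no place in this kind of schema and has to run as Python code.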

The following is an example RunInput class that uses a custom field validator:

from typing import Literal

import pydantic
from prefect.input import RunInput


class ShirtOrder(RunInput):
    size: Literal["small", "medium", "large", "xlarge"]
    color: Literal["red", "green", "black"]

    @pydantic.validator("color")
    def validate_color(cls, value, values, **kwargs):
        if value == "green" and values["size"] == "small":
            raise ValueError("Green is only in-stock for medium, large, and XL sizes.")

        return value

In the example, we use Pydantic's validator decorator to define a custom validation method for the color field. We can use it in a flow like this:

from typing import Literal

import pydantic
from prefect import flow, pause_flow_run
from prefect.input import RunInput


class ShirtOrder(RunInput):
    size: Literal["small", "medium", "large", "xlarge"]
    color: Literal["red", "green", "black"]

    @pydantic.validator("color")
    def validate_color(cls, value, values, **kwargs):
        if value == "green" and values["size"] == "small":
            raise ValueError("Green is only in-stock for medium, large, and XL sizes.")

        return value


@flow
def get_shirt_order():
    shirt_order = pause_flow_run(wait_for_input=ShirtOrder)

If a user chooses any size and color combination other than small and green, the flow run will resume successfully. However, if the user chooses size small and color green, the flow run will resume, and pause_flow_run will raise a ValidationError exception. This will cause the flow run to fail and log the error.

However, what if you don't want the flow run to fail? One way to handle this case is to use a while loop and pause again if the ValidationError exception is raised:

from typing import Literal

import pydantic
from prefect import flow, get_run_logger, pause_flow_run
from prefect.input import RunInput


class ShirtOrder(RunInput):
    size: Literal["small", "medium", "large", "xlarge"]
    color: Literal["red", "green", "black"]

    @pydantic.validator("color")
    def validate_color(cls, value, values, **kwargs):
        if value == "green" and values["size"] == "small":
            raise ValueError("Green is only in-stock for medium, large, and XL sizes.")

        return value


@flow
def get_shirt_order():
    logger = get_run_logger()
    shirt_order = None

    while shirt_order is None:
        try:
            shirt_order = pause_flow_run(wait_for_input=ShirtOrder)
        except pydantic.ValidationError as exc:
            logger.error(f"Invalid size and color combination: {exc}")

    logger.info(f"Shirt order: {shirt_order.size}, {shirt_order.color}")

This code will cause the flow run to pause repeatedly until the user enters a valid size and color combination.
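If an unbounded loop is a concern, the same pattern can be capped at a fixed number of attempts. The sketch below uses only the standard library so that it runs without Prefect: ask_with_retries is a hypothetical helper, fake_pause is a hypothetical stand-in for calling pause_flow_run(wait_for_input=ShirtOrder), and ValueError stands in for pydantic.ValidationError.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def ask_with_retries(request_input: Callable[[], T], max_attempts: int = 3) -> Optional[T]:
    """Call request_input until it succeeds or max_attempts is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return request_input()
        except ValueError as exc:
            # In a real flow this would be logged with get_run_logger().
            print(f"Attempt {attempt} rejected: {exc}")
    # Every attempt failed; the caller decides how to proceed.
    return None


# Simulated user responses: one invalid combination, then a valid one.
responses = iter([("small", "green"), ("medium", "green")])


def fake_pause():
    """Stand-in for pausing the flow run and validating the input."""
    size, color = next(responses)
    if color == "green" and size == "small":
        raise ValueError("Green is only in-stock for medium, large, and XL sizes.")
    return size, color


order = ask_with_retries(fake_pause)
```

Returning None after the last attempt leaves the decision to the flow, which could fail the run or fall back to a default order.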

As an additional step, you may want to use an automation or notification to alert the user to the error.