Skip to content

prefect.input.run_input

RunInput

Bases: BaseModel

Source code in prefect/input/run_input.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class RunInput(pydantic.BaseModel):
    # Pydantic model used as the base for flow run inputs. Extra fields are
    # rejected (see `Config`). `save` persists the model's schema, `load`
    # builds an instance from a stored response, and `with_initial_data`
    # derives a subclass whose fields carry default values.
    #
    # NOTE(review): this description is deliberately a comment rather than a
    # class docstring -- pydantic may surface a model docstring as the schema
    # "description", which would change the value `save` stores. Confirm
    # before converting it to a docstring.
    class Config:
        extra = "forbid"

    @classmethod
    @sync_compatible
    async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Store this model's schema as a flow run input.

        The schema is written under the keyset's `"schema"` key.

        Args:
            keyset (Keyset): the keyset whose `"schema"` entry names the
                key to write to
            flow_run_id (UUID, optional): the flow run ID to save the input
                for; defaults to `None`
        """
        # Schema generation differs depending on the installed pydantic
        # major version.
        model_schema = (
            create_v2_schema(cls.__name__, model_base=cls)
            if HAS_PYDANTIC_V2
            else cls.schema(by_alias=True)
        )

        schema_key = keyset["schema"]
        await create_flow_run_input(
            key=schema_key, value=model_schema, flow_run_id=flow_run_id
        )

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Build an instance of this model from a stored run input response.

        The response is read from the keyset's `"response"` key and its
        items are passed to the model constructor as keyword arguments.

        Args:
            keyset (Keyset): the keyset whose `"response"` entry names the
                key to read from
            flow_run_id (UUID, optional): the flow run ID to load the input
                for; defaults to `None`

        Returns:
            an instance of `cls` populated from the stored response
        """
        response_key = keyset["response"]
        payload = await read_flow_run_input(response_key, flow_run_id=flow_run_id)
        return cls(**payload)

    @classmethod
    def with_initial_data(cls: Type[T], **kwargs: Any) -> Type[T]:
        """
        Derive a `RunInput` subclass that uses the given values as field
        defaults.

        Each keyword argument becomes a field on the new model: the field's
        type is the runtime type of the value and its default is the value
        itself. The new model keeps this class's name and uses it as base.

        Args:
            **kwargs (Any): field names mapped to their initial values

        Returns:
            Type[T]: the newly created model class
        """
        field_definitions = {
            name: (type(initial), initial) for name, initial in kwargs.items()
        }
        return pydantic.create_model(
            cls.__name__, **field_definitions, __base__=cls
        )

load async classmethod

Load the run input response from the given key.

Parameters:

Name Type Description Default
keyset Keyset

the keyset to load the input for

required
flow_run_id Optional[UUID]

the flow run ID to load the input for

None
Source code in prefect/input/run_input.py
79
80
81
82
83
84
85
86
87
88
89
90
@classmethod
@sync_compatible
async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
    """
    Build an instance of this model from a stored run input response.

    The response is read from the keyset's `"response"` key and its items
    are passed to the model constructor as keyword arguments.

    Args:
        keyset (Keyset): the keyset whose `"response"` entry names the key
            to read from
        flow_run_id (UUID, optional): the flow run ID to load the input
            for; defaults to `None`

    Returns:
        an instance of `cls` populated from the stored response
    """
    response_key = keyset["response"]
    payload = await read_flow_run_input(response_key, flow_run_id=flow_run_id)
    return cls(**payload)

save async classmethod

Save the run input schema under the keyset's schema key.

Parameters:

Name Type Description Default
keyset Keyset

the keyset to save the input for

required
flow_run_id Optional[UUID]

the flow run ID to save the input for

None
Source code in prefect/input/run_input.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@classmethod
@sync_compatible
async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
    """
    Store this model's schema as a flow run input.

    The schema is written under the keyset's `"schema"` key.

    Args:
        keyset (Keyset): the keyset whose `"schema"` entry names the key
            to write to
        flow_run_id (UUID, optional): the flow run ID to save the input
            for; defaults to `None`
    """
    # Schema generation differs depending on the installed pydantic major
    # version.
    model_schema = (
        create_v2_schema(cls.__name__, model_base=cls)
        if HAS_PYDANTIC_V2
        else cls.schema(by_alias=True)
    )

    schema_key = keyset["schema"]
    await create_flow_run_input(
        key=schema_key, value=model_schema, flow_run_id=flow_run_id
    )

with_initial_data classmethod

Create a new RunInput subclass with the given initial data as field defaults.

Parameters:

Name Type Description Default
**kwargs Any

the initial data

{}
Source code in prefect/input/run_input.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@classmethod
def with_initial_data(cls: Type[T], **kwargs: Any) -> Type[T]:
    """
    Derive a `RunInput` subclass that uses the given values as field
    defaults.

    Each keyword argument becomes a field on the new model: the field's type
    is the runtime type of the value and its default is the value itself.
    The new model keeps this class's name and uses it as base.

    Args:
        **kwargs (Any): field names mapped to their initial values

    Returns:
        Type[T]: the newly created model class
    """
    field_definitions = {
        name: (type(initial), initial) for name, initial in kwargs.items()
    }
    return pydantic.create_model(cls.__name__, **field_definitions, __base__=cls)

keyset_from_base_key

Get the keyset for the given base key.

Parameters:

Name Type Description Default
base_key str

the base key to get the keyset for

required

Returns:

Type Description
Keyset

the keyset (a Dict[str, str])
Source code in prefect/input/run_input.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def keyset_from_base_key(base_key: str) -> Keyset:
    """
    Build the keyset derived from a base key.

    Args:
        base_key (str): the prefix shared by every key in the keyset

    Returns:
        Keyset: a mapping with two entries, `"response"` and `"schema"`,
            whose values are the base key suffixed with `-response` and
            `-schema` respectively
    """
    response_key = f"{base_key}-response"
    schema_key = f"{base_key}-schema"
    return {"response": response_key, "schema": schema_key}

keyset_from_paused_state

Get the keyset for the given Paused state.

Parameters:

Name Type Description Default
state State

the state to get the keyset for

required
Source code in prefect/input/run_input.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def keyset_from_paused_state(state: "State") -> Keyset:
    """
    Derive the keyset that belongs to a Paused state.

    The base key is the lowercased state name joined by a hyphen to the
    pause key found in the state's details.

    Args:
        state (State): the state to derive the keyset from

    Returns:
        Keyset: the result of `keyset_from_base_key` for that base key

    Raises:
        RuntimeError: if `state.is_paused()` is false
    """
    if state.is_paused():
        state_name = state.name.lower()
        pause_key = str(state.state_details.pause_key)
        return keyset_from_base_key(f"{state_name}-{pause_key}")

    raise RuntimeError(f"{state.type.value!r} is unsupported.")