prefect.deployments.steps.core

Core primitives for running Prefect deployment steps.

Deployment steps are YAML representations of Python functions along with their inputs.

Whenever a step is run, the following actions are taken:

  • The step's inputs and block / variable references are resolved (see the prefect deploy documentation for more details)
  • The step's function is imported; if it cannot be found, the requires keyword is used to install the necessary packages
  • The step's function is called with the resolved inputs
  • The step's output is returned and used to resolve inputs for subsequent steps
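The four actions above can be sketched with the standard library alone. This is a minimal illustration, not Prefect's implementation: the `{{ ... }}` placeholder syntax, the helper names `lookup`, `resolve_inputs`, `import_function`, and `run_step_sketch`, and the use of `string.capwords` as a stand-in step function are all assumptions made for the example. Block and variable resolution and package installation are left out.

```python
import importlib
import re
from typing import Any, Callable, Dict, Optional

# Assumed placeholder syntax for this sketch: "{{ name }}" or "{{ step_id.key }}".
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(context: Dict[str, Any], dotted_name: str) -> Any:
    """Walk a dotted name such as 'greet.name' through nested dicts."""
    value: Any = context
    for part in dotted_name.split("."):
        value = value[part]
    return value


def resolve_inputs(inputs: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Action 1: substitute placeholders in string inputs."""

    def render(value: Any) -> Any:
        if not isinstance(value, str):
            return value
        return _PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), value)

    return {key: render(value) for key, value in inputs.items()}


def import_function(fqn: str) -> Callable[..., Any]:
    """Action 2: import a function from its fully qualified name."""
    module_name, _, func_name = fqn.rpartition(".")
    return getattr(importlib.import_module(module_name), func_name)


def run_step_sketch(
    fqn: str, inputs: Dict[str, Any], upstream_outputs: Optional[Dict[str, Any]] = None
) -> Any:
    resolved = resolve_inputs(inputs, upstream_outputs or {})
    func = import_function(fqn)
    # Actions 3 and 4: call with the resolved inputs and return the output.
    return func(**resolved)


# Output of a hypothetical earlier step, keyed by that step's id.
upstream = {"greet": {"name": "ada lovelace"}}
output = run_step_sketch("string.capwords", {"s": "hello {{ greet.name }}"}, upstream)
print(output)  # Hello Ada Lovelace
```

Keying upstream outputs by a step identifier is a design choice of the sketch; it keeps placeholders unambiguous when several earlier steps return values with the same name.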

StepExecutionError

Bases: Exception

Raised when a step fails to execute.

Source code in prefect/deployments/steps/core.py
class StepExecutionError(Exception):
    """
    Raised when a step fails to execute.
    """

run_step async

Runs a step, returns the step's output.

Steps are assumed to be in the format {"importable.func.name": {"kwarg1": "value1", ...}}.

The 'id' and 'requires' keywords are reserved for specific purposes and will be removed from the inputs before passing to the step function.

The 'requires' keyword is used to specify packages that should be installed before running the step.
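To make the step format concrete, the sketch below splits a step dictionary into its function name, its function inputs, and its reserved keywords. The helper `split_step`, the step function `my_package.build_image`, and its `tag` input are hypothetical; the contents of `RESERVED_KEYWORDS` are assumed from the description above rather than copied from Prefect's source.

```python
from typing import Any, Dict, Tuple

# Assumed from the prose above; the real constant lives in Prefect's source.
RESERVED_KEYWORDS = ("id", "requires")


def split_step(
    step: Dict[str, Dict[str, Any]],
) -> Tuple[str, Dict[str, Any], Dict[str, Any]]:
    """Return (function name, function inputs, reserved keywords) for one step."""
    if len(step) != 1:
        raise ValueError(f"Expected exactly one key, got: {', '.join(step)}")
    fqn, raw_inputs = next(iter(step.items()))
    inputs = dict(raw_inputs or {})  # copy so the caller's dict is untouched
    keywords = {k: inputs.pop(k) for k in RESERVED_KEYWORDS if k in inputs}
    return fqn, inputs, keywords


step = {
    "my_package.build_image": {  # hypothetical step function
        "id": "build",
        "requires": "my-package>=1.0",
        "tag": "latest",
    }
}
fqn, inputs, keywords = split_step(step)
print(fqn)       # my_package.build_image
print(inputs)    # {'tag': 'latest'}
print(keywords)  # {'id': 'build', 'requires': 'my-package>=1.0'}
```

Only `tag` would reach the step function here; the reserved entries are held back for the runner itself.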

Source code in prefect/deployments/steps/core.py
async def run_step(step: Dict, upstream_outputs: Optional[Dict] = None) -> Dict:
    """
    Runs a step, returns the step's output.

    Steps are assumed to be in the format `{"importable.func.name": {"kwarg1": "value1", ...}}`.

    The 'id' and 'requires' keywords are reserved for specific purposes and will be removed from the
    inputs before passing to the step function.

    The 'requires' keyword is used to specify packages that should be installed before running the step.
    """
    fqn, inputs = _get_step_fully_qualified_name_and_inputs(step)
    upstream_outputs = upstream_outputs or {}

    if len(step.keys()) > 1:
        raise ValueError(
            f"Step has unexpected additional keys: {', '.join(list(step.keys())[1:])}"
        )

    keywords = {
        keyword: inputs.pop(keyword)
        for keyword in RESERVED_KEYWORDS
        if keyword in inputs
    }

    inputs = apply_values(inputs, upstream_outputs)
    inputs = await resolve_block_document_references(inputs)
    inputs = await resolve_variables(inputs)
    inputs = apply_values(inputs, os.environ)
    step_func = _get_function_for_step(fqn, requires=keywords.get("requires"))
    result = await from_async.call_soon_in_new_thread(
        Call.new(step_func, **inputs)
    ).aresult()
    return result
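The final call in `run_step` hands the step function to a separate thread and awaits its result. A rough standard-library analogue of that pattern is shown below; it uses `asyncio.to_thread` (Python 3.9+) rather than Prefect's own concurrency utilities, and `blocking_step` is a hypothetical step function, so treat it as an illustration of the idea only.

```python
import asyncio
import threading


def blocking_step(directory: str) -> dict:
    """Hypothetical synchronous step function returning a dict of outputs."""
    return {"directory": directory, "thread": threading.current_thread().name}


async def main() -> dict:
    # asyncio.to_thread runs the callable in a worker thread, so the event
    # loop stays free while the result is awaited.
    return await asyncio.to_thread(blocking_step, directory="/tmp/project")


result = asyncio.run(main())
print(result["directory"])  # /tmp/project
```

Running the function off the event loop's thread means a slow or blocking step does not stall other coroutines.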