Skip to content

prefect.workers.block

BlockWorkerJobConfiguration

Bases: BaseModel

Source code in prefect/workers/block.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@deprecated_class(
    start_date="Jun 2024",
    help="Refer to the upgrade guide for more information: https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/",
)
class BlockWorkerJobConfiguration(BaseModel):
    # Job configuration that wraps a single infrastructure block. The block is
    # validated on assignment and is what `prepare_for_flow_run` delegates to.
    block: Block = Field(
        default=..., description="The infrastructure block to use for job creation."
    )

    @validator("block")
    def _validate_infrastructure_block(cls, v):
        """Run the `block` field through `validate_block_is_infrastructure`."""
        return validate_block_is_infrastructure(v)

    # Objects keyed by kind; consumed by `_related_resources`.
    _related_objects: Dict[str, Any] = PrivateAttr(default_factory=dict)

    @property
    def is_using_a_runner(self):
        """Whether the block's command contains `prefect flow-run execute`.

        Evaluates to False when the block has no command at all.
        """
        command = self.block.command
        if command is None:
            return False
        return "prefect flow-run execute" in shlex.join(command)

    @staticmethod
    def _get_base_config_defaults(variables: dict) -> dict:
        """Collect the `default` of every variable that declares one.

        Variables without a "default" entry are left out of the result.
        """
        return {
            name: attrs["default"]
            for name, attrs in variables.items()
            if "default" in attrs
        }

    @classmethod
    @inject_client
    async def from_template_and_values(
        cls, base_job_template: dict, values: dict, client: "PrefectClient" = None
    ):
        """Build a configuration from a base job template plus overrides.

        Defaults declared in the template's variables schema are applied
        first and then overridden by `values`. The resulting template must
        resolve to a `block.$ref.block_document_id`; that block document is
        read through `client` and loaded as the `block` field.

        Important: the base_job_template is expected to have been validated
        server-side already.

        Raises:
            ValueError: if no block document id is present after the
                template variables have been resolved.
        """
        template: Dict[str, Any] = base_job_template["job_configuration"]
        schema = base_job_template["variables"]

        # Schema defaults form the baseline; explicit values take precedence.
        resolved = cls._get_base_config_defaults(schema.get("properties", {}))
        resolved.update(values)

        config = apply_values(template=template, values=resolved)

        document_id = get_from_dict(config, "block.$ref.block_document_id")
        if not document_id:
            raise ValueError(
                "Base job template is invalid for this worker type because it does not"
                " contain a block_document_id after variable resolution."
            )

        document = await client.read_block_document(block_document_id=document_id)

        # Swap the `$ref` placeholder for the block loaded from the document.
        config["block"] = Block._from_block_document(document)

        return cls(**config)

    @classmethod
    def json_template(cls) -> dict:
        """Map each job configuration field to its template string.

        A field whose schema entry carries a non-empty "template" uses that
        value; every other field falls back to its own name wrapped in
        double braces.

        e.g.
        {
            key1: '{{ key1 }}',      # default variable template
            key2: '{{ template2 }}', # `template2` explicitly provided as the template
        }
        """
        return {
            name: spec.get("template") or "{{ " + name + " }}"
            for name, spec in cls.schema()["properties"].items()
        }

    def _related_resources(self) -> List[RelatedResource]:
        """Turn the stored related objects, and their tags, into related resources.

        Entries whose object is None are skipped. Tags are pooled into a set
        across all objects and appended after the object resources.
        """
        pooled_tags = set()
        resources = []

        for kind, obj in self._related_objects.items():
            if obj is not None:
                if hasattr(obj, "tags"):
                    pooled_tags.update(obj.tags)
                resources.append(
                    object_as_related_resource(kind=kind, role=kind, object=obj)
                )

        return resources + tags_as_related_resources(pooled_tags)

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: Optional["DeploymentResponse"] = None,
        flow: Optional["Flow"] = None,
    ):
        """Replace `self.block` with the block's own `prepare_for_flow_run` result."""
        prepared_block = self.block.prepare_for_flow_run(
            flow_run=flow_run, deployment=deployment, flow=flow
        )
        self.block = prepared_block

from_template_and_values async classmethod

Creates a valid worker configuration object from the provided base configuration and overrides.

Important: this method expects that the base_job_template was already validated server-side.

Source code in prefect/workers/block.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@classmethod
@inject_client
async def from_template_and_values(
    cls, base_job_template: dict, values: dict, client: "PrefectClient" = None
):
    """Build a worker configuration object from a base job template and overrides.

    Defaults declared in the template's variables schema are applied first
    and then overridden by `values`. The resulting template must resolve to
    a `block.$ref.block_document_id`; that block document is read through
    `client` and loaded as the `block` entry of the configuration.

    Important: the base_job_template is expected to have been validated
    server-side already.

    Raises:
        ValueError: if no block document id is present after the template
            variables have been resolved.
    """
    template: Dict[str, Any] = base_job_template["job_configuration"]
    schema = base_job_template["variables"]

    # Schema defaults form the baseline; explicit values take precedence.
    resolved = cls._get_base_config_defaults(schema.get("properties", {}))
    resolved.update(values)

    config = apply_values(template=template, values=resolved)

    document_id = get_from_dict(config, "block.$ref.block_document_id")
    if not document_id:
        raise ValueError(
            "Base job template is invalid for this worker type because it does not"
            " contain a block_document_id after variable resolution."
        )

    document = await client.read_block_document(block_document_id=document_id)

    # Swap the `$ref` placeholder for the block loaded from the document.
    config["block"] = Block._from_block_document(document)

    return cls(**config)

json_template classmethod

Returns a dict with job configuration as keys and the corresponding templates as values

Defaults to using the job configuration parameter name as the template variable name.

e.g. `{ key1: '{{ key1 }}', key2: '{{ template2 }}' }` — here `key1` uses the default variable template (its own name), while `key2` uses `template2`, which was explicitly provided as its template.

Source code in prefect/workers/block.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@classmethod
def json_template(cls) -> dict:
    """Map each job configuration field to its template string.

    A field whose schema entry carries a non-empty "template" uses that
    value; every other field falls back to its own name wrapped in double
    braces.

    e.g.
    {
        key1: '{{ key1 }}',      # default variable template
        key2: '{{ template2 }}', # `template2` explicitly provided as the template
    }
    """
    return {
        name: spec.get("template") or "{{ " + name + " }}"
        for name, spec in cls.schema()["properties"].items()
    }

BlockWorkerResult

Bases: BaseWorkerResult

Result of a block worker job

Source code in prefect/workers/block.py
148
149
class BlockWorkerResult(BaseWorkerResult):
    """Result of a block worker job.

    Declares no members of its own in the lines shown here; everything it
    offers comes from ``BaseWorkerResult``.
    """