Skip to content

prefect.events

TriggerTypes: TypeAlias = Union[EventTrigger, MetricTrigger, CompoundTrigger, SequenceTrigger] module-attribute

The union of all concrete trigger types that a user may actually create

Action

Bases: PrefectBaseModel, ABC

An Action that may be performed when an Automation is triggered

Source code in prefect/events/actions.py
18
19
20
21
22
23
24
25
class Action(PrefectBaseModel, abc.ABC):
    """An Action that may be performed when an Automation is triggered"""

    type: str

    def describe_for_cli(self) -> str:
        """A human-readable description of the action"""
        return self.type.replace("-", " ").capitalize()

describe_for_cli

A human-readable description of the action

Source code in prefect/events/actions.py
23
24
25
def describe_for_cli(self) -> str:
    """A human-readable description of the action"""
    return self.type.replace("-", " ").capitalize()

AutomationCore

Bases: PrefectBaseModel

Defines an action a user wants to take when a certain number of events do or don't happen to the matching resources

Source code in prefect/events/schemas/automations.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class AutomationCore(PrefectBaseModel, extra="ignore"):  # type: ignore[call-arg]
    """Defines an action a user wants to take when a certain number of events
    do or don't happen to the matching resources"""

    name: str = Field(..., description="The name of this automation")
    description: str = Field("", description="A longer description of this automation")

    enabled: bool = Field(True, description="Whether this automation will be evaluated")

    trigger: TriggerTypes = Field(
        ...,
        description=(
            "The criteria for which events this Automation covers and how it will "
            "respond to the presence or absence of those events"
        ),
    )

    actions: List[ActionTypes] = Field(
        ...,
        description="The actions to perform when this Automation triggers",
    )

    actions_on_trigger: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a triggered state",
    )

    actions_on_resolve: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a resolving state",
    )

    owner_resource: Optional[str] = Field(
        default=None, description="The owning resource of this automation"
    )

CallWebhook

Bases: Action

Call a webhook when an Automation is triggered.

Source code in prefect/events/actions.py
128
129
130
131
132
133
134
135
136
137
138
class CallWebhook(Action):
    """Call a webhook when an Automation is triggered."""

    type: Literal["call-webhook"] = "call-webhook"
    block_document_id: UUID = Field(
        description="The identifier of the webhook block to use"
    )
    payload: str = Field(
        default="",
        description="An optional templatable payload to send when calling the webhook.",
    )

CancelFlowRun

Bases: Action

Cancels a flow run associated with the trigger

Source code in prefect/events/actions.py
116
117
118
119
class CancelFlowRun(Action):
    """Cancels a flow run associated with the trigger"""

    type: Literal["cancel-flow-run"] = "cancel-flow-run"

ChangeFlowRunState

Bases: Action

Changes the state of a flow run associated with the trigger

Source code in prefect/events/actions.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ChangeFlowRunState(Action):
    """Changes the state of a flow run associated with the trigger"""

    type: Literal["change-flow-run-state"] = "change-flow-run-state"

    name: Optional[str] = Field(
        None,
        description="The name of the state to change the flow run to",
    )
    state: StateType = Field(
        ...,
        description="The type of the state to change the flow run to",
    )
    message: Optional[str] = Field(
        None,
        description="An optional message to associate with the state change",
    )

CompositeTrigger

Bases: Trigger, ABC

Requires some number of triggers to have fired within the given time period.

Source code in prefect/events/schemas/automations.py
312
313
314
315
316
317
318
319
class CompositeTrigger(Trigger, abc.ABC):
    """
    Requires some number of triggers to have fired within the given time period.
    """

    type: Literal["compound", "sequence"]
    triggers: List["TriggerTypes"]
    within: Optional[timedelta]

CompoundTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period

Source code in prefect/events/schemas/automations.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
class CompoundTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have
    fired within the given time period"""

    type: Literal["compound"] = "compound"
    require: Union[int, Literal["any", "all"]]

    @root_validator
    def validate_require(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        require = values.get("require")

        if isinstance(require, int):
            if require < 1:
                raise ValueError("required must be at least 1")
            if require > len(values["triggers"]):
                raise ValueError(
                    "required must be less than or equal to the number of triggers"
                )

        return values

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        return textwrap.indent(
            "\n".join(
                [
                    f"{str(self.require).capitalize()} of:",
                    "\n".join(
                        [
                            trigger.describe_for_cli(indent=indent + 1)
                            for trigger in self.triggers
                        ]
                    ),
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    return textwrap.indent(
        "\n".join(
            [
                f"{str(self.require).capitalize()} of:",
                "\n".join(
                    [
                        trigger.describe_for_cli(indent=indent + 1)
                        for trigger in self.triggers
                    ]
                ),
            ]
        ),
        prefix="  " * indent,
    )

DeclareIncident

Bases: Action

Declares an incident for the triggering event. Only available on Prefect Cloud

Source code in prefect/events/actions.py
268
269
270
271
class DeclareIncident(Action):
    """Declares an incident for the triggering event.  Only available on Prefect Cloud"""

    type: Literal["declare-incident"] = "declare-incident"

DeploymentCompoundTrigger

Bases: BaseDeploymentTrigger, CompoundTrigger

A composite trigger that requires some number of triggers to have fired within the given time period

Source code in prefect/events/schemas/deployment_triggers.py
100
101
102
103
104
class DeploymentCompoundTrigger(BaseDeploymentTrigger, CompoundTrigger):
    """A composite trigger that requires some number of triggers to have
    fired within the given time period"""

    trigger_type = CompoundTrigger

DeploymentEventTrigger

Bases: BaseDeploymentTrigger, EventTrigger

A trigger that fires based on the presence or absence of events within a given period of time.

Source code in prefect/events/schemas/deployment_triggers.py
83
84
85
86
87
88
89
class DeploymentEventTrigger(BaseDeploymentTrigger, EventTrigger):
    """
    A trigger that fires based on the presence or absence of events within a given
    period of time.
    """

    trigger_type = EventTrigger

DeploymentMetricTrigger

Bases: BaseDeploymentTrigger, MetricTrigger

A trigger that fires based on the results of a metric query.

Source code in prefect/events/schemas/deployment_triggers.py
92
93
94
95
96
97
class DeploymentMetricTrigger(BaseDeploymentTrigger, MetricTrigger):
    """
    A trigger that fires based on the results of a metric query.
    """

    trigger_type = MetricTrigger

DeploymentSequenceTrigger

Bases: BaseDeploymentTrigger, SequenceTrigger

A composite trigger that requires some number of triggers to have fired within the given time period in a specific order

Source code in prefect/events/schemas/deployment_triggers.py
107
108
109
110
111
class DeploymentSequenceTrigger(BaseDeploymentTrigger, SequenceTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    trigger_type = SequenceTrigger

DoNothing

Bases: Action

Do nothing when an Automation is triggered

Source code in prefect/events/actions.py
28
29
30
31
class DoNothing(Action):
    """Do nothing when an Automation is triggered"""

    type: Literal["do-nothing"] = "do-nothing"

Event

Bases: PrefectBaseModel

The client-side view of an event that has happened to a Resource

Source code in prefect/events/schemas/events.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class Event(PrefectBaseModel):
    """The client-side view of an event that has happened to a Resource"""

    occurred: DateTimeTZ = Field(
        default_factory=lambda: pendulum.now("UTC"),
        description="When the event happened from the sender's perspective",
    )
    event: str = Field(
        description="The name of the event that happened",
    )
    resource: Resource = Field(
        description="The primary Resource this event concerns",
    )
    related: List[RelatedResource] = Field(
        default_factory=list,
        description="A list of additional Resources involved in this event",
    )
    payload: Dict[str, Any] = Field(
        default_factory=dict,
        description="An open-ended set of data describing what happened",
    )
    id: UUID = Field(
        default_factory=uuid4,
        description="The client-provided identifier of this event",
    )
    follows: Optional[UUID] = Field(
        default=None,
        description=(
            "The ID of an event that is known to have occurred prior to this one. "
            "If set, this may be used to establish a more precise ordering of causally-"
            "related events when they occur close enough together in time that the "
            "system may receive them out-of-order."
        ),
    )

    @property
    def involved_resources(self) -> Sequence[Resource]:
        return [self.resource] + list(self.related)

    @property
    def resource_in_role(self) -> Mapping[str, RelatedResource]:
        """Returns a mapping of roles to the first related resource in that role"""
        return {related.role: related for related in reversed(self.related)}

    @property
    def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]:
        """Returns a mapping of roles to related resources in that role"""
        resources: Dict[str, List[RelatedResource]] = defaultdict(list)
        for related in self.related:
            resources[related.role].append(related)
        return resources

    @validator("related")
    def enforce_maximum_related_resources(cls, value: List[RelatedResource]):
        if len(value) > PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value():
            raise ValueError(
                "The maximum number of related resources "
                f"is {PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()}"
            )

        return value

    def find_resource_label(self, label: str) -> Optional[str]:
        """Finds the value of the given label in this event's resource or one of its
        related resources.  If the label starts with `related:<role>:`, search for the
        first matching label in a related resource with that role."""
        directive, _, related_label = label.rpartition(":")
        directive, _, role = directive.partition(":")
        if directive == "related":
            for related in self.related:
                if related.role == role:
                    return related.get(related_label)
        return self.resource.get(label)

resource_in_role: Mapping[str, RelatedResource] property

Returns a mapping of roles to the first related resource in that role

resources_in_role: Mapping[str, Sequence[RelatedResource]] property

Returns a mapping of roles to related resources in that role

find_resource_label

Finds the value of the given label in this event's resource or one of its related resources. If the label starts with related:<role>:, search for the first matching label in a related resource with that role.

Source code in prefect/events/schemas/events.py
167
168
169
170
171
172
173
174
175
176
177
def find_resource_label(self, label: str) -> Optional[str]:
    """Finds the value of the given label in this event's resource or one of its
    related resources.  If the label starts with `related:<role>:`, search for the
    first matching label in a related resource with that role."""
    directive, _, related_label = label.rpartition(":")
    directive, _, role = directive.partition(":")
    if directive == "related":
        for related in self.related:
            if related.role == role:
                return related.get(related_label)
    return self.resource.get(label)

EventTrigger

Bases: ResourceTrigger

A trigger that fires based on the presence or absence of events within a given period of time.

Source code in prefect/events/schemas/automations.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
class EventTrigger(ResourceTrigger):
    """
    A trigger that fires based on the presence or absence of events within a given
    period of time.
    """

    type: Literal["event"] = "event"

    after: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) which must first been seen to fire this trigger.  If "
            "empty, then fire this trigger immediately.  Events may include "
            "trailing wildcards, like `prefect.flow-run.*`"
        ),
    )
    expect: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) this trigger is expecting to see.  If empty, this "
            "trigger will match any event.  Events may include trailing wildcards, "
            "like `prefect.flow-run.*`"
        ),
    )

    for_each: Set[str] = Field(
        default_factory=set,
        description=(
            "Evaluate the trigger separately for each distinct value of these labels "
            "on the resource.  By default, labels refer to the primary resource of the "
            "triggering event.  You may also refer to labels from related "
            "resources by specifying `related:<role>:<label>`.  This will use the "
            "value of that label for the first related resource in that role.  For "
            'example, `"for_each": ["related:flow:prefect.resource.id"]` would '
            "evaluate the trigger for each flow."
        ),
    )
    posture: Literal[Posture.Reactive, Posture.Proactive] = Field(  # type: ignore[valid-type]
        Posture.Reactive,
        description=(
            "The posture of this trigger, either Reactive or Proactive.  Reactive "
            "triggers respond to the _presence_ of the expected events, while "
            "Proactive triggers respond to the _absence_ of those expected events."
        ),
    )
    threshold: int = Field(
        1,
        description=(
            "The number of events required for this trigger to fire (for "
            "Reactive triggers), or the number of events expected (for Proactive "
            "triggers)"
        ),
    )
    within: timedelta = Field(
        timedelta(0),
        minimum=0.0,
        exclusiveMinimum=False,
        description=(
            "The time period over which the events must occur.  For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

    @validator("within")
    def enforce_minimum_within(
        cls, value: timedelta, values, config, field: ModelField
    ):
        return validate_trigger_within(value, field)

    @root_validator(skip_on_failure=True)
    def enforce_minimum_within_for_proactive_triggers(cls, values: Dict[str, Any]):
        posture: Optional[Posture] = values.get("posture")
        within: Optional[timedelta] = values.get("within")

        if posture == Posture.Proactive:
            if not within or within == timedelta(0):
                values["within"] = timedelta(seconds=10.0)
            elif within < timedelta(seconds=10.0):
                raise ValueError(
                    "The minimum within for Proactive triggers is 10 seconds"
                )

        return values

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        if self.posture == Posture.Reactive:
            return textwrap.indent(
                "\n".join(
                    [
                        f"Reactive: expecting {self.threshold} of {self.expect}",
                    ],
                ),
                prefix="  " * indent,
            )
        else:
            return textwrap.indent(
                "\n".join(
                    [
                        f"Proactive: expecting {self.threshold} {self.expect} event "
                        f"within {self.within}",
                    ],
                ),
                prefix="  " * indent,
            )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    if self.posture == Posture.Reactive:
        return textwrap.indent(
            "\n".join(
                [
                    f"Reactive: expecting {self.threshold} of {self.expect}",
                ],
            ),
            prefix="  " * indent,
        )
    else:
        return textwrap.indent(
            "\n".join(
                [
                    f"Proactive: expecting {self.threshold} {self.expect} event "
                    f"within {self.within}",
                ],
            ),
            prefix="  " * indent,
        )

MetricTrigger

Bases: ResourceTrigger

A trigger that fires based on the results of a metric query.

Source code in prefect/events/schemas/automations.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
class MetricTrigger(ResourceTrigger):
    """
    A trigger that fires based on the results of a metric query.
    """

    type: Literal["metric"] = "metric"

    posture: Literal[Posture.Metric] = Field(  # type: ignore[valid-type]
        Posture.Metric,
        description="Periodically evaluate the configured metric query.",
    )

    metric: MetricTriggerQuery = Field(
        ...,
        description="The metric query to evaluate for this trigger. ",
    )

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        m = self.metric
        return textwrap.indent(
            "\n".join(
                [
                    f"Metric: {m.name.value} {m.operator.value} {m.threshold} for {m.range}",
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
299
300
301
302
303
304
305
306
307
308
309
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    m = self.metric
    return textwrap.indent(
        "\n".join(
            [
                f"Metric: {m.name.value} {m.operator.value} {m.threshold} for {m.range}",
            ]
        ),
        prefix="  " * indent,
    )

MetricTriggerQuery

Bases: PrefectBaseModel

Defines a subset of the Trigger subclass, which is specific to Metric automations, that specify the query configurations and breaching conditions for the Automation

Source code in prefect/events/schemas/automations.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class MetricTriggerQuery(PrefectBaseModel):
    """Defines a subset of the Trigger subclass, which is specific
    to Metric automations, that specify the query configurations
    and breaching conditions for the Automation"""

    name: PrefectMetric = Field(
        ...,
        description="The name of the metric to query.",
    )
    threshold: float = Field(
        ...,
        description=(
            "The threshold value against which we'll compare " "the query result."
        ),
    )
    operator: MetricTriggerOperator = Field(
        ...,
        description=(
            "The comparative operator (LT / LTE / GT / GTE) used to compare "
            "the query result against the threshold value."
        ),
    )
    range: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        minimum=300.0,
        exclusiveMinimum=False,
        description=(
            "The lookback duration (seconds) for a metric query. This duration is "
            "used to determine the time range over which the query will be executed. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )
    firing_for: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        minimum=300.0,
        exclusiveMinimum=False,
        description=(
            "The duration (seconds) for which the metric query must breach "
            "or resolve continuously before the state is updated and the "
            "automation is triggered. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )

PauseAutomation

Bases: AutomationAction

Pauses a Work Queue

Source code in prefect/events/actions.py
256
257
258
259
class PauseAutomation(AutomationAction):
    """Pauses a Work Queue"""

    type: Literal["pause-automation"] = "pause-automation"

PauseDeployment

Bases: DeploymentAction

Pauses the given Deployment

Source code in prefect/events/actions.py
85
86
87
88
class PauseDeployment(DeploymentAction):
    """Pauses the given Deployment"""

    type: Literal["pause-deployment"] = "pause-deployment"

PauseWorkPool

Bases: WorkPoolAction

Pauses a Work Pool

Source code in prefect/events/actions.py
172
173
174
175
class PauseWorkPool(WorkPoolAction):
    """Pauses a Work Pool"""

    type: Literal["pause-work-pool"] = "pause-work-pool"

PauseWorkQueue

Bases: WorkQueueAction

Pauses a Work Queue

Source code in prefect/events/actions.py
214
215
216
217
class PauseWorkQueue(WorkQueueAction):
    """Pauses a Work Queue"""

    type: Literal["pause-work-queue"] = "pause-work-queue"

ReceivedEvent

Bases: Event

The server-side view of an event that has happened to a Resource after it has been received by the server

Source code in prefect/events/schemas/events.py
180
181
182
183
184
185
186
187
188
189
190
class ReceivedEvent(Event):
    """The server-side view of an event that has happened to a Resource after it has
    been received by the server"""

    class Config:
        orm_mode = True

    received: DateTimeTZ = Field(
        ...,
        description="When the event was received by Prefect Cloud",
    )

RelatedResource

Bases: Resource

A Resource with a specific role in an Event

Source code in prefect/events/schemas/events.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class RelatedResource(Resource):
    """A Resource with a specific role in an Event"""

    @root_validator(pre=True)
    def requires_resource_role(cls, values: Dict[str, Any]):
        labels = values.get("__root__")
        if not isinstance(labels, dict):
            return values

        labels = cast(Dict[str, str], labels)

        if "prefect.resource.role" not in labels:
            raise ValueError(
                "Related Resources must include the prefect.resource.role label"
            )
        if not labels["prefect.resource.role"]:
            raise ValueError("The prefect.resource.role label must be non-empty")

        return values

    @property
    def role(self) -> str:
        return self["prefect.resource.role"]

Resource

Bases: Labelled

An observable business object of interest to the user

Source code in prefect/events/schemas/events.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class Resource(Labelled):
    """An observable business object of interest to the user"""

    @root_validator(pre=True)
    def enforce_maximum_labels(cls, values: Dict[str, Any]):
        labels = values.get("__root__")
        if not isinstance(labels, dict):
            return values

        if len(labels) > PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value():
            raise ValueError(
                "The maximum number of labels per resource "
                f"is {PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()}"
            )

        return values

    @root_validator(pre=True)
    def requires_resource_id(cls, values: Dict[str, Any]):
        labels = values.get("__root__")
        if not isinstance(labels, dict):
            return values

        labels = cast(Dict[str, str], labels)

        if "prefect.resource.id" not in labels:
            raise ValueError("Resources must include the prefect.resource.id label")
        if not labels["prefect.resource.id"]:
            raise ValueError("The prefect.resource.id label must be non-empty")

        return values

    @property
    def id(self) -> str:
        return self["prefect.resource.id"]

    @property
    def name(self) -> Optional[str]:
        return self.get("prefect.resource.name")

ResourceSpecification

Bases: PrefectBaseModel

A specification that may match zero, one, or many resources, used to target or select a set of resources in a query or automation. A resource must match at least one value of all of the provided labels

Source code in prefect/events/schemas/events.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
class ResourceSpecification(PrefectBaseModel):
    """A specification that may match zero, one, or many resources, used to target or
    select a set of resources in a query or automation.  A resource must match at least
    one value of all of the provided labels"""

    __root__: Dict[str, Union[str, List[str]]]

    def matches_every_resource(self) -> bool:
        return len(self) == 0

    def matches_every_resource_of_kind(self, prefix: str) -> bool:
        if self.matches_every_resource():
            return True

        if len(self.__root__) == 1:
            if resource_id := self.__root__.get("prefect.resource.id"):
                values = [resource_id] if isinstance(resource_id, str) else resource_id
                return any(value == f"{prefix}.*" for value in values)

        return False

    def includes(self, candidates: Iterable[Resource]) -> bool:
        if self.matches_every_resource():
            return True

        for candidate in candidates:
            if self.matches(candidate):
                return True

        return False

    def matches(self, resource: Resource) -> bool:
        for label, expected in self.items():
            value = resource.get(label)
            if not any(matches(candidate, value) for candidate in expected):
                return False
        return True

    def items(self) -> Iterable[Tuple[str, List[str]]]:
        return [
            (label, [value] if isinstance(value, str) else value)
            for label, value in self.__root__.items()
        ]

    def __contains__(self, key: str) -> bool:
        return self.__root__.__contains__(key)

    def __getitem__(self, key: str) -> List[str]:
        value = self.__root__[key]
        if not value:
            return []
        if not isinstance(value, list):
            value = [value]
        return value

    def pop(
        self, key: str, default: Optional[Union[str, List[str]]] = None
    ) -> Optional[List[str]]:
        value = self.__root__.pop(key, default)
        if not value:
            return []
        if not isinstance(value, list):
            value = [value]
        return value

    def get(
        self, key: str, default: Optional[Union[str, List[str]]] = None
    ) -> Optional[List[str]]:
        value = self.__root__.get(key, default)
        if not value:
            return []
        if not isinstance(value, list):
            value = [value]
        return value

    def __len__(self) -> int:
        return len(self.__root__)

    def deepcopy(self) -> "ResourceSpecification":
        return ResourceSpecification.parse_obj(copy.deepcopy(self.__root__))

ResourceTrigger

Bases: Trigger, ABC

Base class for triggers that may filter by the labels of resources.

Source code in prefect/events/schemas/automations.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ResourceTrigger(Trigger, abc.ABC):
    """
    Base class for triggers that may filter by the labels of resources.
    """

    type: str

    match: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.parse_obj({}),
        description="Labels for resources which this trigger will match.",
    )
    match_related: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.parse_obj({}),
        description="Labels for related resources which this trigger will match.",
    )

ResumeAutomation

Bases: AutomationAction

Resumes a Work Queue

Source code in prefect/events/actions.py
262
263
264
265
class ResumeAutomation(AutomationAction):
    """Resumes a Work Queue"""

    type: Literal["resume-automation"] = "resume-automation"

ResumeDeployment

Bases: DeploymentAction

Resumes the given Deployment

Source code in prefect/events/actions.py
91
92
93
94
class ResumeDeployment(DeploymentAction):
    """Resumes the given Deployment"""

    type: Literal["resume-deployment"] = "resume-deployment"

ResumeWorkPool

Bases: WorkPoolAction

Resumes a Work Pool

Source code in prefect/events/actions.py
178
179
180
181
class ResumeWorkPool(WorkPoolAction):
    """Resumes a Work Pool"""

    type: Literal["resume-work-pool"] = "resume-work-pool"

ResumeWorkQueue

Bases: WorkQueueAction

Resumes a Work Queue

Source code in prefect/events/actions.py
220
221
222
223
class ResumeWorkQueue(WorkQueueAction):
    """Resumes a Work Queue"""

    type: Literal["resume-work-queue"] = "resume-work-queue"

RunDeployment

Bases: DeploymentAction

Runs the given deployment with the given parameters

Source code in prefect/events/actions.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class RunDeployment(DeploymentAction):
    """Runs the given deployment with the given parameters"""

    type: Literal["run-deployment"] = "run-deployment"

    parameters: Optional[Dict[str, Any]] = Field(
        None,
        description=(
            "The parameters to pass to the deployment, or None to use the "
            "deployment's default parameters"
        ),
    )
    job_variables: Optional[Dict[str, Any]] = Field(
        None,
        description=(
            "The job variables to pass to the created flow run, or None "
            "to use the deployment's default job variables"
        ),
    )

SendNotification

Bases: Action

Send a notification when an Automation is triggered

Source code in prefect/events/actions.py
141
142
143
144
145
146
147
148
149
class SendNotification(Action):
    """Send a notification when an Automation is triggered"""

    type: Literal["send-notification"] = "send-notification"
    block_document_id: UUID = Field(
        description="The identifier of the notification block to use"
    )
    subject: str = Field("Prefect automated notification")
    body: str = Field(description="The text of the notification to send")

SequenceTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period in a specific order

Source code in prefect/events/schemas/automations.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class SequenceTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    type: Literal["sequence"] = "sequence"

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        return textwrap.indent(
            "\n".join(
                [
                    "In this order:",
                    "\n".join(
                        [
                            trigger.describe_for_cli(indent=indent + 1)
                            for trigger in self.triggers
                        ]
                    ),
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    return textwrap.indent(
        "\n".join(
            [
                "In this order:",
                "\n".join(
                    [
                        trigger.describe_for_cli(indent=indent + 1)
                        for trigger in self.triggers
                    ]
                ),
            ]
        ),
        prefix="  " * indent,
    )

SuspendFlowRun

Bases: Action

Suspends a flow run associated with the trigger

Source code in prefect/events/actions.py
122
123
124
125
class SuspendFlowRun(Action):
    """Suspends a flow run associated with the trigger"""

    type: Literal["suspend-flow-run"] = "suspend-flow-run"

Trigger

Bases: PrefectBaseModel, ABC

Base class describing a set of criteria that must be satisfied in order to trigger an automation.

Source code in prefect/events/schemas/automations.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class Trigger(PrefectBaseModel, abc.ABC, extra="ignore"):  # type: ignore[call-arg]
    """
    Base class describing a set of criteria that must be satisfied in order to trigger
    an automation.
    """

    type: str

    @abc.abstractmethod
    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""

    # The following allows the regular Trigger class to be used when serving or
    # deploying flows, analogous to how the Deployment*Trigger classes work

    _deployment_id: Optional[UUID] = PrivateAttr(default=None)

    def set_deployment_id(self, deployment_id: UUID):
        self._deployment_id = deployment_id

    def owner_resource(self) -> Optional[str]:
        return f"prefect.deployment.{self._deployment_id}"

    def actions(self) -> List[ActionTypes]:
        assert self._deployment_id
        return [
            RunDeployment(
                source="selected",
                deployment_id=self._deployment_id,
                parameters=getattr(self, "parameters", None),
                job_variables=getattr(self, "job_variables", None),
            )
        ]

    def as_automation(self) -> "AutomationCore":
        assert self._deployment_id

        trigger: TriggerTypes = cast(TriggerTypes, self)

        # This is one of the Deployment*Trigger classes, so translate it over to a
        # plain Trigger
        if hasattr(self, "trigger_type"):
            trigger = self.trigger_type(**self.dict())

        return AutomationCore(
            name=(
                getattr(self, "name", None)
                or f"Automation for deployment {self._deployment_id}"
            ),
            description="",
            enabled=getattr(self, "enabled", True),
            trigger=trigger,
            actions=self.actions(),
            owner_resource=self.owner_resource(),
        )

describe_for_cli abstractmethod

Return a human-readable description of this trigger for the CLI

Source code in prefect/events/schemas/automations.py
50
51
52
@abc.abstractmethod
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""

emit_event

Send an event to Prefect Cloud.

Parameters:

Name Type Description Default
event str

The name of the event that happened.

required
resource Dict[str, str]

The primary Resource this event concerns.

required
occurred Optional[DateTimeTZ]

When the event happened from the sender's perspective. Defaults to the current datetime.

None
related Optional[Union[List[Dict[str, str]], List[RelatedResource]]]

A list of additional Resources involved in this event.

None
payload Optional[Dict[str, Any]]

An open-ended set of data describing what happened.

None
id Optional[UUID]

The sender-provided identifier for this event. Defaults to a random UUID.

None
follows Optional[Event]

The event that preceded this one. If the preceding event happened more than 5 minutes prior to this event the follows relationship will not be set.

None

Returns:

Type Description
Optional[Event]

The event that was emitted if worker is using a client that emit

Optional[Event]

events, otherwise None

Source code in prefect/events/utilities.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def emit_event(
    event: str,
    resource: Dict[str, str],
    occurred: Optional[DateTimeTZ] = None,
    related: Optional[Union[List[Dict[str, str]], List[RelatedResource]]] = None,
    payload: Optional[Dict[str, Any]] = None,
    id: Optional[UUID] = None,
    follows: Optional[Event] = None,
) -> Optional[Event]:
    """
    Send an event to Prefect Cloud.

    Args:
        event: The name of the event that happened.
        resource: The primary Resource this event concerns.
        occurred: When the event happened from the sender's perspective.
                  Defaults to the current datetime.
        related: A list of additional Resources involved in this event.
        payload: An open-ended set of data describing what happened.
        id: The sender-provided identifier for this event. Defaults to a random
            UUID.
        follows: The event that preceded this one. If the preceding event
            happened more than 5 minutes prior to this event the follows
            relationship will not be set.

    Returns:
        The event that was emitted if worker is using a client that emit
        events, otherwise None
    """
    if not should_emit_events():
        return None

    operational_clients = [
        AssertingEventsClient,
        PrefectCloudEventsClient,
        PrefectEventsClient,
        PrefectEphemeralEventsClient,
    ]
    worker_instance = EventsWorker.instance()

    if worker_instance.client_type not in operational_clients:
        return None

    event_kwargs: Dict[str, Any] = {
        "event": event,
        "resource": resource,
    }

    if occurred is None:
        occurred = pendulum.now("UTC")
    event_kwargs["occurred"] = occurred

    if related is not None:
        event_kwargs["related"] = related

    if payload is not None:
        event_kwargs["payload"] = payload

    if id is not None:
        event_kwargs["id"] = id

    if follows is not None:
        if -TIGHT_TIMING < (occurred - follows.occurred) < TIGHT_TIMING:
            event_kwargs["follows"] = follows.id

    event_obj = Event(**event_kwargs)
    worker_instance.send(event_obj)

    return event_obj