Skip to content

prefect.events

Event

Bases: PrefectBaseModel

The client-side view of an event that has happened to a Resource

Source code in prefect/events/schemas.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class Event(PrefectBaseModel):
    """The client-side view of an event that has happened to a Resource"""

    # NOTE: field declaration order is kept as-is; the ``description`` strings
    # are part of the model's published schema and must not be reworded.

    # Sender-side timestamp; filled in by ``pendulum.now`` when omitted.
    occurred: DateTimeTZ = Field(
        description="When the event happened from the sender's perspective",
        default_factory=pendulum.now,
    )
    # Required: the event's name.
    event: str = Field(
        description="The name of the event that happened",
    )
    # Required: the resource the event is primarily about.
    resource: Resource = Field(
        description="The primary Resource this event concerns",
    )
    # Optional extra resources; size is capped by the validator below.
    related: List[RelatedResource] = Field(
        description="A list of additional Resources involved in this event",
        default_factory=list,
    )
    # Free-form data attached to the event; defaults to an empty dict.
    payload: Dict[str, Any] = Field(
        description="An open-ended set of data describing what happened",
        default_factory=dict,
    )
    # Identifier chosen on the client; a fresh ``uuid4`` when omitted.
    id: UUID = Field(
        description="The client-provided identifier of this event",
        default_factory=uuid4,
    )
    # Optional pointer to an earlier event, used as an ordering hint.
    follows: Optional[UUID] = Field(
        default=None,
        description=(
            "The ID of an event that is known to have occurred prior to this "
            "one. If set, this may be used to establish a more precise "
            "ordering of causally-related events when they occur close enough "
            "together in time that the system may receive them out-of-order."
        ),
    )

    @property
    def involved_resources(self) -> Iterable[Resource]:
        """Return the primary resource followed by every related resource.

        A new list is built on each access, so mutating the result does not
        affect ``self.related``.
        """
        return [self.resource, *self.related]

    @validator("related")
    def enforce_maximum_related_resources(cls, value: List[RelatedResource]):
        """Reject events carrying more than ``MAXIMUM_RELATED_RESOURCES``.

        Args:
            value: The already-parsed list of related resources.

        Returns:
            ``value`` unchanged when it is within the limit.

        Raises:
            ValueError: If the list is longer than the allowed maximum.
        """
        if len(value) <= MAXIMUM_RELATED_RESOURCES:
            return value

        raise ValueError(
            "The maximum number of related resources "
            f"is {MAXIMUM_RELATED_RESOURCES}"
        )

RelatedResource

Bases: Resource

A Resource with a specific role in an Event

Source code in prefect/events/schemas.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class RelatedResource(Resource):
    """A Resource with a specific role in an Event"""

    @root_validator(pre=True)
    def requires_resource_role(cls, values: Dict[str, Any]):
        """Require a non-empty ``prefect.resource.role`` label.

        Runs as a ``pre`` root validator, i.e. on the raw input. Only a dict
        found under ``"__root__"`` is inspected; any other shape is handed
        back untouched.

        Args:
            values: The raw values passed to the model.

        Returns:
            ``values`` unchanged.

        Raises:
            ValueError: If the role label is absent, or present but falsy.
        """
        raw_labels = values.get("__root__")

        if isinstance(raw_labels, dict):
            # ``cast`` is a typing aid only; it does nothing at runtime.
            role_source = cast(Dict[str, str], raw_labels)

            if "prefect.resource.role" not in role_source:
                raise ValueError(
                    "Related Resources must include the prefect.resource.role label"
                )
            if not role_source["prefect.resource.role"]:
                raise ValueError("The prefect.resource.role label must be non-empty")

        return values

    @property
    def role(self) -> str:
        """The value stored under the ``prefect.resource.role`` label."""
        return self["prefect.resource.role"]

Resource

Bases: Labelled

An observable business object of interest to the user

Source code in prefect/events/schemas.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Resource(Labelled):
    """An observable business object of interest to the user"""

    # Both validators below are ``pre`` root validators: they look at the raw
    # input and only act when a dict is found under ``"__root__"``.

    @root_validator(pre=True)
    def enforce_maximum_labels(cls, values: Dict[str, Any]):
        """Reject resources with more than ``MAXIMUM_LABELS_PER_RESOURCE`` labels.

        Args:
            values: The raw values passed to the model.

        Returns:
            ``values`` unchanged.

        Raises:
            ValueError: If the label dict exceeds the allowed size.
        """
        candidate = values.get("__root__")

        if isinstance(candidate, dict) and len(candidate) > MAXIMUM_LABELS_PER_RESOURCE:
            raise ValueError(
                "The maximum number of labels per resource "
                f"is {MAXIMUM_LABELS_PER_RESOURCE}"
            )

        return values

    @root_validator(pre=True)
    def requires_resource_id(cls, values: Dict[str, Any]):
        """Require a non-empty ``prefect.resource.id`` label.

        Args:
            values: The raw values passed to the model.

        Returns:
            ``values`` unchanged.

        Raises:
            ValueError: If the id label is absent, or present but falsy.
        """
        candidate = values.get("__root__")

        if isinstance(candidate, dict):
            # ``cast`` is a typing aid only; it does nothing at runtime.
            id_source = cast(Dict[str, str], candidate)

            if "prefect.resource.id" not in id_source:
                raise ValueError("Resources must include the prefect.resource.id label")
            if not id_source["prefect.resource.id"]:
                raise ValueError("The prefect.resource.id label must be non-empty")

        return values

    @property
    def id(self) -> str:
        """The value stored under the ``prefect.resource.id`` label."""
        return self["prefect.resource.id"]

emit_event

Send an event to Prefect Cloud.

Parameters:

Name Type Description Default
event str

The name of the event that happened.

required
resource Dict[str, str]

The primary Resource this event concerns.

required
occurred Optional[DateTimeTZ]

When the event happened from the sender's perspective. Defaults to the current datetime.

None
related Optional[Union[List[Dict[str, str]], List[RelatedResource]]]

A list of additional Resources involved in this event.

None
payload Optional[Dict[str, Any]]

An open-ended set of data describing what happened.

None
id Optional[UUID]

The sender-provided identifier for this event. Defaults to a random UUID.

None
follows Optional[Event]

The event that preceded this one. If the preceding event happened more than 5 minutes prior to this event, the follows relationship will not be set.

None

Returns:

Type Description
Optional[Event]

The event that was emitted if the worker is using a client that emits events, otherwise None.

Source code in prefect/events/utilities.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def emit_event(
    event: str,
    resource: Dict[str, str],
    occurred: Optional[DateTimeTZ] = None,
    related: Optional[Union[List[Dict[str, str]], List[RelatedResource]]] = None,
    payload: Optional[Dict[str, Any]] = None,
    id: Optional[UUID] = None,
    follows: Optional[Event] = None,
) -> Optional[Event]:
    """
    Send an event to Prefect Cloud.

    Nothing is sent (and None is returned) when `emit_events_to_cloud()` is
    falsy, or when the shared `EventsWorker` is not configured with one of the
    client types treated as operational here.

    Args:
        event: The name of the event that happened.
        resource: The primary Resource this event concerns.
        occurred: When the event happened from the sender's perspective.
            Defaults to the current UTC datetime.
        related: A list of additional Resources involved in this event.
        payload: An open-ended set of data describing what happened.
        id: The sender-provided identifier for this event. When omitted, the
            `Event` model supplies its own default.
        follows: The event that preceded this one. The relationship is only
            recorded when the two `occurred` times are less than
            `TIGHT_TIMING` apart, in either direction (documented upstream
            as 5 minutes).

    Returns:
        The event that was emitted if the worker is using a client that emits
        events, otherwise None.
    """
    if not emit_events_to_cloud():
        return None

    worker = EventsWorker.instance()
    if worker.client_type not in (AssertingEventsClient, PrefectCloudEventsClient):
        return None

    # Resolve the timestamp up front: it is needed both for the Event itself
    # and for the `follows` proximity check below.
    occurred = pendulum.now("UTC") if occurred is None else occurred

    fields: Dict[str, Any] = {
        "event": event,
        "resource": resource,
        "occurred": occurred,
    }

    # Forward only the optional arguments the caller actually supplied, so the
    # Event model's own defaults apply to everything else.
    supplied = {"related": related, "payload": payload, "id": id}
    fields.update(
        {name: value for name, value in supplied.items() if value is not None}
    )

    if follows is not None:
        gap = occurred - follows.occurred
        if -TIGHT_TIMING < gap < TIGHT_TIMING:
            fields["follows"] = follows.id

    emitted = Event(**fields)
    worker.send(emitted)

    return emitted