Skip to content

prefect.context

Async and thread safe models for passing runtime context data.

These contexts should never be directly mutated by the user.

For more user-accessible information about the current run, see prefect.runtime.

ContextModel

Bases: BaseModel

A base model for context data that forbids mutation and extra data while providing a context manager

Source code in prefect/context.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class ContextModel(BaseModel):
    """
    A base model for context data that forbids mutation and extra data while providing
    a context manager
    """

    # The context variable for storing data must be defined by the child class
    __var__: ContextVar
    _token: Token = PrivateAttr(None)

    class Config:
        # allow_mutation = False
        arbitrary_types_allowed = True
        extra = "forbid"

    def __enter__(self):
        if self._token is not None:
            raise RuntimeError(
                "Context already entered. Context enter calls cannot be nested."
            )
        self._token = self.__var__.set(self)
        return self

    def __exit__(self, *_):
        if not self._token:
            raise RuntimeError(
                "Asymmetric use of context. Context exit called without an enter."
            )
        self.__var__.reset(self._token)
        self._token = None

    @classmethod
    def get(cls: Type[T]) -> Optional[T]:
        return cls.__var__.get(None)

    def copy(self, **kwargs):
        """
        Duplicate the context model, optionally choosing which fields to include, exclude, or change.

        Attributes:
            include: Fields to include in new model.
            exclude: Fields to exclude from new model, as with values this takes precedence over include.
            update: Values to change/add in the new model. Note: the data is not validated before creating
                the new model - you should trust this data.
            deep: Set to `True` to make a deep copy of the model.

        Returns:
            A new model instance.
        """
        # Remove the token on copy to avoid re-entrance errors
        new = super().copy(**kwargs)
        new._token = None
        return new

copy

Duplicate the context model, optionally choosing which fields to include, exclude, or change.

Attributes:

Name Type Description
include

Fields to include in new model.

exclude

Fields to exclude from new model, as with values this takes precedence over include.

update

Values to change/add in the new model. Note: the data is not validated before creating the new model - you should trust this data.

deep

Set to True to make a deep copy of the model.

Returns:

Type Description

A new model instance.

Source code in prefect/context.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def copy(self, **kwargs):
    """
    Duplicate the context model, optionally choosing which fields to include, exclude, or change.

    Attributes:
        include: Fields to include in new model.
        exclude: Fields to exclude from new model, as with values this takes precedence over include.
        update: Values to change/add in the new model. Note: the data is not validated before creating
            the new model - you should trust this data.
        deep: Set to `True` to make a deep copy of the model.

    Returns:
        A new model instance.
    """
    # Remove the token on copy to avoid re-entrance errors
    new = super().copy(**kwargs)
    new._token = None
    return new

EngineContext

Bases: RunContext

The context for a flow run. Data in this context is only available from within a flow run function.

Attributes:

Name Type Description
flow Optional[Flow]

The flow instance associated with the run

flow_run Optional[FlowRun]

The API metadata for the flow run

task_runner BaseTaskRunner

The task runner instance being used for the flow run

task_run_futures List[PrefectFuture]

A list of futures for task runs submitted within this flow run

task_run_states List[State]

A list of states for task runs created within this flow run

task_run_results Dict[int, State]

A mapping of result ids to task run states for this flow run

flow_run_states List[State]

A list of states for flow runs created within this flow run

sync_portal Optional[BlockingPortal]

A blocking portal for sync task/flow runs in an async flow

timeout_scope Optional[CancelScope]

The cancellation scope for flow level timeouts

Source code in prefect/context.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
class EngineContext(RunContext):
    """
    The context for a flow run. Data in this context is only available from within a
    flow run function.

    Attributes:
        flow: The flow instance associated with the run
        flow_run: The API metadata for the flow run
        task_runner: The task runner instance being used for the flow run
        task_run_futures: A list of futures for task runs submitted within this flow run
        task_run_states: A list of states for task runs created within this flow run
        task_run_results: A mapping of result ids to task run states for this flow run
        flow_run_states: A list of states for flow runs created within this flow run
        sync_portal: A blocking portal for sync task/flow runs in an async flow
        timeout_scope: The cancellation scope for flow level timeouts
    """

    flow: Optional["Flow"] = None
    flow_run: Optional[FlowRun] = None
    autonomous_task_run: Optional[TaskRun] = None
    task_runner: BaseTaskRunner
    log_prints: bool = False
    parameters: Optional[Dict[str, Any]] = None

    # Result handling
    result_factory: ResultFactory

    # Counter for task calls allowing unique
    task_run_dynamic_keys: Dict[str, int] = Field(default_factory=dict)

    # Counter for flow pauses
    observed_flow_pauses: Dict[str, int] = Field(default_factory=dict)

    # Tracking for objects created by this flow run
    task_run_futures: List[PrefectFuture] = Field(default_factory=list)
    task_run_states: List[State] = Field(default_factory=list)
    task_run_results: Dict[int, State] = Field(default_factory=dict)
    flow_run_states: List[State] = Field(default_factory=list)

    # The synchronous portal is only created for async flows for creating engine calls
    # from synchronous task and subflow calls
    sync_portal: Optional[anyio.abc.BlockingPortal] = None
    timeout_scope: Optional[anyio.abc.CancelScope] = None

    # Task group that can be used for background tasks during the flow run
    background_tasks: anyio.abc.TaskGroup

    # Events worker to emit events to Prefect Cloud
    events: Optional[EventsWorker] = None

    __var__ = ContextVar("flow_run")

PrefectObjectRegistry

Bases: ContextModel

A context that acts as a registry for all Prefect objects that are registered during load and execution.

Attributes:

Name Type Description
start_time DateTimeTZ

The time the object registry was created.

block_code_execution bool

If set, flow calls will be ignored.

capture_failures bool

If set, failures during init will be silenced and tracked.

Source code in prefect/context.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class PrefectObjectRegistry(ContextModel):
    """
    A context that acts as a registry for all Prefect objects that are
    registered during load and execution.

    Attributes:
        start_time: The time the object registry was created.
        block_code_execution: If set, flow calls will be ignored.
        capture_failures: If set, failures during __init__ will be silenced and tracked.
    """

    start_time: DateTimeTZ = Field(default_factory=lambda: pendulum.now("UTC"))

    _instance_registry: Dict[Type[T], List[T]] = PrivateAttr(
        default_factory=lambda: defaultdict(list)
    )

    # Failures will be a tuple of (exception, instance, args, kwargs)
    _instance_init_failures: Dict[
        Type[T], List[Tuple[Exception, T, Tuple, Dict]]
    ] = PrivateAttr(default_factory=lambda: defaultdict(list))

    block_code_execution: bool = False
    capture_failures: bool = False

    __var__ = ContextVar("object_registry")

    def get_instances(self, type_: Type[T]) -> List[T]:
        instances = []
        for registered_type, type_instances in self._instance_registry.items():
            if type_ in registered_type.mro():
                instances.extend(type_instances)
        return instances

    def get_instance_failures(
        self, type_: Type[T]
    ) -> List[Tuple[Exception, T, Tuple, Dict]]:
        failures = []
        for type__ in type_.mro():
            failures.extend(self._instance_init_failures[type__])
        return failures

    def register_instance(self, object):
        # TODO: Consider using a 'Set' to avoid duplicate entries
        self._instance_registry[type(object)].append(object)

    def register_init_failure(
        self, exc: Exception, object: Any, init_args: Tuple, init_kwargs: Dict
    ):
        self._instance_init_failures[type(object)].append(
            (exc, object, init_args, init_kwargs)
        )

    @classmethod
    def register_instances(cls, type_: Type[T]) -> Type[T]:
        """
        Decorator for a class that adds registration to the `PrefectObjectRegistry`
        on initialization of instances.
        """
        original_init = type_.__init__

        def __register_init__(__self__: T, *args: Any, **kwargs: Any) -> None:
            registry = cls.get()
            try:
                original_init(__self__, *args, **kwargs)
            except Exception as exc:
                if not registry or not registry.capture_failures:
                    raise
                else:
                    registry.register_init_failure(exc, __self__, args, kwargs)
            else:
                if registry:
                    registry.register_instance(__self__)

        update_wrapper(__register_init__, original_init)

        type_.__init__ = __register_init__
        return type_

register_instances classmethod

Decorator for a class that adds registration to the PrefectObjectRegistry on initialization of instances.

Source code in prefect/context.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@classmethod
def register_instances(cls, type_: Type[T]) -> Type[T]:
    """
    Decorator for a class that adds registration to the `PrefectObjectRegistry`
    on initialization of instances.
    """
    original_init = type_.__init__

    def __register_init__(__self__: T, *args: Any, **kwargs: Any) -> None:
        registry = cls.get()
        try:
            original_init(__self__, *args, **kwargs)
        except Exception as exc:
            if not registry or not registry.capture_failures:
                raise
            else:
                registry.register_init_failure(exc, __self__, args, kwargs)
        else:
            if registry:
                registry.register_instance(__self__)

    update_wrapper(__register_init__, original_init)

    type_.__init__ = __register_init__
    return type_

RunContext

Bases: ContextModel

The base context for a flow or task run. Data in this context will always be available when get_run_context is called.

Attributes:

Name Type Description
start_time DateTimeTZ

The time the run context was entered

client Union[PrefectClient, SyncPrefectClient]

The Prefect client instance being used for API communication

Source code in prefect/context.py
204
205
206
207
208
209
210
211
212
213
214
215
216
class RunContext(ContextModel):
    """
    The base context for a flow or task run. Data in this context will always be
    available when `get_run_context` is called.

    Attributes:
        start_time: The time the run context was entered
        client: The Prefect client instance being used for API communication
    """

    start_time: DateTimeTZ = Field(default_factory=lambda: pendulum.now("UTC"))
    input_keyset: Optional[Dict[str, Dict[str, str]]] = None
    client: Union[PrefectClient, SyncPrefectClient]

SettingsContext

Bases: ContextModel

The context for a Prefect settings.

This allows for safe concurrent access and modification of settings.

Attributes:

Name Type Description
profile Profile

The profile that is in use.

settings Settings

The complete settings model.

Source code in prefect/context.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
class SettingsContext(ContextModel):
    """
    The context for a Prefect settings.

    This allows for safe concurrent access and modification of settings.

    Attributes:
        profile: The profile that is in use.
        settings: The complete settings model.
    """

    profile: Profile
    settings: Settings

    __var__ = ContextVar("settings")

    def __hash__(self) -> int:
        return hash(self.settings)

    def __enter__(self):
        """
        Upon entrance, we ensure the home directory for the profile exists.
        """
        return_value = super().__enter__()

        try:
            prefect_home = Path(self.settings.value_of(PREFECT_HOME))
            prefect_home.mkdir(mode=0o0700, exist_ok=True)
        except OSError:
            warnings.warn(
                (
                    "Failed to create the Prefect home directory at "
                    f"{self.settings.value_of(PREFECT_HOME)}"
                ),
                stacklevel=2,
            )

        return return_value

    @classmethod
    def get(cls) -> "SettingsContext":
        # Return the global context instead of `None` if no context exists
        return super().get() or GLOBAL_SETTINGS_CONTEXT

TagsContext

Bases: ContextModel

The context for prefect.tags management.

Attributes:

Name Type Description
current_tags Set[str]

A set of current tags in the context

Source code in prefect/context.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
class TagsContext(ContextModel):
    """
    The context for `prefect.tags` management.

    Attributes:
        current_tags: A set of current tags in the context
    """

    current_tags: Set[str] = Field(default_factory=set)

    @classmethod
    def get(cls) -> "TagsContext":
        # Return an empty `TagsContext` instead of `None` if no context exists
        return cls.__var__.get(TagsContext())

    __var__ = ContextVar("tags")

TaskRunContext

Bases: RunContext

The context for a task run. Data in this context is only available from within a task run function.

Attributes:

Name Type Description
task Task

The task instance associated with the task run

task_run TaskRun

The API metadata for this task run

Source code in prefect/context.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class TaskRunContext(RunContext):
    """
    The context for a task run. Data in this context is only available from within a
    task run function.

    Attributes:
        task: The task instance associated with the task run
        task_run: The API metadata for this task run
    """

    task: "Task"
    task_run: TaskRun
    log_prints: bool = False
    parameters: Dict[str, Any]

    # Result handling
    result_factory: ResultFactory

    __var__ = ContextVar("task_run")

get_run_context

Get the current run context from within a task or flow function.

Returns:

Type Description
Union[FlowRunContext, TaskRunContext]

A FlowRunContext or TaskRunContext depending on the function type.

Raises RuntimeError: If called outside of a flow or task run.

Source code in prefect/context.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def get_run_context() -> Union[FlowRunContext, TaskRunContext]:
    """
    Get the current run context from within a task or flow function.

    Returns:
        A `FlowRunContext` or `TaskRunContext` depending on the function type.

    Raises
        RuntimeError: If called outside of a flow or task run.
    """
    task_run_ctx = TaskRunContext.get()
    if task_run_ctx:
        return task_run_ctx

    flow_run_ctx = FlowRunContext.get()
    if flow_run_ctx:
        return flow_run_ctx

    raise MissingContextError(
        "No run context available. You are not in a flow or task run context."
    )

get_settings_context

Get the current settings context which contains profile information and the settings that are being used.

Generally, the settings that are being used are a combination of values from the profile and environment. See prefect.context.use_profile for more details.

Source code in prefect/context.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def get_settings_context() -> SettingsContext:
    """
    Get the current settings context which contains profile information and the
    settings that are being used.

    Generally, the settings that are being used are a combination of values from the
    profile and environment. See `prefect.context.use_profile` for more details.
    """
    settings_ctx = SettingsContext.get()

    if not settings_ctx:
        raise MissingContextError("No settings context found.")

    return settings_ctx

registry_from_script

Return a fresh registry with instances populated from execution of a script.

Source code in prefect/context.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def registry_from_script(
    path: str,
    block_code_execution: bool = True,
    capture_failures: bool = True,
) -> PrefectObjectRegistry:
    """
    Return a fresh registry with instances populated from execution of a script.
    """
    with PrefectObjectRegistry(
        block_code_execution=block_code_execution,
        capture_failures=capture_failures,
    ) as registry:
        load_script_as_module(path)

    return registry

root_settings_context

Return the settings context that will exist as the root context for the module.

The profile to use is determined with the following precedence - Command line via 'prefect --profile ' - Environment variable via 'PREFECT_PROFILE' - Profiles file via the 'active' key

Source code in prefect/context.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
def root_settings_context():
    """
    Return the settings context that will exist as the root context for the module.

    The profile to use is determined with the following precedence
    - Command line via 'prefect --profile <name>'
    - Environment variable via 'PREFECT_PROFILE'
    - Profiles file via the 'active' key
    """
    profiles = prefect.settings.load_profiles()
    active_name = profiles.active_name
    profile_source = "in the profiles file"

    if "PREFECT_PROFILE" in os.environ:
        active_name = os.environ["PREFECT_PROFILE"]
        profile_source = "by environment variable"

    if (
        sys.argv[0].endswith("/prefect")
        and len(sys.argv) >= 3
        and sys.argv[1] == "--profile"
    ):
        active_name = sys.argv[2]
        profile_source = "by command line argument"

    if active_name not in profiles.names:
        print(
            (
                f"WARNING: Active profile {active_name!r} set {profile_source} not "
                "found. The default profile will be used instead. "
            ),
            file=sys.stderr,
        )
        active_name = "default"

    with use_profile(
        profiles[active_name],
        # Override environment variables if the profile was set by the CLI
        override_environment_variables=profile_source == "by command line argument",
    ) as settings_context:
        return settings_context

tags

Context manager to add tags to flow and task run calls.

Tags are always combined with any existing tags.

Yields:

Type Description
Set[str]

The current set of tags

Examples:

>>> from prefect import tags, task, flow
>>> @task
>>> def my_task():
>>>     pass

Run a task with tags

>>> @flow
>>> def my_flow():
>>>     with tags("a", "b"):
>>>         my_task()  # has tags: a, b

Run a flow with tags

>>> @flow
>>> def my_flow():
>>>     pass
>>> with tags("a", "b"):
>>>     my_flow()  # has tags: a, b

Run a task with nested tag contexts

>>> @flow
>>> def my_flow():
>>>     with tags("a", "b"):
>>>         with tags("c", "d"):
>>>             my_task()  # has tags: a, b, c, d
>>>         my_task()  # has tags: a, b

Inspect the current tags

>>> @flow
>>> def my_flow():
>>>     with tags("c", "d"):
>>>         with tags("e", "f") as current_tags:
>>>              print(current_tags)
>>> with tags("a", "b"):
>>>     my_flow()
{"a", "b", "c", "d", "e", "f"}
Source code in prefect/context.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
@contextmanager
def tags(*new_tags: str) -> Generator[Set[str], None, None]:
    """
    Context manager to add tags to flow and task run calls.

    Tags are always combined with any existing tags.

    Yields:
        The current set of tags

    Examples:
        >>> from prefect import tags, task, flow
        >>> @task
        >>> def my_task():
        >>>     pass

        Run a task with tags

        >>> @flow
        >>> def my_flow():
        >>>     with tags("a", "b"):
        >>>         my_task()  # has tags: a, b

        Run a flow with tags

        >>> @flow
        >>> def my_flow():
        >>>     pass
        >>> with tags("a", "b"):
        >>>     my_flow()  # has tags: a, b

        Run a task with nested tag contexts

        >>> @flow
        >>> def my_flow():
        >>>     with tags("a", "b"):
        >>>         with tags("c", "d"):
        >>>             my_task()  # has tags: a, b, c, d
        >>>         my_task()  # has tags: a, b

        Inspect the current tags

        >>> @flow
        >>> def my_flow():
        >>>     with tags("c", "d"):
        >>>         with tags("e", "f") as current_tags:
        >>>              print(current_tags)
        >>> with tags("a", "b"):
        >>>     my_flow()
        {"a", "b", "c", "d", "e", "f"}
    """
    current_tags = TagsContext.get().current_tags
    new_tags = current_tags.union(new_tags)
    with TagsContext(current_tags=new_tags):
        yield new_tags

use_profile

Switch to a profile for the duration of this context.

Profile contexts are confined to an async context in a single thread.

Parameters:

Name Type Description Default
profile Union[Profile, str]

The name of the profile to load or an instance of a Profile.

required
override_environment_variable

If set, variables in the profile will take precedence over current environment variables. By default, environment variables will override profile settings.

required
include_current_context bool

If set, the new settings will be constructed with the current settings context as a base. If not set, the use_base settings will be loaded from the environment and defaults.

True

Yields:

Type Description

The created SettingsContext object

Source code in prefect/context.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
@contextmanager
def use_profile(
    profile: Union[Profile, str],
    override_environment_variables: bool = False,
    include_current_context: bool = True,
):
    """
    Switch to a profile for the duration of this context.

    Profile contexts are confined to an async context in a single thread.

    Args:
        profile: The name of the profile to load or an instance of a Profile.
        override_environment_variable: If set, variables in the profile will take
            precedence over current environment variables. By default, environment
            variables will override profile settings.
        include_current_context: If set, the new settings will be constructed
            with the current settings context as a base. If not set, the use_base settings
            will be loaded from the environment and defaults.

    Yields:
        The created `SettingsContext` object
    """
    if isinstance(profile, str):
        profiles = prefect.settings.load_profiles()
        profile = profiles[profile]

    if not isinstance(profile, Profile):
        raise TypeError(
            f"Unexpected type {type(profile).__name__!r} for `profile`. "
            "Expected 'str' or 'Profile'."
        )

    # Create a copy of the profiles settings as we will mutate it
    profile_settings = profile.settings.copy()

    existing_context = SettingsContext.get()
    if existing_context and include_current_context:
        settings = existing_context.settings
    else:
        settings = prefect.settings.get_settings_from_env()

    if not override_environment_variables:
        for key in os.environ:
            if key in prefect.settings.SETTING_VARIABLES:
                profile_settings.pop(prefect.settings.SETTING_VARIABLES[key], None)

    new_settings = settings.copy_with_update(updates=profile_settings)

    with SettingsContext(profile=profile, settings=new_settings) as ctx:
        yield ctx