
prefect.concurrency.sync

concurrency

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| names | Union[str, List[str]] | The names of the concurrency limits to acquire slots from. | required |
| occupy | int | The number of slots to acquire and hold from each limit. | 1 |
| timeout_seconds | Optional[float] | The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely. | None |

Raises:

| Type | Description |
| --- | --- |
| TimeoutError | If the slots are not acquired within the given timeout. |

A simple example of using the sync concurrency context manager:

from prefect.concurrency.sync import concurrency

def resource_heavy():
    with concurrency("test", occupy=1):
        print("Resource heavy task")

def main():
    resource_heavy()
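
The timeout_seconds parameter and the TimeoutError it raises can be combined roughly as in the sketch below. To keep the snippet runnable without a Prefect server, the context manager is passed in as an argument. `run_with_limit`, `free_limiter` and `busy_limiter` are hypothetical names introduced for illustration; in real use you would presumably pass `prefect.concurrency.sync.concurrency` as `limiter`.

```python
from contextlib import contextmanager


def run_with_limit(limiter, name, work, timeout_seconds=5.0):
    """Run work() while holding one slot; return None if no slot frees up in time.

    limiter is any context manager factory that accepts the same arguments as
    prefect.concurrency.sync.concurrency (names, occupy, timeout_seconds).
    """
    try:
        with limiter(name, occupy=1, timeout_seconds=timeout_seconds):
            return work()
    except TimeoutError:
        # The slot was not acquired within timeout_seconds.
        return None


@contextmanager
def free_limiter(names, occupy=1, timeout_seconds=None):
    # Hypothetical stand-in: a limit that always has a free slot.
    yield


@contextmanager
def busy_limiter(names, occupy=1, timeout_seconds=None):
    # Hypothetical stand-in: a limit whose slots never free up.
    raise TimeoutError(f"could not acquire {occupy} slot(s) from {names!r}")
    yield  # unreachable, but needed so this function is a generator


if __name__ == "__main__":
    print(run_with_limit(free_limiter, "test", lambda: "done"))
    print(run_with_limit(busy_limiter, "test", lambda: "done"))
```

Note that a TimeoutError raised by `work()` itself would also be caught by this wrapper, so a stricter version might acquire the slot and run the work in separate steps.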

Source code in prefect/concurrency/sync.py
@contextmanager
def concurrency(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
):
    """A context manager that acquires and releases concurrency slots from the
    given concurrency limits.

    Args:
        names: The names of the concurrency limits to acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be acquired before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.

    Example:
    A simple example of using the sync `concurrency` context manager:
    ```python
    from prefect.concurrency.sync import concurrency

    def resource_heavy():
        with concurrency("test", occupy=1):
            print("Resource heavy task")

    def main():
        resource_heavy()
    ```
    """
    names = names if isinstance(names, list) else [names]

    limits: List[MinimalConcurrencyLimitResponse] = _call_async_function_from_sync(
        _acquire_concurrency_slots, names, occupy, timeout_seconds=timeout_seconds
    )
    acquisition_time = pendulum.now("UTC")
    emitted_events = _emit_concurrency_acquisition_events(limits, occupy)

    try:
        yield
    finally:
        occupancy_period = cast(Interval, pendulum.now("UTC") - acquisition_time)
        _call_async_function_from_sync(
            _release_concurrency_slots,
            names,
            occupy,
            occupancy_period.total_seconds(),
        )
        _emit_concurrency_release_events(limits, occupy, emitted_events)
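
The acquire, yield, release-in-finally shape of the function above can be tried locally with the sketch below. It is an in-process analogue only: `LocalLimit` and `local_concurrency` are hypothetical names, and the slot bookkeeping uses a `threading.Condition` instead of the Prefect API, so it says nothing about how the server tracks slots.

```python
import threading
import time
from contextlib import contextmanager


class LocalLimit:
    """Hypothetical in-process stand-in for a server-side concurrency limit."""

    def __init__(self, slots):
        self.slots = slots
        self.active = 0
        self.last_occupancy_seconds = None
        self._cond = threading.Condition()

    def acquire(self, occupy, timeout_seconds=None):
        with self._cond:
            # wait_for returns False if the predicate is still false at timeout.
            acquired = self._cond.wait_for(
                lambda: self.active + occupy <= self.slots,
                timeout=timeout_seconds,
            )
            if not acquired:
                raise TimeoutError("slots not acquired within the timeout")
            self.active += occupy

    def release(self, occupy):
        with self._cond:
            self.active -= occupy
            self._cond.notify_all()


@contextmanager
def local_concurrency(limit, occupy=1, timeout_seconds=None):
    # Acquisition happens before the body runs and may raise TimeoutError.
    limit.acquire(occupy, timeout_seconds)
    acquired_at = time.monotonic()
    try:
        yield
    finally:
        # Runs even if the body raises, so the slots are always handed back.
        limit.release(occupy)
        limit.last_occupancy_seconds = time.monotonic() - acquired_at


if __name__ == "__main__":
    limit = LocalLimit(slots=2)
    with local_concurrency(limit, occupy=2):
        print("holding", limit.active, "slots")
    print("released, active =", limit.active)
```

Placing the release in `finally` is the design choice worth noting: an exception inside the `with` body still returns the slots, which mirrors the structure of the source shown above.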

rate_limit

Blocks execution until the number of slots given by occupy has been acquired from each of the concurrency limits listed in names. Every listed limit must have a slot decay configured.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| names | Union[str, List[str]] | The names of the concurrency limits to acquire slots from. | required |
| occupy | int | The number of slots to acquire and hold from each limit. | 1 |

Source code in prefect/concurrency/sync.py
def rate_limit(names: Union[str, List[str]], occupy: int = 1):
    """Block execution until an `occupy` number of slots of the concurrency
    limits given in `names` are acquired. Requires that all given concurrency
    limits have a slot decay.

    Args:
        names: The names of the concurrency limits to acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
    """
    names = names if isinstance(names, list) else [names]
    limits = _call_async_function_from_sync(
        _acquire_concurrency_slots, names, occupy, mode="rate_limit"
    )
    _emit_concurrency_acquisition_events(limits, occupy)
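
Unlike `concurrency`, the `rate_limit` source above acquires slots and never releases them, which is why the limits it names need a slot decay to free capacity over time. The sketch below models that idea locally. `DecayingLimit`, its `slot_decay_per_second` argument and the injectable `clock` are illustrative assumptions, not Prefect's server-side implementation.

```python
import time


class DecayingLimit:
    """Hypothetical local model of a concurrency limit with slot decay."""

    def __init__(self, slots, slot_decay_per_second, clock=time.monotonic):
        if slot_decay_per_second <= 0:
            # Without decay, occupied slots would never free up again.
            raise ValueError("rate limiting requires a positive slot decay")
        self.slots = slots
        self.decay = slot_decay_per_second
        self.clock = clock
        self.active = 0.0
        self.updated_at = clock()

    def _apply_decay(self):
        # Occupied slots drain continuously at `decay` slots per second.
        now = self.clock()
        self.active = max(0.0, self.active - (now - self.updated_at) * self.decay)
        self.updated_at = now

    def try_acquire(self, occupy=1):
        """Take `occupy` slots if they fit; nothing releases them explicitly."""
        self._apply_decay()
        if self.active + occupy <= self.slots:
            self.active += occupy
            return True
        return False

    def acquire(self, occupy=1, sleep=time.sleep):
        """Block until `occupy` slots are available, then take them."""
        if occupy > self.slots:
            raise ValueError("occupy exceeds the total number of slots")
        while not self.try_acquire(occupy):
            # Sleep roughly as long as decay needs to free the missing slots.
            deficit = self.active + occupy - self.slots
            sleep(deficit / self.decay)


if __name__ == "__main__":
    limit = DecayingLimit(slots=2, slot_decay_per_second=4.0)
    for i in range(4):
        limit.acquire()  # the third and fourth calls wait for decay
        print("request", i)
```

With 2 slots and a decay of 4 slots per second, the first two calls return immediately and later calls are spaced out by the decay rate, which is the throttling effect a rate limit is meant to provide.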