diff options
Diffstat (limited to 'src/pip/_vendor/tenacity/wait.py')
-rw-r--r-- | src/pip/_vendor/tenacity/wait.py | 44 |
1 files changed, 20 insertions, 24 deletions
diff --git a/src/pip/_vendor/tenacity/wait.py b/src/pip/_vendor/tenacity/wait.py index 8fdfc8f9d..f9349c028 100644 --- a/src/pip/_vendor/tenacity/wait.py +++ b/src/pip/_vendor/tenacity/wait.py @@ -17,19 +17,12 @@ import abc import random import typing -from datetime import timedelta from pip._vendor.tenacity import _utils if typing.TYPE_CHECKING: from pip._vendor.tenacity import RetryCallState -wait_unit_type = typing.Union[int, float, timedelta] - - -def to_seconds(wait_unit: wait_unit_type) -> float: - return float(wait_unit.total_seconds() if isinstance(wait_unit, timedelta) else wait_unit) - class wait_base(abc.ABC): """Abstract base class for wait strategies.""" @@ -43,16 +36,19 @@ class wait_base(abc.ABC): def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]: # make it possible to use multiple waits with the built-in sum function - if other == 0: + if other == 0: # type: ignore[comparison-overlap] return self return self.__add__(other) +WaitBaseT = typing.Union[wait_base, typing.Callable[["RetryCallState"], typing.Union[float, int]]] + + class wait_fixed(wait_base): """Wait strategy that waits a fixed amount of time between each retry.""" - def __init__(self, wait: wait_unit_type) -> None: - self.wait_fixed = to_seconds(wait) + def __init__(self, wait: _utils.time_unit_type) -> None: + self.wait_fixed = _utils.to_seconds(wait) def __call__(self, retry_state: "RetryCallState") -> float: return self.wait_fixed @@ -68,9 +64,9 @@ class wait_none(wait_fixed): class wait_random(wait_base): """Wait strategy that waits a random amount of time between min/max.""" - def __init__(self, min: wait_unit_type = 0, max: wait_unit_type = 1) -> None: # noqa - self.wait_random_min = to_seconds(min) - self.wait_random_max = to_seconds(max) + def __init__(self, min: _utils.time_unit_type = 0, max: _utils.time_unit_type = 1) -> None: # noqa + self.wait_random_min = _utils.to_seconds(min) + self.wait_random_max = _utils.to_seconds(max) def __call__(self, retry_state: "RetryCallState") -> float: return self.wait_random_min + (random.random() * (self.wait_random_max - self.wait_random_min)) @@ -120,13 +116,13 @@ class wait_incrementing(wait_base): def __init__( self, - start: wait_unit_type = 0, - increment: wait_unit_type = 100, - max: wait_unit_type = _utils.MAX_WAIT, # noqa + start: _utils.time_unit_type = 0, + increment: _utils.time_unit_type = 100, + max: _utils.time_unit_type = _utils.MAX_WAIT, # noqa ) -> None: - self.start = to_seconds(start) - self.increment = to_seconds(increment) - self.max = to_seconds(max) + self.start = _utils.to_seconds(start) + self.increment = _utils.to_seconds(increment) + self.max = _utils.to_seconds(max) def __call__(self, retry_state: "RetryCallState") -> float: result = self.start + (self.increment * (retry_state.attempt_number - 1)) @@ -149,13 +145,13 @@ class wait_exponential(wait_base): def __init__( self, multiplier: typing.Union[int, float] = 1, - max: wait_unit_type = _utils.MAX_WAIT, # noqa + max: _utils.time_unit_type = _utils.MAX_WAIT, # noqa exp_base: typing.Union[int, float] = 2, - min: wait_unit_type = 0, # noqa + min: _utils.time_unit_type = 0, # noqa ) -> None: self.multiplier = multiplier - self.min = to_seconds(min) - self.max = to_seconds(max) + self.min = _utils.to_seconds(min) + self.max = _utils.to_seconds(max) self.exp_base = exp_base def __call__(self, retry_state: "RetryCallState") -> float: @@ -206,7 +202,7 @@ class wait_exponential_jitter(wait_base): This implements the strategy described here: https://cloud.google.com/storage/docs/retry-strategy - The wait time is min(initial * (2**n + random.uniform(0, jitter)), maximum) + The wait time is min(initial * 2**n + random.uniform(0, jitter), maximum) where n is the retry count. """ |