summaryrefslogtreecommitdiff
path: root/src/pip/_vendor/tenacity/wait.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pip/_vendor/tenacity/wait.py')
-rw-r--r--src/pip/_vendor/tenacity/wait.py44
1 files changed, 20 insertions, 24 deletions
diff --git a/src/pip/_vendor/tenacity/wait.py b/src/pip/_vendor/tenacity/wait.py
index 8fdfc8f9d..f9349c028 100644
--- a/src/pip/_vendor/tenacity/wait.py
+++ b/src/pip/_vendor/tenacity/wait.py
@@ -17,19 +17,12 @@
import abc
import random
import typing
-from datetime import timedelta
from pip._vendor.tenacity import _utils
if typing.TYPE_CHECKING:
from pip._vendor.tenacity import RetryCallState
-wait_unit_type = typing.Union[int, float, timedelta]
-
-
-def to_seconds(wait_unit: wait_unit_type) -> float:
- return float(wait_unit.total_seconds() if isinstance(wait_unit, timedelta) else wait_unit)
-
class wait_base(abc.ABC):
"""Abstract base class for wait strategies."""
@@ -43,16 +36,19 @@ class wait_base(abc.ABC):
def __radd__(self, other: "wait_base") -> typing.Union["wait_combine", "wait_base"]:
# make it possible to use multiple waits with the built-in sum function
- if other == 0:
+ if other == 0: # type: ignore[comparison-overlap]
return self
return self.__add__(other)
+WaitBaseT = typing.Union[wait_base, typing.Callable[["RetryCallState"], typing.Union[float, int]]]
+
+
class wait_fixed(wait_base):
"""Wait strategy that waits a fixed amount of time between each retry."""
- def __init__(self, wait: wait_unit_type) -> None:
- self.wait_fixed = to_seconds(wait)
+ def __init__(self, wait: _utils.time_unit_type) -> None:
+ self.wait_fixed = _utils.to_seconds(wait)
def __call__(self, retry_state: "RetryCallState") -> float:
return self.wait_fixed
@@ -68,9 +64,9 @@ class wait_none(wait_fixed):
class wait_random(wait_base):
"""Wait strategy that waits a random amount of time between min/max."""
- def __init__(self, min: wait_unit_type = 0, max: wait_unit_type = 1) -> None: # noqa
- self.wait_random_min = to_seconds(min)
- self.wait_random_max = to_seconds(max)
+ def __init__(self, min: _utils.time_unit_type = 0, max: _utils.time_unit_type = 1) -> None: # noqa
+ self.wait_random_min = _utils.to_seconds(min)
+ self.wait_random_max = _utils.to_seconds(max)
def __call__(self, retry_state: "RetryCallState") -> float:
return self.wait_random_min + (random.random() * (self.wait_random_max - self.wait_random_min))
@@ -120,13 +116,13 @@ class wait_incrementing(wait_base):
def __init__(
self,
- start: wait_unit_type = 0,
- increment: wait_unit_type = 100,
- max: wait_unit_type = _utils.MAX_WAIT, # noqa
+ start: _utils.time_unit_type = 0,
+ increment: _utils.time_unit_type = 100,
+ max: _utils.time_unit_type = _utils.MAX_WAIT, # noqa
) -> None:
- self.start = to_seconds(start)
- self.increment = to_seconds(increment)
- self.max = to_seconds(max)
+ self.start = _utils.to_seconds(start)
+ self.increment = _utils.to_seconds(increment)
+ self.max = _utils.to_seconds(max)
def __call__(self, retry_state: "RetryCallState") -> float:
result = self.start + (self.increment * (retry_state.attempt_number - 1))
@@ -149,13 +145,13 @@ class wait_exponential(wait_base):
def __init__(
self,
multiplier: typing.Union[int, float] = 1,
- max: wait_unit_type = _utils.MAX_WAIT, # noqa
+ max: _utils.time_unit_type = _utils.MAX_WAIT, # noqa
exp_base: typing.Union[int, float] = 2,
- min: wait_unit_type = 0, # noqa
+ min: _utils.time_unit_type = 0, # noqa
) -> None:
self.multiplier = multiplier
- self.min = to_seconds(min)
- self.max = to_seconds(max)
+ self.min = _utils.to_seconds(min)
+ self.max = _utils.to_seconds(max)
self.exp_base = exp_base
def __call__(self, retry_state: "RetryCallState") -> float:
@@ -206,7 +202,7 @@ class wait_exponential_jitter(wait_base):
This implements the strategy described here:
https://cloud.google.com/storage/docs/retry-strategy
- The wait time is min(initial * (2**n + random.uniform(0, jitter)), maximum)
+ The wait time is min(initial * 2**n + random.uniform(0, jitter), maximum)
where n is the retry count.
"""