diff options
Diffstat (limited to 'kombu/clocks.py')
| -rw-r--r-- | kombu/clocks.py | 35 |
1 files changed, 21 insertions, 14 deletions
diff --git a/kombu/clocks.py b/kombu/clocks.py index 3c152720..d02e8b32 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -1,8 +1,11 @@ """Logical Clocks and Synchronization.""" +from __future__ import annotations + from itertools import islice from operator import itemgetter from threading import Lock +from typing import Any __all__ = ('LamportClock', 'timetuple') @@ -15,7 +18,7 @@ class timetuple(tuple): Can be used as part of a heap to keep events ordered. Arguments: - clock (int): Event clock value. + clock (Optional[int]): Event clock value. timestamp (float): Event UNIX timestamp value. id (str): Event host id (e.g. ``hostname:pid``). obj (Any): Optional obj to associate with this event. @@ -23,16 +26,18 @@ class timetuple(tuple): __slots__ = () - def __new__(cls, clock, timestamp, id, obj=None): + def __new__( + cls, clock: int | None, timestamp: float, id: str, obj: Any = None + ) -> timetuple: return tuple.__new__(cls, (clock, timestamp, id, obj)) - def __repr__(self): + def __repr__(self) -> str: return R_CLOCK.format(*self) - def __getnewargs__(self): + def __getnewargs__(self) -> tuple: return tuple(self) - def __lt__(self, other): + def __lt__(self, other: tuple) -> bool: # 0: clock 1: timestamp 3: process id try: A, B = self[0], other[0] @@ -45,13 +50,13 @@ class timetuple(tuple): except IndexError: return NotImplemented - def __gt__(self, other): + def __gt__(self, other: tuple) -> bool: return other < self - def __le__(self, other): + def __le__(self, other: tuple) -> bool: return not other < self - def __ge__(self, other): + def __ge__(self, other: tuple) -> bool: return not self < other clock = property(itemgetter(0)) @@ -99,21 +104,23 @@ class LamportClock: #: The clocks current value. value = 0 - def __init__(self, initial_value=0, Lock=Lock): + def __init__( + self, initial_value: int = 0, Lock: type[Lock] = Lock + ) -> None: self.value = initial_value self.mutex = Lock() - def adjust(self, other): + def adjust(self, other: int) -> int: with self.mutex: value = self.value = max(self.value, other) + 1 return value - def forward(self): + def forward(self) -> int: with self.mutex: self.value += 1 return self.value - def sort_heap(self, h): + def sort_heap(self, h: list[tuple[int, str]]) -> tuple[int, str]: """Sort heap of events. List of tuples containing at least two elements, representing @@ -140,8 +147,8 @@ class LamportClock: # clock values unique, return first item return h[0] - def __str__(self): + def __str__(self) -> str: return str(self.value) - def __repr__(self): + def __repr__(self) -> str: return f'<LamportClock: {self.value}>' |
