summaryrefslogtreecommitdiff
path: root/kombu/clocks.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/clocks.py')
-rw-r--r--kombu/clocks.py35
1 files changed, 21 insertions, 14 deletions
diff --git a/kombu/clocks.py b/kombu/clocks.py
index 3c152720..d02e8b32 100644
--- a/kombu/clocks.py
+++ b/kombu/clocks.py
@@ -1,8 +1,11 @@
"""Logical Clocks and Synchronization."""
+from __future__ import annotations
+
from itertools import islice
from operator import itemgetter
from threading import Lock
+from typing import Any
__all__ = ('LamportClock', 'timetuple')
@@ -15,7 +18,7 @@ class timetuple(tuple):
Can be used as part of a heap to keep events ordered.
Arguments:
- clock (int): Event clock value.
+ clock (Optional[int]): Event clock value.
timestamp (float): Event UNIX timestamp value.
id (str): Event host id (e.g. ``hostname:pid``).
obj (Any): Optional obj to associate with this event.
@@ -23,16 +26,18 @@ class timetuple(tuple):
__slots__ = ()
- def __new__(cls, clock, timestamp, id, obj=None):
+ def __new__(
+ cls, clock: int | None, timestamp: float, id: str, obj: Any = None
+ ) -> timetuple:
return tuple.__new__(cls, (clock, timestamp, id, obj))
- def __repr__(self):
+ def __repr__(self) -> str:
return R_CLOCK.format(*self)
- def __getnewargs__(self):
+ def __getnewargs__(self) -> tuple:
return tuple(self)
- def __lt__(self, other):
+ def __lt__(self, other: tuple) -> bool:
# 0: clock 1: timestamp 3: process id
try:
A, B = self[0], other[0]
@@ -45,13 +50,13 @@ class timetuple(tuple):
except IndexError:
return NotImplemented
- def __gt__(self, other):
+ def __gt__(self, other: tuple) -> bool:
return other < self
- def __le__(self, other):
+ def __le__(self, other: tuple) -> bool:
return not other < self
- def __ge__(self, other):
+ def __ge__(self, other: tuple) -> bool:
return not self < other
clock = property(itemgetter(0))
@@ -99,21 +104,23 @@ class LamportClock:
#: The clocks current value.
value = 0
- def __init__(self, initial_value=0, Lock=Lock):
+ def __init__(
+ self, initial_value: int = 0, Lock: type[Lock] = Lock
+ ) -> None:
self.value = initial_value
self.mutex = Lock()
- def adjust(self, other):
+ def adjust(self, other: int) -> int:
with self.mutex:
value = self.value = max(self.value, other) + 1
return value
- def forward(self):
+ def forward(self) -> int:
with self.mutex:
self.value += 1
return self.value
- def sort_heap(self, h):
+ def sort_heap(self, h: list[tuple[int, str]]) -> tuple[int, str]:
"""Sort heap of events.
List of tuples containing at least two elements, representing
@@ -140,8 +147,8 @@ class LamportClock:
# clock values unique, return first item
return h[0]
- def __str__(self):
+ def __str__(self) -> str:
return str(self.value)
- def __repr__(self):
+ def __repr__(self) -> str:
return f'<LamportClock: {self.value}>'