summaryrefslogtreecommitdiff
path: root/kazoo/recipe/counter.py
blob: b728bc2a651920a815bd805b1076c81b2763fac2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""Zookeeper Counter

:Maintainer: None
:Status: Unknown

"""
from kazoo.exceptions import BadVersionError
from kazoo.retry import ForceRetryError


class Counter(object):
    """Kazoo Counter

    A counter shared through a single Zookeeper node, holding either an
    int or a float. Every change is a read of the node followed by a
    version-checked write, so changes are applied atomically; when a
    concurrent modification is detected the change is retried through
    the client's general retry policy.

    Values are written to the node as `repr(value)` and read back with
    `type(counter.default)(value)`, using an ascii encoding in both
    directions. Other data types that survive this round trip might
    therefore be usable as counter values too.

    Counter changes can raise
    :class:`~kazoo.exceptions.BadVersionError` if the retry policy
    wasn't able to apply a change.

    After a change, `pre_value` holds the value that was read before
    the change and `post_value` the value that was written.

    Example usage:

    .. code-block:: python

        zk = KazooClient()
        zk.start()
        counter = zk.Counter("/int")
        counter += 2
        counter -= 1
        counter.value == 1
        counter.pre_value == 2
        counter.post_value == 1

        counter = zk.Counter("/float", default=1.0)
        counter += 2.0
        counter.value == 3.0
        counter.pre_value == 1.0
        counter.post_value == 3.0

    """
    def __init__(self, client, path, default=0):
        """Create a Kazoo Counter

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The counter path to use.
        :param default: The value reported while the node holds no
                        data. Its type also determines which type of
                        value the counter accepts for changes.

        """
        self.client = client
        self.path = path
        self.default = default
        self.default_type = type(default)
        # Flipped to True once the node has been created/verified, so
        # ensure_path is only issued once per Counter instance.
        self._ensured_path = False
        # Both stay None until the first change has been attempted.
        self.pre_value = None
        self.post_value = None

    def _ensure_node(self):
        """Make sure the counter node exists (done once per instance)."""
        if self._ensured_path:
            return
        self.client.ensure_path(self.path)
        self._ensured_path = True

    def _value(self):
        """Read the counter node.

        :returns: A ``(value, version)`` tuple: the node data converted
                  to the counter type, and the node version taken from
                  the stat returned by the read.

        """
        self._ensure_node()
        raw, stat = self.client.get(self.path)
        if raw == b'':
            # An empty node means nothing was stored yet.
            text = self.default
        else:
            text = raw.decode('ascii')
        return self.default_type(text), stat.version

    @property
    def value(self):
        """The current counter value as read from Zookeeper."""
        current, _ = self._value()
        return current

    def _change(self, value):
        """Apply a delta to the counter via the client's retry helper.

        :param value: The amount to add; must be an instance of the
                      type of the counter's default value.
        :returns: The counter itself, which is what makes ``+=`` and
                  ``-=`` rebind the name to the same object.
        :raises TypeError: If `value` is not of the counter's type.

        """
        if isinstance(value, self.default_type):
            self.client.retry(self._inner_change, value)
            return self
        raise TypeError('invalid type for value change')

    def _inner_change(self, value):
        """Perform one read / add / version-checked write attempt.

        Records the value read in `pre_value`. On success `post_value`
        is set to the value written; on a version conflict it is reset
        to None and a retry is requested.

        """
        current, version = self._value()
        self.pre_value = current
        updated = current + value
        payload = repr(updated).encode('ascii')
        try:
            # Hand back the version we read so that a change made in
            # between surfaces as BadVersionError.
            self.client.set(self.path, payload, version=version)
        except BadVersionError:  # pragma: nocover
            self.post_value = None
            raise ForceRetryError()
        else:
            self.post_value = updated

    def __add__(self, value):
        """Add value to counter."""
        return self._change(value)

    def __sub__(self, value):
        """Subtract value from counter."""
        # The value is negated first and then checked and applied like
        # an addition.
        return self._change(-value)