blob: bf92a3a80819a3a8fb75327064e234d42fc85734 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
"""
kombu.transport.virtual.scheduling
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Consumer utilities.
"""
from __future__ import absolute_import
from itertools import count
class FairCycle(object):
"""Consume from a set of resources, where each resource gets
an equal chance to be consumed from."""
def __init__(self, fun, resources, predicate=Exception):
self.fun = fun
self.resources = resources
self.predicate = predicate
self.pos = 0
def _next(self):
while 1:
try:
resource = self.resources[self.pos]
self.pos += 1
return resource
except IndexError:
self.pos = 0
if not self.resources:
raise self.predicate()
def get(self, **kwargs):
for tried in count(0): # for infinity
resource = self._next()
try:
return self.fun(resource, **kwargs), resource
except self.predicate:
if tried >= len(self.resources) - 1:
raise
def close(self):
pass
def __repr__(self):
return '<FairCycle: {self.pos}/{size} {self.resources}>'.format(
self=self, size=len(self.resources))
|