1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
"""Event-loop debugging tools."""
from kombu.utils.eventio import READ, WRITE, ERR
from kombu.utils.functional import reprcall
def repr_flag(flag):
"""Return description of event loop flag."""
return '{}{}{}'.format('R' if flag & READ else '',
'W' if flag & WRITE else '',
'!' if flag & ERR else '')
def _rcb(obj):
if obj is None:
return '<missing>'
if isinstance(obj, str):
return obj
if isinstance(obj, tuple):
cb, args = obj
return reprcall(cb.__name__, args=args)
return obj.__name__
def repr_active(h):
"""Return description of active readers and writers."""
return ', '.join(repr_readers(h) + repr_writers(h))
def repr_events(h, events):
"""Return description of events returned by poll."""
return ', '.join(
'{}({})->{}'.format(
_rcb(callback_for(h, fd, fl, '(GONE)')), fd,
repr_flag(fl),
)
for fd, fl in events
)
def repr_readers(h):
"""Return description of pending readers."""
return ['({}){}->{}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
for fd, cb in h.readers.items()]
def repr_writers(h):
"""Return description of pending writers."""
return ['({}){}->{}'.format(fd, _rcb(cb), repr_flag(WRITE))
for fd, cb in h.writers.items()]
def callback_for(h, fd, flag, *default):
"""Return the callback used for hub+fd+flag."""
try:
if flag & READ:
return h.readers[fd]
if flag & WRITE:
if fd in h.consolidate:
return h.consolidate_callback
return h.writers[fd]
except KeyError:
if default:
return default[0]
raise
|