1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
|
# -*- coding: utf-8 -*-
"""
kombu.async.hub
===============
Event loop implementation.
"""
from __future__ import absolute_import
import errno
from collections import deque
from contextlib import contextmanager
from time import sleep
from types import GeneratorType as generator
from amqp import promise
from kombu.five import Empty, range
from kombu.log import get_logger
from kombu.utils import cached_property, fileno
from kombu.utils.compat import get_errno
from kombu.utils.eventio import READ, WRITE, ERR, poll
from .timer import Timer
# Names exported by ``from kombu.async.hub import *``.
__all__ = ['Hub', 'get_event_loop', 'set_event_loop']
# Logger for this module, obtained from kombu.log.get_logger.
logger = get_logger(__name__)
# Loop object returned by get_event_loop(); replaced by set_event_loop().
# Stays None until set_event_loop() has been called.
_current_loop = None
class Stop(BaseException):
    """Stops the event loop.

    Derives from :class:`BaseException` (not :class:`Exception`), so
    ``except Exception`` handlers do not catch it.
    """
def _raise_stop_error():
    """Raise :exc:`Stop`.

    :meth:`Hub.stop` schedules this function with ``call_soon``.
    """
    raise Stop()
@contextmanager
def _dummy_context(*args, **kwargs):
yield
def get_event_loop():
    """Return the module-level current loop.

    This is whatever was last passed to :func:`set_event_loop`, or
    ``None`` if no loop has been set.
    """
    return _current_loop
def set_event_loop(loop):
    """Store ``loop`` as the module-level current loop and return it.

    :param loop: object to install; it is stored as-is (no validation).
    :returns: the same ``loop`` object that was passed in.
    """
    global _current_loop
    _current_loop = loop
    return loop
class Hub(object):
    """Event loop object.

    :keyword timer: Specify timer object.

    """
    #: Flag set if reading from an fd will not block.
    READ = READ

    #: Flag set if writing to an fd will not block.
    WRITE = WRITE

    #: Flag set on error, and the fd should be read from asap.
    ERR = ERR

    #: List of callbacks to be called when the loop is exiting,
    #: applied with the hub instance as sole argument.
    on_close = None

    def __init__(self, timer=None):
        """Create the hub's bookkeeping structures and its poller.

        :keyword timer: timer object to use; a new ``Timer()`` is
            created when this is ``None``.
        """
        self.timer = timer if timer is not None else Timer()

        # fd -> (callback, args), or fd -> None for consolidated fds.
        self.readers = {}
        self.writers = {}
        # Callbacks invoked (without arguments) at the start of every
        # loop iteration.
        self.on_tick = set()
        # Callbacks invoked with the hub as sole argument by close().
        self.on_close = set()
        # Handles queued by call_soon(), drained on every iteration.
        self._ready = deque()

        self._running = False
        self._loop = None

        # The eventloop (in celery.worker.loops)
        # will merge fds in this set and then instead of calling
        # the callback for each ready fd it will call the
        # :attr:`consolidate_callback` with the list of ready_fds
        # as an argument.  This API is internal and is only
        # used by the multiprocessing pool to find inqueues
        # that are ready to write.
        self.consolidate = set()
        self.consolidate_callback = None

        # Exception classes that fire_timers() re-raises instead of
        # logging (passed as ``propagate`` by create_loop()).
        self.propagate_errors = ()

        self._create_poller()

    def reset(self):
        """Close the hub and then create a fresh poller."""
        self.close()
        self._create_poller()

    def _create_poller(self):
        """Create a new poller and cache its register/unregister methods."""
        self.poller = poll()
        self._register_fd = self.poller.register
        self._unregister_fd = self.poller.unregister

    def _close_poller(self):
        """Close the poller, if any, and drop the references to it."""
        if self.poller is not None:
            self.poller.close()
            self.poller = None
            self._register_fd = None
            self._unregister_fd = None

    def stop(self):
        """Schedule a :exc:`Stop` to be raised from inside the loop.

        :meth:`run_forever` catches :exc:`Stop` and exits.
        """
        self.call_soon(_raise_stop_error)

    def __repr__(self):
        return '<Hub@{0:#x}: R:{1} W:{2}>'.format(
            id(self), len(self.readers), len(self.writers),
        )

    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
                    propagate=()):
        """Run up to ``max_timers`` timer entries from the scheduler.

        :keyword min_delay: lower bound for the returned delay.
        :keyword max_delay: upper bound for the returned delay.
        :keyword max_timers: maximum number of scheduler items consumed
            in one call.
        :keyword propagate: exception class (or tuple of classes) that
            is re-raised instead of being logged.

        :returns: the last delay yielded by the scheduler (``0`` when
            there was none), clamped to ``[min_delay, max_delay]``.

        :exc:`MemoryError`, :exc:`AssertionError` and an :exc:`OSError`
        with errno ``ENOMEM`` are always re-raised; any other
        :exc:`Exception` raised by an entry is logged and swallowed.
        """
        timer = self.timer
        delay = None
        if timer and timer._queue:
            for _ in range(max_timers):
                delay, entry = next(self.scheduler)
                if entry is None:
                    # Nothing is due to run right now.
                    break
                try:
                    entry()
                except propagate:
                    raise
                except (MemoryError, AssertionError):
                    raise
                except OSError as exc:
                    if get_errno(exc) == errno.ENOMEM:
                        raise
                    logger.error('Error in timer: %r', exc, exc_info=1)
                except Exception as exc:
                    logger.error('Error in timer: %r', exc, exc_info=1)
        return min(max(delay or 0, min_delay), max_delay)

    def add(self, fd, callback, flags, args=(), consolidate=False):
        """Register ``fd`` with the poller and remember its callback.

        :param fd: file descriptor, or an object accepted by the
            ``fileno`` helper.
        :param callback: object stored for the fd; the loop calls it as
            ``callback(*args)``, or advances it with ``next()`` if it
            is a generator.
        :param flags: event mask passed to the poller.  The entry is
            stored in :attr:`readers` when ``flags & READ`` is set and
            in :attr:`writers` otherwise.
        :keyword args: arguments stored together with ``callback``.
        :keyword consolidate: when true the fd is added to
            :attr:`consolidate` and ``None`` is stored instead of
            ``(callback, args)``.

        :raises ValueError: re-raised from ``poller.register`` after
            the fd has been discarded from the hub's tables.
        """
        fd = fileno(fd)
        try:
            self.poller.register(fd, flags)
        except ValueError:
            self._discard(fd)
            raise
        else:
            dest = self.readers if flags & READ else self.writers
            if consolidate:
                self.consolidate.add(fd)
                dest[fd] = None
            else:
                dest[fd] = callback, args

    def remove(self, fd):
        """Unregister ``fd`` from the poller and forget its callbacks."""
        fd = fileno(fd)
        self._unregister(fd)
        self._discard(fd)

    def run_forever(self):
        """Call :meth:`run_once` repeatedly until :exc:`Stop` is raised."""
        self._running = True
        try:
            while 1:
                try:
                    self.run_once()
                except Stop:
                    break
        finally:
            self._running = False

    def run_once(self):
        """Advance the loop generator by one iteration.

        If the generator has finished, the cached generator is dropped
        so that the next access to :attr:`loop` creates a new one.
        """
        try:
            next(self.loop)
        except StopIteration:
            self._loop = None

    def call_soon(self, callback, *args):
        """Queue ``callback(*args)`` for the next loop iteration.

        :returns: the ``promise`` handle that was appended to the
            ready queue.
        """
        handle = promise(callback, args)
        self._ready.append(handle)
        return handle

    def call_later(self, delay, callback, *args):
        """Delegate to ``self.timer.call_after(delay, callback, args)``."""
        return self.timer.call_after(delay, callback, args)

    def call_at(self, when, callback, *args):
        """Delegate to ``self.timer.call_at(when, callback, args)``."""
        return self.timer.call_at(when, callback, args)

    def call_repeatedly(self, delay, callback, *args):
        """Delegate to ``self.timer.call_repeatedly(delay, callback, args)``."""
        return self.timer.call_repeatedly(delay, callback, args)

    def add_reader(self, fds, callback, *args):
        """Register ``callback(*args)`` for ``READ | ERR`` events."""
        return self.add(fds, callback, READ | ERR, args)

    def add_writer(self, fds, callback, *args):
        """Register ``callback(*args)`` for ``WRITE`` events."""
        return self.add(fds, callback, WRITE, args)

    def _readd(self, fd, entry, flags):
        """Re-register ``fd`` from an entry saved before it was discarded.

        ``entry`` is either a ``(callback, args)`` pair or ``None``;
        ``None`` is what :meth:`add` stores for consolidated fds, so it
        is restored with ``consolidate=True``.
        """
        if entry is None:
            self.add(fd, None, flags, consolidate=True)
        else:
            callback, args = entry
            self.add(fd, callback, flags, args)

    def remove_reader(self, fd):
        """Remove the reader for ``fd``, keeping its writer (if any)."""
        # Normalise like add()/remove() do, so that the lookups below
        # use the same integer key the tables are indexed by.
        fd = fileno(fd)
        writable = fd in self.writers
        on_write = self.writers.get(fd)
        try:
            self._unregister(fd)
            self._discard(fd)
        finally:
            # _discard() dropped both directions; put the writer back.
            if writable:
                self._readd(fd, on_write, WRITE)

    def remove_writer(self, fd):
        """Remove the writer for ``fd``, keeping its reader (if any)."""
        fd = fileno(fd)
        readable = fd in self.readers
        on_read = self.readers.get(fd)
        try:
            self._unregister(fd)
            self._discard(fd)
        finally:
            # _discard() dropped both directions; put the reader back.
            if readable:
                self._readd(fd, on_read, READ | ERR)

    def _unregister(self, fd):
        """Unregister ``fd`` from the poller, ignoring expected failures.

        :exc:`AttributeError` covers the case where the poller has
        already been closed (``self.poller`` is ``None``).
        """
        try:
            self.poller.unregister(fd)
        except (AttributeError, KeyError, OSError):
            pass

    def close(self, *args):
        """Unregister every fd, close the poller and run ``on_close``.

        Positional arguments are accepted and ignored.  Every callback
        in :attr:`on_close` is called with the hub as sole argument.
        """
        for fd in self.readers:
            self._unregister(fd)
        self.readers.clear()
        for fd in self.writers:
            self._unregister(fd)
        self.writers.clear()
        self.consolidate.clear()
        self._close_poller()
        for callback in self.on_close:
            callback(self)

    def _discard(self, fd):
        """Forget ``fd`` in the reader, writer and consolidate tables."""
        fd = fileno(fd)
        self.readers.pop(fd, None)
        self.writers.pop(fd, None)
        self.consolidate.discard(fd)

    def create_loop(self,
                    generator=generator, sleep=sleep, min=min, next=next,
                    Empty=Empty, StopIteration=StopIteration,
                    KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
        """Return a generator that runs one loop iteration per ``next()``.

        Each iteration: calls every ``on_tick`` callback, drains the
        ready queue filled by :meth:`call_soon`, fires timers, then
        polls the registered fds and dispatches their callbacks.  When
        no fds are registered it sleeps for at most 0.1 seconds.

        The generator finishes when ``poll()`` raises
        :exc:`ValueError`.

        The keyword arguments only rebind globals and builtins as
        local names; callers are not expected to pass them.
        """
        readers, writers = self.readers, self.writers
        poll = self.poller.poll
        fire_timers = self.fire_timers
        hub_remove = self.remove
        scheduled = self.timer._queue
        consolidate = self.consolidate
        consolidate_callback = self.consolidate_callback
        on_tick = self.on_tick
        todo = self._ready
        propagate = self.propagate_errors

        while 1:
            for tick_callback in on_tick:
                tick_callback()

            while todo:
                item = todo.popleft()
                if item:
                    item()

            poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
            if readers or writers:
                to_consolidate = []
                try:
                    events = poll(poll_timeout)
                except ValueError:  # Issue 882
                    # End the generator.  A plain ``return`` is used
                    # because raising StopIteration inside a generator
                    # is turned into RuntimeError by PEP 479.
                    return

                for fd, event in events or ():
                    if fd in consolidate and \
                            writers.get(fd) is None:
                        # Collected and handed to consolidate_callback
                        # in a single call below.
                        to_consolidate.append(fd)
                        continue
                    cb = cbargs = None

                    try:
                        if event & READ:
                            cb, cbargs = readers[fd]
                        elif event & WRITE:
                            cb, cbargs = writers[fd]
                        elif event & ERR:
                            try:
                                cb, cbargs = (readers.get(fd) or
                                              writers.get(fd))
                            except TypeError:
                                # Neither table has an entry for fd.
                                pass
                    except (KeyError, Empty):
                        hub_remove(fd)
                        continue
                    if cb is None:
                        continue
                    if isinstance(cb, generator):
                        try:
                            next(cb)
                        except OSError as exc:
                            if get_errno(exc) != errno.EBADF:
                                raise
                            hub_remove(fd)
                        except StopIteration:
                            pass
                        except Exception:
                            hub_remove(fd)
                            raise
                    else:
                        try:
                            cb(*cbargs)
                        except Empty:
                            pass
                if to_consolidate:
                    consolidate_callback(to_consolidate)
            else:
                # no sockets yet, startup is probably not done.
                sleep(min(poll_timeout, 0.1))
            yield

    def repr_active(self):
        """Return ``.debug.repr_active(self)`` (imported on demand)."""
        from .debug import repr_active
        return repr_active(self)

    def repr_events(self, events):
        """Return ``.debug.repr_events(self, events)`` (imported on demand)."""
        from .debug import repr_events
        return repr_events(self, events)

    @cached_property
    def scheduler(self):
        """Iterator over :attr:`timer`, consumed by :meth:`fire_timers`."""
        return iter(self.timer)

    @property
    def loop(self):
        """The loop generator, created on first access by create_loop()."""
        if self._loop is None:
            self._loop = self.create_loop()
        return self._loop
|