diff options
author | Ask Solem <ask@celeryproject.org> | 2015-11-23 17:56:43 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-11-23 17:56:43 -0800 |
commit | 5cc20d8c04e3d5718024a7cefc75d2242a512f48 (patch) | |
tree | 07875fcb13d1be2e31bcfef6a9664c91f776818b | |
parent | 98d071ac6637a19ceea51fb20fb94621f127442b (diff) | |
download | kombu-nsq.tar.gz |
NSQ work in progressnsq
-rw-r--r-- | consumer.py | 26 | ||||
-rw-r--r-- | kombu/async/hub.py | 12 | ||||
-rw-r--r-- | kombu/async/tornado.py | 62 | ||||
-rw-r--r-- | kombu/connection.py | 5 | ||||
-rw-r--r-- | kombu/transport/SQS.py | 3 | ||||
-rw-r--r-- | kombu/transport/__init__.py | 2 | ||||
-rw-r--r-- | kombu/transport/base.py | 3 | ||||
-rw-r--r-- | kombu/transport/nsq.py | 173 | ||||
-rw-r--r-- | kombu/transport/virtual/__init__.py | 1 | ||||
-rw-r--r-- | requirements/extras/nsq.txt | 1 | ||||
-rw-r--r-- | setup.py | 1 | ||||
-rw-r--r-- | t.py | 43 |
12 files changed, 329 insertions, 3 deletions
diff --git a/consumer.py b/consumer.py new file mode 100644 index 00000000..6c8dd7e1 --- /dev/null +++ b/consumer.py @@ -0,0 +1,26 @@ +import nsq +import time +import logging +from nsq.async import AsyncConn +from kombu.async.tornado import TornadoHub +from kombu.log import setup_logging + +setup_logging(loglevel=logging.DEBUG) + +hub = TornadoHub() + +def handler(message): + print(message.body) + return True + +reader = nsq.Reader( + message_handler=handler, + nsqd_tcp_addresses=['localhost:4150'], + #lookupd_http_addresses=['http://localhost:4161'], + io_loop=hub, + topic='c.stress', + channel='c.stress', + max_in_flight=9, + lookupd_poll_interval=1) +hub.run_forever() + diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 5785603b..c8c5686a 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -11,7 +11,7 @@ from __future__ import absolute_import import errno from contextlib import contextmanager -from time import sleep +from time import sleep, time from types import GeneratorType as generator from amqp.promise import Thenable, promise @@ -21,6 +21,7 @@ from kombu.log import get_logger from kombu.utils import cached_property, fileno from kombu.utils.eventio import READ, WRITE, ERR, poll +from .debug import repr_flag from .timer import Timer __all__ = ['Hub', 'get_event_loop', 'set_event_loop'] @@ -177,10 +178,15 @@ class Hub(object): def run_once(self): try: - next(self.loop) + return next(self.loop) except StopIteration: self._loop = None + def run_until_ready(self, p): + while not p.ready: + self.run_once() + return p + def call_soon(self, callback, *args): if not isinstance(callback, Thenable): callback = promise(callback, args) @@ -335,7 +341,7 @@ class Hub(object): else: # no sockets yet, startup is probably not done. sleep(min(poll_timeout, 0.1)) - yield + yield poll_timeout def repr_active(self): from .debug import repr_active diff --git a/kombu/async/tornado.py b/kombu/async/tornado.py new file mode 100644 index 00000000..ae07d659 --- /dev/null +++ b/kombu/async/tornado.py @@ -0,0 +1,62 @@ +from __future__ import absolute_import, unicode_literals + +from time import time + +from kombu.utils import maybe_fileno + +from . import Hub, get_event_loop + + +class TornadoHub(Hub): + READ = Hub.READ + WRITE = Hub.WRITE + ERROR = Hub.ERR + + def __init__(self, *args, **kwargs): + super(TornadoHub, self).__init__(*args, **kwargs) + self._callbacks = {} + + def add_callback(self, wrapper): + #print('add callback') + self.call_soon(wrapper) + + def add_handler(self, fd, handler, state): + #print('add handler: %r' % (handler,)) + fd = maybe_fileno(fd) + self._callbacks[fd] = handler + if state & self.READ or state & self.ERROR: + self.add_reader(fd, handler, fd, self.READ) + elif state & self.WRITE: + self.add_writer(fd, handler, fd, self.WRITE) + + def split_fd(self, fd): + return maybe_fileno(fd) or fd, fd + + def update_handler(self, fd, state): + fd, _ = self.split_fd(fd) + #print('update handler: %r %r' % (fd, repr_flag(state))) + if state & self.WRITE and fd not in self.writers: + cb = self._callbacks[fd] + self.add_writer(fd, cb, fd, self.WRITE) + elif ((state & self.READ or state & self.ERROR) and + fd not in self.readers): + cb = self._callbacks[fd] + self.add_reader(fd, cb, fd, self.READ) + + if not state & self.WRITE and fd in self.writers: + self.remove_writer(fd) + if not state & self.READ and fd in self.readers: + self.remove_reader(fd) + + def remove_handler(self, fd): + fd = maybe_fileno(fd) + #print('remove handler: %r' %(fd,)) + self._callbacks.pop(fd, None) + self.remove_reader(fd) + self.remove_writer(fd) + + def time(self): + return time() + + def add_timeout(self, next_timeout, handler): + self.call_later(next_timeout, handler) diff --git a/kombu/connection.py b/kombu/connection.py index f051dc8c..5c854e4b 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -823,6 +823,11 @@ class Connection(object): @property def is_evented(self): return self.transport.implements.async + + @property + def requires_hub(self): + return self.transport.requires_hub + BrokerConnection = Connection diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index d148f2e4..df1fd1b9 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -472,3 +472,6 @@ class Transport(virtual.Transport): async=True, exchange_type=frozenset(['direct']), ) + + def driver_version(self): + return gettatr(boto, '__version__', None) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index c1d68681..0e5c1f66 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -50,6 +50,8 @@ TRANSPORT_ALIASES = { 'redis': 'kombu.transport.redis:Transport', 'SQS': 'kombu.transport.SQS:Transport', 'sqs': 'kombu.transport.SQS:Transport', + 'nsq': 'kombu.transport.nsq.Transport', + 'nsqtcp': 'kombu.transport.nsq.TCPTransport', 'beanstalk': 'kombu.transport.beanstalk:Transport', 'mongodb': 'kombu.transport.mongodb:Transport', 'couchdb': 'kombu.transport.couchdb:Transport', diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 9b04dfb9..e724fbbe 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -112,6 +112,9 @@ class Transport(object): __reader = None + #: Override this if the transport requires a specific Hub type. + requires_hub = None + implements = default_transport_capabilities.extend() def __init__(self, client, **kwargs): diff --git a/kombu/transport/nsq.py b/kombu/transport/nsq.py new file mode 100644 index 00000000..7f3b9870 --- /dev/null +++ b/kombu/transport/nsq.py @@ -0,0 +1,173 @@ +from __future__ import absolute_import, unicode_literals + +from amqp import ensure_promise, promise + +from kombu.async import get_event_loop +from kombu.async.tornado import TornadoHub +from kombu.utils import cached_property +from kombu.utils.json import dumps, loads + +from . import virtual + +try: + import nsq +except ImportError: # pragma: no cover + nsq = None # noqa + + +class Writer(nsq.Writer): + + def __init__(self, hosts, on_connected=None, **kwargs): + self.on_connected = ensure_promise(on_connected) + super(Writer, self).__init__(hosts, **kwargs) + + def _on_connection_ready(self, conn, **kwargs): + if conn.id not in self.conns: + print('ON CONNECTED!') + self.on_connected(conn, **kwargs) + super(Writer, self)._on_connection_ready(conn, **kwargs) + + +class Channel(virtual.Channel): + + _noack_queues = set() + + def __init__(self, *args, **kwargs): + if nsq is None: + raise ImportError('pynsq is not installed') + super(Channel, self).__init__(*args, **kwargs) + self.hub = get_event_loop() + if self.hub is None: + self.hub = TornadoHub() + self._max_in_flight = 1000 + self._readers = {} + self._writer = None + self._on_writer_connected = promise() + self._no_ack_queues = set() + + def _put(self, queue, message, **kwargs): + p = promise(args=(queue, message), callback=self._message_sent) + self._on_writer_connected.then( + promise(self._send_message, (p, self.writer, queue, message)), + ) + return self.hub.run_until_ready(p) + + def _message_sent(self, *args, **kwargs): + print('message sent: %r %r' % (args, kwargs)) + + def create_writer(self): + conninfo = self.conninfo + return Writer( + ['localhost:4150'], + on_connected=self._on_writer_connected, + io_loop=self.hub, + ) + + def _send_message(self, p, writer, queue, message, conn): + writer.pub(queue, dumps(message), callback=p) + return p + + + def create_reader(self, queue): + return nsq.Reader( + topic=queue, channel=queue, + max_in_flight=self._max_in_flight, + nsqd_tcp_addresses=['localhost:4150'], + message_handler=promise(self.on_message, (queue,)), + #message_handler=self.on_message, + io_loop=self.hub, + lookupd_poll_interval=1, + ) + + def basic_qos(self, prefetch_size=0, prefetch_count=0, + apply_global=False): + self._max_in_flight = prefetch_count + super(Channel, self).basic_qos( + prefetch_size, prefetch_count, apply_global) + for reader in self._readers.values(): + reader.set_max_in_flight(self._max_in_flight) + + def basic_consume(self, queue, no_ack, *args, **kwargs): + if no_ack: + self._no_ack_queues.add(queue) + consumer_tag = super(Channel, self).basic_consume( + queue, no_ack, *args, **kwargs) + reader = self._readers[queue] = self.create_reader(queue) + return consumer_tag + + def on_message(self, queue, message): + if message: + callbacks = self.connection._callbacks + payload = loads(message.body) + if queue in self._noack_queues: + message.finish() + else: + payload['properties']['delivery_info'].update({ + 'nsq_message': message, + }) + message.enable_async() + payload['properties']['delivery_tag'] = message.id + callbacks[queue](payload) + return True + + def _get_message_by_tag(self, delivery_tag): + delivery_info = self.qos.get(delivery_tag).delivery_info + try: + return delivery_info['nsq_message'] + except KeyError: + pass + + def basic_ack(self, delivery_tag): + message = self._get_message_by_tag(delivery_tag) + if message: + message.finish() + super(Channel, self).basic_ack(delivery_tag) + + def basic_reject(self, delivery_tag, requeue=False): + message = self._get_message_by_tag(delivery_tag) + message.requeue() if requeue else message.finish() + super(Channel, self).basic_reject(delivery_tag, requeue) + + @property + def writer(self): + if self._writer is None: + self._writer = self.create_writer() + return self._writer + + @property + def conninfo(self): + return self.connection.client + +class Transport(virtual.Transport): + Channel = Channel + + using_lookupd = True + default_port = None + connection_errors = ( + virtual.Transport.connection_errors + + () + ) + channel_errors = ( + virtual.Transport.channel_errors, + () + ) + driver_type = 'nsq' + driver_name = 'nsq' + + implements = virtual.Transport.implements.extend( + async=True, + exchange_type={'direct'}, + ) + + requires_hub = TornadoHub + + @cached_property + def hub(self): + return TornadoHub() + + def driver_version(self): + return nsq.__version__ + + +class TCPTransport(virtual.Transport): + using_lookupd = False diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 0601ebe0..c991dd69 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -547,6 +547,7 @@ class Channel(AbstractChannel, base.StdChannel): self._consumers.add(consumer_tag) self._reset_cycle() + return consumer_tag def basic_cancel(self, consumer_tag): """Cancel consumer by consumer tag.""" diff --git a/requirements/extras/nsq.txt b/requirements/extras/nsq.txt new file mode 100644 index 00000000..b016f52b --- /dev/null +++ b/requirements/extras/nsq.txt @@ -0,0 +1 @@ +pynsq @@ -127,6 +127,7 @@ tests_require = reqs('test3.txt' if PY3 else 'test.txt') extras_require = extra['extras_require'] = { 'msgpack': extras('msgpack.txt'), + 'nsq': extras('nsq.txt'), 'yaml': extras('yaml.txt'), 'redis': extras('redis.txt'), 'mongodb': extras('mongodb.txt'), @@ -0,0 +1,43 @@ +import nsq +import time +import logging +from nsq.async import AsyncConn +from kombu.async.tornado import TornadoHub +from kombu.five import monotonic +from kombu.log import setup_logging + +setup_logging(loglevel=logging.INFO) + +hub = TornadoHub() + +count = 0 +finished = 0 +N = 10000 + +writer = nsq.Writer(['localhost:4150'], io_loop=hub) + +def pub_message(i): + writer.pub('cstress', str(i) , finish_pub) + +def finish_pub(conn, data): + global finished + finished += 1 + if finished >= N: + print('TIME: %r' % (monotonic() - time_start,)) + + +def on_it(): + global count + global time_start + if count == 0: + time_start = monotonic() + if count > N + 1: + return + count += 1 + pub_message(count) + hub.call_soon(on_it) + +hub.call_later(1, on_it) +#hub.call_repeatedly(1, pub_message) +hub.run_forever() + |