author     Ask Solem <ask@celeryproject.org>  2015-11-23 17:56:43 -0800
committer  Ask Solem <ask@celeryproject.org>  2015-11-23 17:56:43 -0800
commit     5cc20d8c04e3d5718024a7cefc75d2242a512f48 (patch)
tree       07875fcb13d1be2e31bcfef6a9664c91f776818b
parent     98d071ac6637a19ceea51fb20fb94621f127442b (diff)
download   kombu-nsq.tar.gz

NSQ work in progress (branch: nsq)
-rw-r--r--  consumer.py                           26
-rw-r--r--  kombu/async/hub.py                    12
-rw-r--r--  kombu/async/tornado.py                62
-rw-r--r--  kombu/connection.py                    5
-rw-r--r--  kombu/transport/SQS.py                 3
-rw-r--r--  kombu/transport/__init__.py            2
-rw-r--r--  kombu/transport/base.py                3
-rw-r--r--  kombu/transport/nsq.py               173
-rw-r--r--  kombu/transport/virtual/__init__.py    1
-rw-r--r--  requirements/extras/nsq.txt            1
-rw-r--r--  setup.py                               1
-rw-r--r--  t.py                                  43
12 files changed, 329 insertions, 3 deletions
diff --git a/consumer.py b/consumer.py
new file mode 100644
index 00000000..6c8dd7e1
--- /dev/null
+++ b/consumer.py
@@ -0,0 +1,26 @@
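+# Ad-hoc example script: consume messages from a local nsqd (localhost:4150)
+# with pynsq's Reader, driven by kombu's TornadoHub event loop in place of a
+# tornado IOLoop, printing each message body.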
+import nsq
+import time
+import logging
+from nsq.async import AsyncConn
+from kombu.async.tornado import TornadoHub
+from kombu.log import setup_logging
+
+setup_logging(loglevel=logging.DEBUG)
+
+hub = TornadoHub()
+
+def handler(message):
+    print(message.body)
+    return True
+
+reader = nsq.Reader(
+    message_handler=handler,
+    nsqd_tcp_addresses=['localhost:4150'],
+    #lookupd_http_addresses=['http://localhost:4161'],
+    io_loop=hub,
+    topic='c.stress',
+    channel='c.stress',
+    max_in_flight=9,
+    lookupd_poll_interval=1)
+hub.run_forever()
+
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 5785603b..c8c5686a 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -11,7 +11,7 @@ from __future__ import absolute_import
import errno
from contextlib import contextmanager
-from time import sleep
+from time import sleep, time
from types import GeneratorType as generator
from amqp.promise import Thenable, promise
@@ -21,6 +21,7 @@ from kombu.log import get_logger
from kombu.utils import cached_property, fileno
from kombu.utils.eventio import READ, WRITE, ERR, poll
+from .debug import repr_flag
from .timer import Timer
__all__ = ['Hub', 'get_event_loop', 'set_event_loop']
@@ -177,10 +178,15 @@ class Hub(object):
    def run_once(self):
        try:
-            next(self.loop)
+            return next(self.loop)
        except StopIteration:
            self._loop = None
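+    # Drive the event loop until the given promise is fulfilled; used by
+    # transports that need to block on an asynchronous operation.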
+    def run_until_ready(self, p):
+        while not p.ready:
+            self.run_once()
+        return p
+
    def call_soon(self, callback, *args):
        if not isinstance(callback, Thenable):
            callback = promise(callback, args)
@@ -335,7 +341,7 @@ class Hub(object):
            else:
                # no sockets yet, startup is probably not done.
                sleep(min(poll_timeout, 0.1))
-            yield
+            yield poll_timeout
    def repr_active(self):
        from .debug import repr_active
diff --git a/kombu/async/tornado.py b/kombu/async/tornado.py
new file mode 100644
index 00000000..ae07d659
--- /dev/null
+++ b/kombu/async/tornado.py
@@ -0,0 +1,62 @@
+from __future__ import absolute_import, unicode_literals
+
+from time import time
+
+from kombu.utils import maybe_fileno
+
+from . import Hub, get_event_loop
+
+
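+# TornadoHub adapts kombu's Hub to the subset of the tornado IOLoop
+# interface that pynsq relies on (add_handler/update_handler/remove_handler,
+# add_callback, add_timeout and time), so pynsq readers and writers can be
+# driven by kombu's event loop.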
+class TornadoHub(Hub):
+    READ = Hub.READ
+    WRITE = Hub.WRITE
+    ERROR = Hub.ERR
+
+    def __init__(self, *args, **kwargs):
+        super(TornadoHub, self).__init__(*args, **kwargs)
+        self._callbacks = {}
+
+    def add_callback(self, wrapper):
+        #print('add callback')
+        self.call_soon(wrapper)
+
+    def add_handler(self, fd, handler, state):
+        #print('add handler: %r' % (handler,))
+        fd = maybe_fileno(fd)
+        self._callbacks[fd] = handler
+        if state & self.READ or state & self.ERROR:
+            self.add_reader(fd, handler, fd, self.READ)
+        elif state & self.WRITE:
+            self.add_writer(fd, handler, fd, self.WRITE)
+
+    def split_fd(self, fd):
+        return maybe_fileno(fd) or fd, fd
+
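+    # Translate tornado-style fd state flags into kombu reader/writer
+    # registrations, adding or removing the stored callback as flags change.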
+    def update_handler(self, fd, state):
+        fd, _ = self.split_fd(fd)
+        #print('update handler: %r %r' % (fd, repr_flag(state)))
+        if state & self.WRITE and fd not in self.writers:
+            cb = self._callbacks[fd]
+            self.add_writer(fd, cb, fd, self.WRITE)
+        elif ((state & self.READ or state & self.ERROR) and
+                fd not in self.readers):
+            cb = self._callbacks[fd]
+            self.add_reader(fd, cb, fd, self.READ)
+
+        if not state & self.WRITE and fd in self.writers:
+            self.remove_writer(fd)
+        if not state & self.READ and fd in self.readers:
+            self.remove_reader(fd)
+
+    def remove_handler(self, fd):
+        fd = maybe_fileno(fd)
+        #print('remove handler: %r' % (fd,))
+        self._callbacks.pop(fd, None)
+        self.remove_reader(fd)
+        self.remove_writer(fd)
+
+    def time(self):
+        return time()
+
+    def add_timeout(self, next_timeout, handler):
+        # tornado/pynsq pass an absolute deadline here, while Hub.call_later
+        # expects a relative delay, so convert before scheduling.
+        return self.call_later(max(next_timeout - self.time(), 0), handler)
diff --git a/kombu/connection.py b/kombu/connection.py
index f051dc8c..5c854e4b 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -823,6 +823,11 @@ class Connection(object):
    @property
    def is_evented(self):
        return self.transport.implements.async
+
+    @property
+    def requires_hub(self):
+        return self.transport.requires_hub
+
BrokerConnection = Connection
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index d148f2e4..df1fd1b9 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -472,3 +472,6 @@ class Transport(virtual.Transport):
        async=True,
        exchange_type=frozenset(['direct']),
    )
+
+    def driver_version(self):
+        return getattr(boto, '__version__', None)
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index c1d68681..0e5c1f66 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -50,6 +50,8 @@ TRANSPORT_ALIASES = {
    'redis': 'kombu.transport.redis:Transport',
    'SQS': 'kombu.transport.SQS:Transport',
    'sqs': 'kombu.transport.SQS:Transport',
+    'nsq': 'kombu.transport.nsq:Transport',
+    'nsqtcp': 'kombu.transport.nsq:TCPTransport',
    'beanstalk': 'kombu.transport.beanstalk:Transport',
    'mongodb': 'kombu.transport.mongodb:Transport',
    'couchdb': 'kombu.transport.couchdb:Transport',
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 9b04dfb9..e724fbbe 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -112,6 +112,9 @@ class Transport(object):
    __reader = None
+    #: Override this if the transport requires a specific Hub type.
+    requires_hub = None
+
    implements = default_transport_capabilities.extend()
    def __init__(self, client, **kwargs):
diff --git a/kombu/transport/nsq.py b/kombu/transport/nsq.py
new file mode 100644
index 00000000..7f3b9870
--- /dev/null
+++ b/kombu/transport/nsq.py
@@ -0,0 +1,173 @@
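+"""
+kombu.transport.nsq
+===================
+
+NSQ transport (work in progress): publishes with ``nsq.Writer`` and
+consumes with ``nsq.Reader``, using the queue name as both NSQ topic and
+channel; requires the Tornado-compatible hub in ``kombu.async.tornado``.
+
+"""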
+from __future__ import absolute_import, unicode_literals
+
+from amqp import ensure_promise, promise
+
+from kombu.async import get_event_loop
+from kombu.async.tornado import TornadoHub
+from kombu.utils import cached_property
+from kombu.utils.json import dumps, loads
+
+from . import virtual
+
+try:
+    import nsq
+except ImportError:  # pragma: no cover
+    nsq = None  # noqa
+
+
+class Writer(nsq.Writer if nsq is not None else object):
+    # Fall back to ``object`` as base so this module can still be imported
+    # when pynsq is missing (Channel then raises a proper ImportError).
+
+    def __init__(self, hosts, on_connected=None, **kwargs):
+        self.on_connected = ensure_promise(on_connected)
+        super(Writer, self).__init__(hosts, **kwargs)
+
+    def _on_connection_ready(self, conn, **kwargs):
+        if conn.id not in self.conns:
+            print('ON CONNECTED!')
+            self.on_connected(conn, **kwargs)
+        super(Writer, self)._on_connection_ready(conn, **kwargs)
+
+
+class Channel(virtual.Channel):
+
+    _noack_queues = set()
+
+    def __init__(self, *args, **kwargs):
+        if nsq is None:
+            raise ImportError('pynsq is not installed')
+        super(Channel, self).__init__(*args, **kwargs)
+        self.hub = get_event_loop()
+        if self.hub is None:
+            self.hub = TornadoHub()
+        self._max_in_flight = 1000
+        self._readers = {}
+        self._writer = None
+        self._on_writer_connected = promise()
+        self._no_ack_queues = set()
+
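+    # Publishing chains promises: wait for the Writer connection to become
+    # ready, then pub the message, then drive the hub until the publish
+    # promise has been fulfilled.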
+    def _put(self, queue, message, **kwargs):
+        p = promise(args=(queue, message), callback=self._message_sent)
+        self._on_writer_connected.then(
+            promise(self._send_message, (p, self.writer, queue, message)),
+        )
+        return self.hub.run_until_ready(p)
+
+    def _message_sent(self, *args, **kwargs):
+        print('message sent: %r %r' % (args, kwargs))
+
+    def create_writer(self):
+        conninfo = self.conninfo
+        return Writer(
+            ['localhost:4150'],
+            on_connected=self._on_writer_connected,
+            io_loop=self.hub,
+        )
+
+    def _send_message(self, p, writer, queue, message, conn):
+        writer.pub(queue, dumps(message), callback=p)
+        return p
+
+    def create_reader(self, queue):
+        return nsq.Reader(
+            topic=queue, channel=queue,
+            max_in_flight=self._max_in_flight,
+            nsqd_tcp_addresses=['localhost:4150'],
+            message_handler=promise(self.on_message, (queue,)),
+            #message_handler=self.on_message,
+            io_loop=self.hub,
+            lookupd_poll_interval=1,
+        )
+
+    def basic_qos(self, prefetch_size=0, prefetch_count=0,
+                  apply_global=False):
+        self._max_in_flight = prefetch_count
+        super(Channel, self).basic_qos(
+            prefetch_size, prefetch_count, apply_global)
+        for reader in self._readers.values():
+            reader.set_max_in_flight(self._max_in_flight)
+
+    def basic_consume(self, queue, no_ack, *args, **kwargs):
+        if no_ack:
+            self._no_ack_queues.add(queue)
+        consumer_tag = super(Channel, self).basic_consume(
+            queue, no_ack, *args, **kwargs)
+        self._readers[queue] = self.create_reader(queue)
+        return consumer_tag
+
+    def on_message(self, queue, message):
+        if message:
+            callbacks = self.connection._callbacks
+            payload = loads(message.body)
+            if queue in self._no_ack_queues:
+                message.finish()
+            else:
+                payload['properties']['delivery_info'].update({
+                    'nsq_message': message,
+                })
+                message.enable_async()
+            payload['properties']['delivery_tag'] = message.id
+            callbacks[queue](payload)
+        return True
+
+    def _get_message_by_tag(self, delivery_tag):
+        delivery_info = self.qos.get(delivery_tag).delivery_info
+        try:
+            return delivery_info['nsq_message']
+        except KeyError:
+            pass
+
+    def basic_ack(self, delivery_tag):
+        message = self._get_message_by_tag(delivery_tag)
+        if message:
+            message.finish()
+        super(Channel, self).basic_ack(delivery_tag)
+
+    def basic_reject(self, delivery_tag, requeue=False):
+        message = self._get_message_by_tag(delivery_tag)
+        if message:
+            message.requeue() if requeue else message.finish()
+        super(Channel, self).basic_reject(delivery_tag, requeue)
+
+    @property
+    def writer(self):
+        if self._writer is None:
+            self._writer = self.create_writer()
+        return self._writer
+
+    @property
+    def conninfo(self):
+        return self.connection.client
+
+class Transport(virtual.Transport):
+    Channel = Channel
+
+    using_lookupd = True
+    default_port = None
+    connection_errors = (
+        virtual.Transport.connection_errors +
+        ()
+    )
+    channel_errors = (
+        virtual.Transport.channel_errors +
+        ()
+    )
+    driver_type = 'nsq'
+    driver_name = 'nsq'
+
+    implements = virtual.Transport.implements.extend(
+        async=True,
+        exchange_type={'direct'},
+    )
+
+    requires_hub = TornadoHub
+
+    @cached_property
+    def hub(self):
+        return TornadoHub()
+
+    def driver_version(self):
+        return nsq.__version__
+
+
+class TCPTransport(Transport):
+    # Same transport, but connects to nsqd directly instead of via lookupd.
+    using_lookupd = False
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 0601ebe0..c991dd69 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -547,6 +547,7 @@ class Channel(AbstractChannel, base.StdChannel):
        self._consumers.add(consumer_tag)
        self._reset_cycle()
+        return consumer_tag
    def basic_cancel(self, consumer_tag):
        """Cancel consumer by consumer tag."""
diff --git a/requirements/extras/nsq.txt b/requirements/extras/nsq.txt
new file mode 100644
index 00000000..b016f52b
--- /dev/null
+++ b/requirements/extras/nsq.txt
@@ -0,0 +1 @@
+pynsq
diff --git a/setup.py b/setup.py
index 2e3b413a..304a523c 100644
--- a/setup.py
+++ b/setup.py
@@ -127,6 +127,7 @@ tests_require = reqs('test3.txt' if PY3 else 'test.txt')
extras_require = extra['extras_require'] = {
    'msgpack': extras('msgpack.txt'),
+    'nsq': extras('nsq.txt'),
    'yaml': extras('yaml.txt'),
    'redis': extras('redis.txt'),
    'mongodb': extras('mongodb.txt'),
diff --git a/t.py b/t.py
new file mode 100644
index 00000000..8bb58db3
--- /dev/null
+++ b/t.py
@@ -0,0 +1,43 @@
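+# Ad-hoc publish benchmark: publish N messages to the 'cstress' topic on a
+# local nsqd via nsq.Writer and print the elapsed time once all publish
+# callbacks have fired.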
+import nsq
+import time
+import logging
+from nsq.async import AsyncConn
+from kombu.async.tornado import TornadoHub
+from kombu.five import monotonic
+from kombu.log import setup_logging
+
+setup_logging(loglevel=logging.INFO)
+
+hub = TornadoHub()
+
+count = 0
+finished = 0
+N = 10000
+
+writer = nsq.Writer(['localhost:4150'], io_loop=hub)
+
+def pub_message(i):
+    writer.pub('cstress', str(i), finish_pub)
+
+def finish_pub(conn, data):
+    global finished
+    finished += 1
+    if finished >= N:
+        print('TIME: %r' % (monotonic() - time_start,))
+
+
+def on_it():
+    global count
+    global time_start
+    if count == 0:
+        time_start = monotonic()
+    if count > N + 1:
+        return
+    count += 1
+    pub_message(count)
+    hub.call_soon(on_it)
+
+hub.call_later(1, on_it)
+#hub.call_repeatedly(1, pub_message)
+hub.run_forever()
+