summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--.landscape.yml3
-rw-r--r--docs/reference/index.rst2
-rw-r--r--docs/reference/kombu.transport.SLMQ.rst1
-rw-r--r--docs/reference/kombu.transport.sqlalchemy.models.rst32
-rw-r--r--docs/reference/kombu.transport.sqlalchemy.rst25
-rw-r--r--kombu/transport/__init__.py2
-rw-r--r--kombu/transport/etcd.py1
-rw-r--r--kombu/transport/sqlalchemy/__init__.py164
-rw-r--r--kombu/transport/sqlalchemy/models.py67
-rw-r--r--requirements/extras/sqlalchemy.txt1
-rw-r--r--requirements/test-ci.txt1
-rw-r--r--setup.py1
-rw-r--r--t/integration/tests/test_sqla.py13
-rw-r--r--t/unit/transport/test_sqlalchemy.py50
15 files changed, 364 insertions, 1 deletions
diff --git a/.gitignore b/.gitignore
index 066482c3..74b71cd8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,3 +31,5 @@ htmlcov/
test.db
coverage.xml
venv/
+env
+.eggs
diff --git a/.landscape.yml b/.landscape.yml
new file mode 100644
index 00000000..f90444af
--- /dev/null
+++ b/.landscape.yml
@@ -0,0 +1,3 @@
+pylint:
+ disable:
+ - cyclic-import
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index b5cd6647..550b9d7a 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -48,6 +48,8 @@
kombu.transport.etcd
kombu.transport.zookeeper
kombu.transport.filesystem
+ kombu.transport.sqlalchemy
+ kombu.transport.sqlalchemy.models
kombu.transport.SQS
kombu.transport.SLMQ
kombu.transport.pyro
diff --git a/docs/reference/kombu.transport.SLMQ.rst b/docs/reference/kombu.transport.SLMQ.rst
index 95c57d7f..da385f86 100644
--- a/docs/reference/kombu.transport.SLMQ.rst
+++ b/docs/reference/kombu.transport.SLMQ.rst
@@ -2,6 +2,7 @@
SLMQ Transport - ``kombu.transport.SLMQ``
=============================================
+
.. currentmodule:: kombu.transport.SLMQ
.. automodule:: kombu.transport.SLMQ
diff --git a/docs/reference/kombu.transport.sqlalchemy.models.rst b/docs/reference/kombu.transport.sqlalchemy.models.rst
new file mode 100644
index 00000000..f4aa3ec3
--- /dev/null
+++ b/docs/reference/kombu.transport.sqlalchemy.models.rst
@@ -0,0 +1,32 @@
+=====================================================================
+ SQLAlchemy Transport Model - ``kombu.transport.sqlalchemy.models``
+=====================================================================
+
+
+.. currentmodule:: kombu.transport.sqlalchemy.models
+
+.. automodule:: kombu.transport.sqlalchemy.models
+
+ .. contents::
+ :local:
+
+ Models
+ ------
+
+ .. autoclass:: Queue
+
+ .. autoattribute:: Queue.id
+
+ .. autoattribute:: Queue.name
+
+ .. autoclass:: Message
+
+ .. autoattribute:: Message.id
+
+ .. autoattribute:: Message.visible
+
+ .. autoattribute:: Message.sent_at
+
+ .. autoattribute:: Message.payload
+
+ .. autoattribute:: Message.version
diff --git a/docs/reference/kombu.transport.sqlalchemy.rst b/docs/reference/kombu.transport.sqlalchemy.rst
new file mode 100644
index 00000000..848110d3
--- /dev/null
+++ b/docs/reference/kombu.transport.sqlalchemy.rst
@@ -0,0 +1,25 @@
+===========================================================
+ SQLAlchemy Transport Model - kombu.transport.sqlalchemy
+===========================================================
+
+
+.. currentmodule:: kombu.transport.sqlalchemy
+
+.. automodule:: kombu.transport.sqlalchemy
+
+ .. contents::
+ :local:
+
+ Transport
+ ---------
+
+ .. autoclass:: Transport
+ :members:
+ :undoc-members:
+
+ Channel
+ -------
+
+ .. autoclass:: Channel
+ :members:
+ :undoc-members:
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index 7fb689ed..535a2165 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -28,6 +28,8 @@ TRANSPORT_ALIASES = {
'sqs': 'kombu.transport.SQS:Transport',
'mongodb': 'kombu.transport.mongodb:Transport',
'zookeeper': 'kombu.transport.zookeeper:Transport',
+ 'sqlalchemy': 'kombu.transport.sqlalchemy:Transport',
+ 'sqla': 'kombu.transport.sqlalchemy:Transport',
'SLMQ': 'kombu.transport.SLMQ.Transport',
'slmq': 'kombu.transport.SLMQ.Transport',
'filesystem': 'kombu.transport.filesystem:Transport',
diff --git a/kombu/transport/etcd.py b/kombu/transport/etcd.py
index 1dcf2de0..025c1d7a 100644
--- a/kombu/transport/etcd.py
+++ b/kombu/transport/etcd.py
@@ -259,7 +259,6 @@ class Transport(virtual.Transport):
"""Return the version of the etcd library.
.. note::
-
python-etcd has no __version__. This is a workaround.
"""
try:
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
new file mode 100644
index 00000000..bcf9ed5b
--- /dev/null
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -0,0 +1,164 @@
+"""Kombu transport using SQLAlchemy as the message store."""
+# SQLAlchemy overrides != False to have special meaning and pep8 complains
+# flake8: noqa
+
+from __future__ import absolute_import, unicode_literals
+
+from json import loads, dumps
+
+from sqlalchemy import create_engine
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import sessionmaker
+
+from kombu.five import Empty
+from kombu.transport import virtual
+from kombu.utils import cached_property
+from kombu.utils.encoding import bytes_to_str
+from .models import (ModelBase, Queue as QueueBase, Message as MessageBase,
+ class_registry, metadata)
+
+
+VERSION = (1, 1, 0)
+__version__ = '.'.join(map(str, VERSION))
+
+
+class Channel(virtual.Channel):
+ """The channel class."""
+
+ _session = None
+ _engines = {} # engine cache
+
+ def __init__(self, connection, **kwargs):
+ self._configure_entity_tablenames(connection.client.transport_options)
+ super(Channel, self).__init__(connection, **kwargs)
+
+ def _configure_entity_tablenames(self, opts):
+ self.queue_tablename = opts.get('queue_tablename', 'kombu_queue')
+ self.message_tablename = opts.get('message_tablename', 'kombu_message')
+
+ #
+ # Define the model definitions. This registers the declarative
+ # classes with the active SQLAlchemy metadata object. This *must* be
+ # done prior to the ``create_engine`` call.
+ #
+ self.queue_cls and self.message_cls
+
+ def _engine_from_config(self):
+ conninfo = self.connection.client
+ transport_options = conninfo.transport_options.copy()
+ transport_options.pop('queue_tablename', None)
+ transport_options.pop('message_tablename', None)
+ return create_engine(conninfo.hostname, **transport_options)
+
+ def _open(self):
+ conninfo = self.connection.client
+ if conninfo.hostname not in self._engines:
+ engine = self._engine_from_config()
+ Session = sessionmaker(bind=engine)
+ metadata.create_all(engine)
+ self._engines[conninfo.hostname] = engine, Session
+ return self._engines[conninfo.hostname]
+
+ @property
+ def session(self):
+ if self._session is None:
+ _, Session = self._open()
+ self._session = Session()
+ return self._session
+
+ def _get_or_create(self, queue):
+ obj = self.session.query(self.queue_cls) \
+ .filter(self.queue_cls.name == queue).first()
+ if not obj:
+ obj = self.queue_cls(queue)
+ self.session.add(obj)
+ try:
+ self.session.commit()
+ except OperationalError:
+ self.session.rollback()
+ return obj
+
+ def _new_queue(self, queue, **kwargs):
+ self._get_or_create(queue)
+
+ def _put(self, queue, payload, **kwargs):
+ obj = self._get_or_create(queue)
+ message = self.message_cls(dumps(payload), obj)
+ self.session.add(message)
+ try:
+ self.session.commit()
+ except OperationalError:
+ self.session.rollback()
+
+ def _get(self, queue):
+ obj = self._get_or_create(queue)
+ if self.session.bind.name == 'sqlite':
+ self.session.execute('BEGIN IMMEDIATE TRANSACTION')
+ try:
+ msg = self.session.query(self.message_cls) \
+ .with_lockmode('update') \
+ .filter(self.message_cls.queue_id == obj.id) \
+ .filter(self.message_cls.visible != False) \
+ .order_by(self.message_cls.sent_at) \
+ .order_by(self.message_cls.id) \
+ .limit(1) \
+ .first()
+ if msg:
+ msg.visible = False
+ return loads(bytes_to_str(msg.payload))
+ raise Empty()
+ finally:
+ self.session.commit()
+
+ def _query_all(self, queue):
+ obj = self._get_or_create(queue)
+ return self.session.query(self.message_cls) \
+ .filter(self.message_cls.queue_id == obj.id)
+
+ def _purge(self, queue):
+ count = self._query_all(queue).delete(synchronize_session=False)
+ try:
+ self.session.commit()
+ except OperationalError:
+ self.session.rollback()
+ return count
+
+ def _size(self, queue):
+ return self._query_all(queue).count()
+
+ def _declarative_cls(self, name, base, ns):
+ if name in class_registry:
+ return class_registry[name]
+ return type(str(name), (base, ModelBase), ns)
+
+ @cached_property
+ def queue_cls(self):
+ return self._declarative_cls(
+ 'Queue',
+ QueueBase,
+ {'__tablename__': self.queue_tablename}
+ )
+
+ @cached_property
+ def message_cls(self):
+ return self._declarative_cls(
+ 'Message',
+ MessageBase,
+ {'__tablename__': self.message_tablename}
+ )
+
+
+class Transport(virtual.Transport):
+ """The transport class."""
+
+ Channel = Channel
+
+ can_parse_url = True
+ default_port = 0
+ driver_type = 'sql'
+ driver_name = 'sqlalchemy'
+ connection_errors = (OperationalError, )
+
+ def driver_version(self):
+ import sqlalchemy
+ return sqlalchemy.__version__
diff --git a/kombu/transport/sqlalchemy/models.py b/kombu/transport/sqlalchemy/models.py
new file mode 100644
index 00000000..5fd56c0c
--- /dev/null
+++ b/kombu/transport/sqlalchemy/models.py
@@ -0,0 +1,67 @@
+"""Kombu transport using SQLAlchemy as the message store."""
+from __future__ import absolute_import, unicode_literals
+
+import datetime
+
+from sqlalchemy import (Column, Integer, String, Text, DateTime,
+ Sequence, Boolean, ForeignKey, SmallInteger)
+from sqlalchemy.ext.declarative import declarative_base, declared_attr
+from sqlalchemy.orm import relation
+from sqlalchemy.schema import MetaData
+
+class_registry = {}
+metadata = MetaData()
+ModelBase = declarative_base(metadata=metadata, class_registry=class_registry)
+
+
+class Queue(object):
+ """The queue class."""
+
+ __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
+
+ id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True,
+ autoincrement=True)
+ name = Column(String(200), unique=True)
+
+ def __init__(self, name):
+ self.name = name
+
+ def __str__(self):
+ return '<Queue({self.name})>'.format(self=self)
+
+ @declared_attr
+ def messages(cls):
+ return relation('Message', backref='queue', lazy='noload')
+
+
+class Message(object):
+ """The message class."""
+
+ __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
+
+ id = Column(Integer, Sequence('message_id_sequence'),
+ primary_key=True, autoincrement=True)
+ visible = Column(Boolean, default=True, index=True)
+ sent_at = Column('timestamp', DateTime, nullable=True, index=True,
+ onupdate=datetime.datetime.now)
+ payload = Column(Text, nullable=False)
+ version = Column(SmallInteger, nullable=False, default=1)
+
+ __mapper_args__ = {'version_id_col': version}
+
+ def __init__(self, payload, queue):
+ self.payload = payload
+ self.queue = queue
+
+ def __str__(self):
+ return '<Message: {0.sent_at} {0.payload} {0.queue_id}>'.format(self)
+
+ @declared_attr
+ def queue_id(self):
+ return Column(
+ Integer,
+ ForeignKey(
+ '%s.id' % class_registry['Queue'].__tablename__,
+ name='FK_kombu_message_queue'
+ )
+ )
diff --git a/requirements/extras/sqlalchemy.txt b/requirements/extras/sqlalchemy.txt
new file mode 100644
index 00000000..39fb2bef
--- /dev/null
+++ b/requirements/extras/sqlalchemy.txt
@@ -0,0 +1 @@
+sqlalchemy
diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt
index f2318d5a..b3cd5427 100644
--- a/requirements/test-ci.txt
+++ b/requirements/test-ci.txt
@@ -4,3 +4,4 @@ redis
PyYAML
msgpack-python>0.2.0
-r extras/sqs.txt
+sqlalchemy
diff --git a/setup.py b/setup.py
index 0e49459f..a1b65879 100644
--- a/setup.py
+++ b/setup.py
@@ -141,6 +141,7 @@ setup(
'mongodb': extras('mongodb.txt'),
'sqs': extras('sqs.txt'),
'zookeeper': extras('zookeeper.txt'),
+ 'sqlalchemy': extras('sqlalchemy.txt'),
'librabbitmq': extras('librabbitmq.txt'),
'pyro': extras('pyro.txt'),
'slmq': extras('slmq.txt'),
diff --git a/t/integration/tests/test_sqla.py b/t/integration/tests/test_sqla.py
new file mode 100644
index 00000000..f145c07e
--- /dev/null
+++ b/t/integration/tests/test_sqla.py
@@ -0,0 +1,13 @@
+from __future__ import absolute_import, unicode_literals
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_module('sqlalchemy')
+class test_sqla(transport.TransportCase):
+ transport = 'sqlalchemy'
+ prefix = 'sqlalchemy'
+ event_loop_max = 10
+ connection_options = {'hostname': 'sqla+sqlite://'}
diff --git a/t/unit/transport/test_sqlalchemy.py b/t/unit/transport/test_sqlalchemy.py
new file mode 100644
index 00000000..70235b9a
--- /dev/null
+++ b/t/unit/transport/test_sqlalchemy.py
@@ -0,0 +1,50 @@
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+from case import patch, skip
+
+from kombu import Connection
+
+
+@skip.unless_module('sqlalchemy')
+class test_SqlAlchemy:
+
+ def test_url_parser(self):
+ with patch('kombu.transport.sqlalchemy.Channel._open'):
+ url = 'sqlalchemy+sqlite:///celerydb.sqlite'
+ Connection(url).connect()
+
+ url = 'sqla+sqlite:///celerydb.sqlite'
+ Connection(url).connect()
+
+ url = 'sqlb+sqlite:///celerydb.sqlite'
+ with pytest.raises(KeyError):
+ Connection(url).connect()
+
+ def test_simple_queueing(self):
+ conn = Connection('sqlalchemy+sqlite:///:memory:')
+ conn.connect()
+ try:
+ channel = conn.channel()
+ assert channel.queue_cls.__table__.name == 'kombu_queue'
+ assert channel.message_cls.__table__.name == 'kombu_message'
+
+ channel._put('celery', 'DATA_SIMPLE_QUEUEING')
+ assert channel._get('celery') == 'DATA_SIMPLE_QUEUEING'
+ finally:
+ conn.release()
+
+ def test_clone(self):
+ hostname = 'sqlite:///celerydb.sqlite'
+ x = Connection('+'.join(['sqla', hostname]))
+ try:
+ assert x.uri_prefix == 'sqla'
+ assert x.hostname == hostname
+ clone = x.clone()
+ try:
+ assert clone.hostname == hostname
+ assert clone.uri_prefix == 'sqla'
+ finally:
+ clone.release()
+ finally:
+ x.release()