summaryrefslogtreecommitdiff
path: root/t/unit/transport/test_sqlalchemy.py
blob: 70235b9abb9226bc26b0ea82174c2a434bce14b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from __future__ import absolute_import, unicode_literals

import pytest
from case import patch, skip

from kombu import Connection


@skip.unless_module('sqlalchemy')
class test_SqlAlchemy:

    def test_url_parser(self):
        with patch('kombu.transport.sqlalchemy.Channel._open'):
            url = 'sqlalchemy+sqlite:///celerydb.sqlite'
            Connection(url).connect()

            url = 'sqla+sqlite:///celerydb.sqlite'
            Connection(url).connect()

            url = 'sqlb+sqlite:///celerydb.sqlite'
            with pytest.raises(KeyError):
                Connection(url).connect()

    def test_simple_queueing(self):
        conn = Connection('sqlalchemy+sqlite:///:memory:')
        conn.connect()
        try:
            channel = conn.channel()
            assert channel.queue_cls.__table__.name == 'kombu_queue'
            assert channel.message_cls.__table__.name == 'kombu_message'

            channel._put('celery', 'DATA_SIMPLE_QUEUEING')
            assert channel._get('celery') == 'DATA_SIMPLE_QUEUEING'
        finally:
            conn.release()

    def test_clone(self):
        hostname = 'sqlite:///celerydb.sqlite'
        x = Connection('+'.join(['sqla', hostname]))
        try:
            assert x.uri_prefix == 'sqla'
            assert x.hostname == hostname
            clone = x.clone()
            try:
                assert clone.hostname == hostname
                assert clone.uri_prefix == 'sqla'
            finally:
                clone.release()
        finally:
            x.release()