summaryrefslogtreecommitdiff
path: root/funtests/config.py
blob: 1b9542300f622985f7ad7c732d0c5dde94509042 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import os

BROKER_HOST = os.environ.get('BROKER_HOST', 'localhost')
BROKER_PORT = int(os.environ.get('BROKER_PORT', 5672))
BROKER_VHOST = os.environ.get('BROKER_VHOST', '/')
BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWORD = os.environ.get('BROKER_PASSWORD', 'guest')

from functools import partial
from unittest import TestCase
from uuid import uuid4


class BrokerCase(TestCase):

    def setUp(self):
        import librabbitmq
        self.cleanup_queues = set()
        self.Connection = partial(librabbitmq.Connection,
            host=BROKER_HOST,
            port=BROKER_PORT,
            userid=BROKER_USER,
            password=BROKER_PASSWORD,
            virtual_host=BROKER_VHOST,
        )
        self.mod = librabbitmq
        self.ConnectionError = self.mod.ConnectionError
        self.ChannelError = self.mod.ChannelError

    def tearDown(self):
        try:
            for name in self.cleanup_queues:
                with self.Connection() as conn:
                    with conn.channel() as chan:
                        try:
                            chan.queue_delete(name)
                        except (self.ConnectionError, self.ChannelError):
                            pass
        finally:
            self.cleanup_queues.clear()

    def uses_queue(self, name, register=True):
        register and self.cleanup_queues.add(name)
        return name

    def new_queue(self, register=True):
        return self.uses_queue('lrmqFUNTEST.%s' % (uuid4(), ), register)