summaryrefslogtreecommitdiff
path: root/amqp/abstract_channel.py
blob: 6234b1965c9e22045a057412718972227c3ec37d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""Code common to Connection and Channel objects."""
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>

import logging

from vine import ensure_promise, promise

from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
from .serialization import dumps, loads

# Names exported when this module is star-imported.
__all__ = ('AbstractChannel',)

# Logger registered under the name 'amqp'.
AMQP_LOGGER = logging.getLogger('amqp')

# %-style template for the warning logged when a method arrives on a
# channel that is closing; the two placeholders are filled with the
# method signature and the channel id, in that order.
IGNORED_METHOD_DURING_CHANNEL_CLOSE = (
    'Received method %s during closing channel %s. '
    'This method will be ignored'
)


class AbstractChannel:
    """Base class shared by Connection and Channel.

    The connection itself is treated as channel 0; user-created
    channel objects come after it.

    This class relies on a few members it does not define itself, so
    concrete subclasses are expected to supply:

    * ``_METHODS`` -- mapping from an AMQP method signature to a
      descriptor exposing ``args`` and ``content`` attributes (only an
      empty placeholder is provided here),
    * ``_ALLOWED_METHODS_WHEN_CLOSING`` -- the signatures that are
      still dispatched while ``is_closing`` is true,
    * ``_setup_listeners()`` -- called as the last step of ``__init__``,
    * ``close()`` -- the base implementation only raises.
    """

    def __init__(self, connection, channel_id):
        """Register this object on *connection* under *channel_id*."""
        self.connection = connection
        self.channel_id = channel_id
        self.is_closing = False
        connection.channels[channel_id] = self

        self.method_queue = []  # Higher level queue for methods
        self.auto_decode = False
        # One-shot callables keyed by method signature; an entry is
        # removed by dispatch_method() when its method arrives.
        self._pending = {}
        # Persistent listeners keyed by method signature.
        self._callbacks = {}

        # Must run last: it is called after both registries exist.
        self._setup_listeners()

    def __enter__(self):
        """Return the channel itself for use in a ``with`` block."""
        return self

    def __exit__(self, *exc_info):
        """Close on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def send_method(self, sig,
                    format=None, args=None, content=None,
                    wait=None, callback=None, returns_tuple=False):
        """Write a method frame for this channel.

        Arguments:
            sig: method signature handed to the frame writer.
            format: serialization format for *args*; when falsy an
                empty payload is sent and *args* is ignored.
            args: values serialized with ``dumps(format, args)``.
            content: optional content passed along to the frame writer.
            wait: when truthy, method signature(s) to block on via
                :meth:`wait` after writing.
            callback: optional callable attached to the returned promise.
            returns_tuple: forwarded to :meth:`wait`.

        Returns:
            The result of :meth:`wait` when *wait* is truthy, otherwise
            the promise, which has already been called.

        Raises:
            RecoverableConnectionError: if ``self.connection`` is None
                or the frame writer raises ``StopIteration``.
        """
        sent = promise()
        connection = self.connection
        if connection is None:
            raise RecoverableConnectionError('connection already closed')

        if format:
            serialized = dumps(format, args)
        else:
            serialized = ''

        try:
            # NOTE(review): the leading 1 is presumably the frame type
            # for a method frame -- confirm against frame_writer.
            connection.frame_writer(
                1, self.channel_id, sig, serialized, content,
            )
        except StopIteration:
            raise RecoverableConnectionError('connection already closed')

        # TODO temp: callback should be after write_method ... ;)
        if callback:
            sent.then(callback)
        sent()

        if not wait:
            return sent
        return self.wait(wait, returns_tuple=returns_tuple)

    def close(self):
        """Close this Channel or Connection."""
        raise NotImplementedError('Must be overridden in subclass')

    def wait(self, method, callback=None, timeout=None, returns_tuple=False):
        """Drain connection events until an awaited method is dispatched.

        A promise built from *callback* is stored in ``_pending`` for
        each awaited signature, and the connection's events are drained
        until that promise reports ready. Whatever ``_pending`` held for
        those signatures beforehand is put back on the way out, whether
        or not draining raised.

        Arguments:
            method: a single method signature, or a list of them.
            callback: passed to ``ensure_promise`` to build the promise.
            timeout: forwarded to ``connection.drain_events``.
            returns_tuple: when true return all reply arguments,
                otherwise only the first one.

        Returns:
            None when the promise value is falsy. Otherwise the
            positional arguments with the leading method signature
            stripped: all of them if *returns_tuple*, else the first
            one (or the empty sequence itself when there are none).
        """
        waiter = ensure_promise(callback)
        registry = self._pending
        signatures = method if isinstance(method, list) else [method]

        # Save-and-replace has to happen entry by entry so a signature
        # listed twice sees the waiter as its own previous value.
        displaced = []
        for signature in signatures:
            displaced.append(registry.get(signature))
            registry[signature] = waiter

        try:
            while not waiter.ready:
                self.connection.drain_events(timeout=timeout)

            if not waiter.value:
                return None
            positional, _ = waiter.value
            reply = positional[1:]  # We are not returning method back
            if returns_tuple:
                return reply
            return reply[0] if reply else reply
        finally:
            for signature, earlier in zip(signatures, displaced):
                if earlier is None:
                    registry.pop(signature, None)
                else:
                    registry[signature] = earlier

    def dispatch_method(self, method_sig, payload, content):
        """Route an incoming method to its listener and pending waiter.

        Arguments:
            method_sig: signature of the received method.
            payload: raw bytes the method arguments are loaded from.
            content: optional content object; appended to the handler
                arguments when the method descriptor has ``content`` set.

        Raises:
            AMQPNotImplementedError: if *method_sig* is not a key of
                ``_METHODS``.
        """
        if self.is_closing:
            if method_sig not in self._ALLOWED_METHODS_WHEN_CLOSING:
                # When channel.close() was called we must ignore all
                # methods except Channel.close and Channel.CloseOk
                AMQP_LOGGER.warning(
                    IGNORED_METHOD_DURING_CHANNEL_CLOSE,
                    method_sig, self.channel_id,
                )
                return

        wants_decode = (
            content
            and self.auto_decode
            and hasattr(content, 'content_encoding')
        )
        if wants_decode:
            # Best effort: if decoding fails the body is left as it was.
            try:
                content.body = content.body.decode(content.content_encoding)
            except Exception:
                pass

        try:
            descriptor = self._METHODS[method_sig]
        except KeyError:
            raise AMQPNotImplementedError(
                f'Unknown AMQP method {method_sig!r}')

        handlers = []
        try:
            handlers.append(self._callbacks[method_sig])
        except KeyError:
            pass

        try:
            waiter = self._pending.pop(method_sig)
        except KeyError:
            waiter = None
            if not handlers:
                # Nobody is interested; skip decoding the arguments.
                return

        if descriptor.args:
            call_args, _ = loads(descriptor.args, payload, 4)
        else:
            call_args = []
        if descriptor.content:
            call_args.append(content)

        for handler in handlers:
            handler(*call_args)

        # The one-shot waiter also receives the method signature first.
        if waiter:
            waiter(method_sig, *call_args)

    #: Placeholder, the concrete implementations will have to
    #: supply their own versions of _METHODS
    _METHODS = {}