diff options
author | Ask Solem <ask@celeryproject.org> | 2013-08-16 14:41:40 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-08-16 14:41:40 +0100 |
commit | 6987cd4180218484f9e07d4470b778c51e81a32c (patch) | |
tree | ce580088ff11029030c7098b10ca1e2c43f4a52d | |
parent | d4c0a69631a3546ff4079db30543f421931dd89f (diff) | |
download | py-amqp-6987cd4180218484f9e07d4470b778c51e81a32c.tar.gz |
Adds support for RabbitMQ extension connection.blocked
-rw-r--r-- | amqp/connection.py | 24 | ||||
-rw-r--r-- | amqp/exceptions.py | 4 |
2 files changed, 27 insertions, 1 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index 6768513..52db2b3 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -83,7 +83,8 @@ class Connection(AbstractChannel): login_method='AMQPLAIN', login_response=None, virtual_host='/', locale='en_US', client_properties=None, ssl=False, connect_timeout=None, channel_max=None, - frame_max=None, heartbeat=0, **kwargs): + frame_max=None, heartbeat=0, on_blocked=None, + on_unblocked=None, **kwargs): """Create a connection to the specified host, which should be a 'host[:port]', such as 'localhost', or '1.2.3.4:5672' (defaults to 'localhost', if a port is not specified then @@ -121,6 +122,10 @@ class Connection(AbstractChannel): self.frame_max = frame_max self.heartbeat = heartbeat + # Callbacks + self.on_blocked = on_blocked + self.on_unblocked = on_unblocked + self._avail_channel_ids = array('H', range(self.channel_max, 0, -1)) # Properties set in the Start method @@ -490,6 +495,17 @@ class Connection(AbstractChannel): raise ConnectionError(reply_code, reply_text, (class_id, method_id)) + def _blocked(self, args): + """RabbitMQ Extension.""" + reason = args.read_shortstr() + if self.on_blocked: + return self.on_blocked(reason) + raise Blocked(reason) + + def _unblocked(self, *args): + if self.on_unblocked: + return self.on_unblocked() + def _x_close_ok(self): """Confirm a connection close @@ -759,6 +775,10 @@ class Connection(AbstractChannel): if 'capabilities' not in client_properties: client_properties['capabilities'] = {} client_properties['capabilities']['consumer_cancel_notify'] = True + if self.server_capabilities.get('connection.blocked'): + if 'capabilities' not in client_properties: + client_properties['capabilities'] = {} + client_properties['capabilities']['connection.blocked'] = True args = AMQPWriter() args.write_table(client_properties) args.write_shortstr(mechanism) @@ -914,6 +934,8 @@ class Connection(AbstractChannel): (10, 41): _open_ok, (10, 50): _close, (10, 51): _close_ok, + (10, 60): _blocked, + (10, 61): _unblocked, } _IMMEDIATE_METHODS = [] diff --git a/amqp/exceptions.py b/amqp/exceptions.py index 7f786a9..6aa3bf0 100644 --- a/amqp/exceptions.py +++ b/amqp/exceptions.py @@ -55,6 +55,10 @@ class ConsumerCancel(ChannelError): pass +class Blocked(ConnectionError): + pass + + METHOD_NAME_MAP = { (10, 10): 'Connection.start', (10, 11): 'Connection.start_ok', |