summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-08-16 14:41:40 +0100
committerAsk Solem <ask@celeryproject.org>2013-08-16 14:41:40 +0100
commit6987cd4180218484f9e07d4470b778c51e81a32c (patch)
treece580088ff11029030c7098b10ca1e2c43f4a52d
parentd4c0a69631a3546ff4079db30543f421931dd89f (diff)
downloadpy-amqp-6987cd4180218484f9e07d4470b778c51e81a32c.tar.gz
Adds support for RabbitMQ extension connection.blocked
-rw-r--r--amqp/connection.py24
-rw-r--r--amqp/exceptions.py4
2 files changed, 27 insertions, 1 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index 6768513..52db2b3 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -83,7 +83,8 @@ class Connection(AbstractChannel):
login_method='AMQPLAIN', login_response=None,
virtual_host='/', locale='en_US', client_properties=None,
ssl=False, connect_timeout=None, channel_max=None,
- frame_max=None, heartbeat=0, **kwargs):
+ frame_max=None, heartbeat=0, on_blocked=None,
+ on_unblocked=None, **kwargs):
"""Create a connection to the specified host, which should be
a 'host[:port]', such as 'localhost', or '1.2.3.4:5672'
(defaults to 'localhost', if a port is not specified then
@@ -121,6 +122,10 @@ class Connection(AbstractChannel):
self.frame_max = frame_max
self.heartbeat = heartbeat
+ # Callbacks
+ self.on_blocked = on_blocked
+ self.on_unblocked = on_unblocked
+
self._avail_channel_ids = array('H', range(self.channel_max, 0, -1))
# Properties set in the Start method
@@ -490,6 +495,17 @@ class Connection(AbstractChannel):
raise ConnectionError(reply_code, reply_text, (class_id, method_id))
+ def _blocked(self, args):
+ """RabbitMQ Extension."""
+ reason = args.read_shortstr()
+ if self.on_blocked:
+ return self.on_blocked(reason)
+ raise Blocked(reason)
+
+ def _unblocked(self, *args):
+ if self.on_unblocked:
+ return self.on_unblocked()
+
def _x_close_ok(self):
"""Confirm a connection close
@@ -759,6 +775,10 @@ class Connection(AbstractChannel):
if 'capabilities' not in client_properties:
client_properties['capabilities'] = {}
client_properties['capabilities']['consumer_cancel_notify'] = True
+ if self.server_capabilities.get('connection.blocked'):
+ if 'capabilities' not in client_properties:
+ client_properties['capabilities'] = {}
+ client_properties['capabilities']['connection.blocked'] = True
args = AMQPWriter()
args.write_table(client_properties)
args.write_shortstr(mechanism)
@@ -914,6 +934,8 @@ class Connection(AbstractChannel):
(10, 41): _open_ok,
(10, 50): _close,
(10, 51): _close_ok,
+ (10, 60): _blocked,
+ (10, 61): _unblocked,
}
_IMMEDIATE_METHODS = []
diff --git a/amqp/exceptions.py b/amqp/exceptions.py
index 7f786a9..6aa3bf0 100644
--- a/amqp/exceptions.py
+++ b/amqp/exceptions.py
@@ -55,6 +55,10 @@ class ConsumerCancel(ChannelError):
pass
+class Blocked(ConnectionError):
+ pass
+
+
METHOD_NAME_MAP = {
(10, 10): 'Connection.start',
(10, 11): 'Connection.start_ok',