summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-08-18 18:05:10 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-08-18 18:05:10 +0000
commitf5236e04ddf587a13f989f8bb33cb042fbdf3140 (patch)
tree32a15aab0789ae71c1f2498d89b7298e55d5d729
parent1f1f8b979a199098008836ab532caa5da1dc7b5c (diff)
downloadqpid-python-f5236e04ddf587a13f989f8bb33cb042fbdf3140.tar.gz
QPID-2663: prevent slow consoles from causing agent disconnects due to full topic queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@986828 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py67
1 files changed, 39 insertions, 28 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index e3de7f8ed2..0a256d0bb5 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -678,8 +678,8 @@ class Session:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
for v2key in v2keys:
- # data indications should arrive on the lo priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+ # data indications should arrive on the unsolicited indication queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
def bindClass(self, pname, cname=None):
@@ -701,8 +701,8 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
- # data indications should arrive on the lo priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+ # data indications should arrive on the unsolicited indication queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
def bindClassKey(self, classKey):
@@ -731,8 +731,8 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
- # event indications should arrive on the lo priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+ # event indications should arrive on the unsolicited indication queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
def bindEventKey(self, eventKey):
""" Request events for a particular class by class key. Only valid if
@@ -757,8 +757,8 @@ class Session:
for broker in self.brokers:
if broker.isConnected():
if broker.brokerSupportsV2:
- # heartbeats should arrive on the hi priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hi, binding_key=v2key)
+ # heartbeats should arrive on the heartbeat queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hb, binding_key=v2key)
def getAgents(self, broker=None):
""" Get a list of currently known agents """
@@ -2206,6 +2206,10 @@ class Broker(Thread):
sock.settimeout(oldTimeout)
self.conn.aborted = oldAborted
+ # prevent topic queues from filling up (and causing the agents to
+ # disconnect) by discarding the oldest queued messages when full.
+ topic_queue_options = {"qpid.policy_type":"ring"}
+
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
self.amqpSession.timeout = self.SYNC_TIME
@@ -2222,7 +2226,9 @@ class Broker(Thread):
self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200)
self.topicName = "topic-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
+ self.amqpSession.queue_declare(queue=self.topicName, exclusive=True,
+ auto_delete=True,
+ arguments=topic_queue_options)
self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
@@ -2252,13 +2258,18 @@ class Broker(Thread):
if self.brokerSupportsV2:
# set up 3 queues:
# 1 direct queue - for responses destined to this console.
- # 2 topic queues - one for heartbeats (hi priority), one for all other indications.
+ # 2 topic queues - one for heartbeats (hb), one for unsolicited data
+ # and event indications (ui).
self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId
self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True)
- self.v2_topic_queue_lo = "qmfc-v2-lo-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.v2_topic_queue_lo, exclusive=True, auto_delete=True)
- self.v2_topic_queue_hi = "qmfc-v2-hi-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.v2_topic_queue_hi, exclusive=True, auto_delete=True)
+ self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_topic_queue_ui,
+ exclusive=True, auto_delete=True,
+ arguments=topic_queue_options)
+ self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_topic_queue_hb,
+ exclusive=True, auto_delete=True,
+ arguments=topic_queue_options)
self.amqpSession.exchange_bind(exchange="qmf.default.direct",
queue=self.v2_direct_queue, binding_key=self.v2_direct_queue)
@@ -2272,22 +2283,22 @@ class Broker(Thread):
self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50)
- self.amqpSession.message_subscribe(queue=self.v2_topic_queue_lo, destination="v2TopicLo",
+ self.amqpSession.message_subscribe(queue=self.v2_topic_queue_ui, destination="v2TopicUI",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("v2TopicLo").listen(self._v2Cb, self._exceptionCb)
- self.amqpSession.message_set_flow_mode(destination="v2TopicLo", flow_mode=self.amqpSession.flow_mode.window)
- self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
- self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.message, value=25)
+ self.amqpSession.incoming("v2TopicUI").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2TopicUI", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.message, value=25)
- self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hi, destination="v2TopicHi",
+ self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hb, destination="v2TopicHB",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("v2TopicHi").listen(self._v2Cb, self._exceptionCb)
- self.amqpSession.message_set_flow_mode(destination="v2TopicHi", flow_mode=self.amqpSession.flow_mode.window)
- self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
- self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.message, value=100)
+ self.amqpSession.incoming("v2TopicHB").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2TopicHB", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.message, value=100)
codec = Codec()
self._setHeader(codec, 'B')
@@ -2426,8 +2437,8 @@ class Broker(Thread):
self.amqpSession.incoming("tdest").stop()
if self.brokerSupportsV2:
self.amqpSession.incoming("v2dest").stop()
- self.amqpSession.incoming("v2TopicLo").stop()
- self.amqpSession.incoming("v2TopicHi").stop()
+ self.amqpSession.incoming("v2TopicUI").stop()
+ self.amqpSession.incoming("v2TopicHB").stop()
self.amqpSession.close()
self.conn.close()
self.connected = False
@@ -2472,11 +2483,11 @@ class Broker(Thread):
for key in self.session.v2BindingKeyList:
if key.startswith("agent.ind.heartbeat"):
self.amqpSession.exchange_bind(exchange="qmf.default.topic",
- queue=self.v2_topic_queue_hi,
+ queue=self.v2_topic_queue_hb,
binding_key=key)
else:
self.amqpSession.exchange_bind(exchange="qmf.default.topic",
- queue=self.v2_topic_queue_lo,
+ queue=self.v2_topic_queue_ui,
binding_key=key)
# solicit an agent locate now, after we bind to agent.ind.data,
# because the agent locate will cause the agent to publish a