diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-08-18 18:05:10 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-08-18 18:05:10 +0000 |
| commit | f5236e04ddf587a13f989f8bb33cb042fbdf3140 (patch) | |
| tree | 32a15aab0789ae71c1f2498d89b7298e55d5d729 | |
| parent | 1f1f8b979a199098008836ab532caa5da1dc7b5c (diff) | |
| download | qpid-python-f5236e04ddf587a13f989f8bb33cb042fbdf3140.tar.gz | |
QPID-2663: prevent slow consoles from causing agent disconnects due to full topic queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@986828 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 67 |
1 files changed, 39 insertions, 28 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index e3de7f8ed2..0a256d0bb5 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -678,8 +678,8 @@ class Session: broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) if broker.brokerSupportsV2: for v2key in v2keys: - # data indications should arrive on the lo priority queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key) + # data indications should arrive on the unsolicited indication queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) def bindClass(self, pname, cname=None): @@ -701,8 +701,8 @@ class Session: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) if broker.brokerSupportsV2: - # data indications should arrive on the lo priority queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key) + # data indications should arrive on the unsolicited indication queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) def bindClassKey(self, classKey): @@ -731,8 +731,8 @@ class Session: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) if broker.brokerSupportsV2: - # event indications should arrive on the lo priority queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key) + # event indications should arrive on the unsolicited indication queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key) def bindEventKey(self, eventKey): """ Request events for a particular class by class key. Only valid if @@ -757,8 +757,8 @@ class Session: for broker in self.brokers: if broker.isConnected(): if broker.brokerSupportsV2: - # heartbeats should arrive on the hi priority queue - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hi, binding_key=v2key) + # heartbeats should arrive on the heartbeat queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hb, binding_key=v2key) def getAgents(self, broker=None): """ Get a list of currently known agents """ @@ -2206,6 +2206,10 @@ class Broker(Thread): sock.settimeout(oldTimeout) self.conn.aborted = oldAborted + # prevent topic queues from filling up (and causing the agents to + # disconnect) by discarding the oldest queued messages when full. + topic_queue_options = {"qpid.policy_type":"ring"} + self.replyName = "reply-%s" % self.amqpSessionId self.amqpSession = self.conn.session(self.amqpSessionId) self.amqpSession.timeout = self.SYNC_TIME @@ -2222,7 +2226,9 @@ class Broker(Thread): self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200) self.topicName = "topic-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) + self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, + auto_delete=True, + arguments=topic_queue_options) self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) @@ -2252,13 +2258,18 @@ class Broker(Thread): if self.brokerSupportsV2: # set up 3 queues: # 1 direct queue - for responses destined to this console. - # 2 topic queues - one for heartbeats (hi priority), one for all other indications. + # 2 topic queues - one for heartbeats (hb), one for unsolicited data + # and event indications (ui). self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True) - self.v2_topic_queue_lo = "qmfc-v2-lo-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.v2_topic_queue_lo, exclusive=True, auto_delete=True) - self.v2_topic_queue_hi = "qmfc-v2-hi-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.v2_topic_queue_hi, exclusive=True, auto_delete=True) + self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_topic_queue_ui, + exclusive=True, auto_delete=True, + arguments=topic_queue_options) + self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_topic_queue_hb, + exclusive=True, auto_delete=True, + arguments=topic_queue_options) self.amqpSession.exchange_bind(exchange="qmf.default.direct", queue=self.v2_direct_queue, binding_key=self.v2_direct_queue) @@ -2272,22 +2283,22 @@ class Broker(Thread): self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50) - self.amqpSession.message_subscribe(queue=self.v2_topic_queue_lo, destination="v2TopicLo", + self.amqpSession.message_subscribe(queue=self.v2_topic_queue_ui, destination="v2TopicUI", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("v2TopicLo").listen(self._v2Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="v2TopicLo", flow_mode=self.amqpSession.flow_mode.window) - self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.message, value=25) + self.amqpSession.incoming("v2TopicUI").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2TopicUI", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.message, value=25) - self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hi, destination="v2TopicHi", + self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hb, destination="v2TopicHB", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("v2TopicHi").listen(self._v2Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="v2TopicHi", flow_mode=self.amqpSession.flow_mode.window) - self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.message, value=100) + self.amqpSession.incoming("v2TopicHB").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2TopicHB", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.message, value=100) codec = Codec() self._setHeader(codec, 'B') @@ -2426,8 +2437,8 @@ class Broker(Thread): self.amqpSession.incoming("tdest").stop() if self.brokerSupportsV2: self.amqpSession.incoming("v2dest").stop() - self.amqpSession.incoming("v2TopicLo").stop() - self.amqpSession.incoming("v2TopicHi").stop() + self.amqpSession.incoming("v2TopicUI").stop() + self.amqpSession.incoming("v2TopicHB").stop() self.amqpSession.close() self.conn.close() self.connected = False @@ -2472,11 +2483,11 @@ class Broker(Thread): for key in self.session.v2BindingKeyList: if key.startswith("agent.ind.heartbeat"): self.amqpSession.exchange_bind(exchange="qmf.default.topic", - queue=self.v2_topic_queue_hi, + queue=self.v2_topic_queue_hb, binding_key=key) else: self.amqpSession.exchange_bind(exchange="qmf.default.topic", - queue=self.v2_topic_queue_lo, + queue=self.v2_topic_queue_ui, binding_key=key) # solicit an agent locate now, after we bind to agent.ind.data, # because the agent locate will cause the agent to publish a |
