diff options
Diffstat (limited to 'qpid')
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 145 |
1 files changed, 85 insertions, 60 deletions
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 617a4f0234..e7a89fb654 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -580,6 +580,8 @@ class Session: self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys() self.manageConnections = manageConnections self.agent_filter = [] # (vendor, product, instance) + self.agent_heartbeat_min = 10 # minimum agent heartbeat timeout interval + self.agent_heartbeat_miss = 3 # # of heartbeats to miss before deleting agent if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") @@ -1036,7 +1038,6 @@ class Session: timestamp = codec.read_uint64() if self.console != None and agent != None: self.console.heartbeat(agent, timestamp) - broker._ageAgents() def _handleSchemaResp(self, broker, codec, seq, agent_addr): @@ -1074,7 +1075,6 @@ class Session: ## if '_vendor' in values and values['_vendor'] == 'apache.org' and \ '_product' in values and values['_product'] == 'qpidd': - broker._ageAgents() return agent = broker.getAgent(1, agentName) @@ -1086,7 +1086,6 @@ class Session: agent.touch() if self.console and agent: self.console.heartbeat(agent, timestamp) - broker._ageAgents() def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): @@ -1928,6 +1927,7 @@ class Broker(Thread): self.rcv_queue = Queue() # for msg received on session self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq) Broker.nextSeq += 1 + self.last_age_check = time() # thread control self.setDaemon(True) @@ -2199,8 +2199,11 @@ class Broker(Thread): self.session.console.newAgent(agent) def _ageAgents(self): + if (time() - self.last_age_check) < self.session.agent_heartbeat_min: + # don't age if it's too soon + return + self.cv.acquire() try: - self.cv.acquire() to_delete = [] to_notify = [] for key in self.agents: @@ -2210,6 +2213,7 @@ class Broker(Thread): agent = self.agents.pop(key) agent.close() to_notify.append(agent) + self.last_age_check = time() finally: self.cv.release() if self.session.console: @@ -2414,6 +2418,10 @@ class Broker(Thread): except: pass + # mark agent as being alive + if agent: + agent.touch() + def _v2Cb(self, msg): """ Callback from session receive thread for V2 messages """ @@ -2466,6 +2474,7 @@ class Broker(Thread): if agent_addr in self.agents: agent = self.agents[agent_addr] agent._handleQmfV2Message(opcode, mp, ah, content) + agent.touch() # ignore failures as the session may be shutting down... try: @@ -2523,65 +2532,75 @@ class Broker(Thread): while not self.canceled: - item = self.rcv_queue.get() - - if self.canceled: - return - - if not self.connected: - # connection failure - while item: - # drain the queue + try: + item = self.rcv_queue.get(timeout=self.session.agent_heartbeat_min) + except Empty: + item = None + + while not self.canceled and item is not None: + + if not self.connected: + # connection failure + while item: + # drain the queue + try: + item = self.rcv_queue.get(block=False) + except Empty: + item = None + break + + # notify any waiters, and callback + self.cv.acquire() + try: + edata = self.error; + if self.syncInFlight: + self.cv.notify() + finally: + self.cv.release() + self.session._handleError(edata) + self.session._handleBrokerDisconnect(self) + + if not self.session.manageConnections: + return # do not attempt recovery + + # retry connection setup + delay = self.DELAY_MIN + while not self.canceled: + if self._tryToConnect(): + break + # managed connection - try again + count = 0 + while not self.canceled and count < delay: + sleep(1) + count += 1 + if delay < self.DELAY_MAX: + delay *= self.DELAY_FACTOR + + if self.canceled: + return + + # connection successful! + self.cv.acquire() try: - item = self.rcv_queue.get(block=False) - except Empty: - break + self.connected = True + finally: + self.cv.release() - # notify any waiters, and callback - self.cv.acquire() - try: - edata = self.error; - if self.syncInFlight: - self.cv.notify() - finally: - self.cv.release() - self.session._handleError(edata) - self.session._handleBrokerDisconnect(self) - - if not self.session.manageConnections: - return # do not attempt recovery - - # retry connection setup - delay = self.DELAY_MIN - while not self.canceled: - if self._tryToConnect(): - break - # managed connection - try again - count = 0 - while not self.canceled and count < delay: - sleep(1) - count += 1 - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR + self.session._handleBrokerConnect(self) - if self.canceled: - return + elif item.typecode == Broker._q_item.type_v1msg: + self._v1Dispatch(item.data) + elif item.typecode == Broker._q_item.type_v2msg: + self._v2Dispatch(item.data) - # connection successful! - self.cv.acquire() try: - self.connected = True - finally: - self.cv.release() - - self.session._handleBrokerConnect(self) - - elif item.typecode == Broker._q_item.type_v1msg: - self._v1Dispatch(item.data) - elif item.typecode == Broker._q_item.type_v2msg: - self._v2Dispatch(item.data) - + item = self.rcv_queue.get(block=False) + except Empty: + item = None + # queue drained, age the agents... + if not self.canceled: + self._ageAgents() #=================================================================================================== # Agent @@ -2598,7 +2617,12 @@ class Agent: self.agentBank = str(agentBank) self.label = label self.isV2 = isV2 - self.heartbeatInterval = interval + self.heartbeatInterval = 0 + if interval: + if interval < self.session.agent_heartbeat_min: + self.heartbeatInterval = self.session.agent_heartbeat_min + else: + self.heartbeatInterval = interval self.lock = Lock() self.seqMgr = self.session.seqMgr self.contextMap = {} @@ -2629,7 +2653,8 @@ class Agent: def touch(self): - self.lastSeenTime = time() + if self.heartbeatInterval: + self.lastSeenTime = time() def setEpoch(self, epoch): @@ -2647,7 +2672,7 @@ class Agent: def isOld(self): if self.heartbeatInterval == 0: return None - if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval): + if time() - self.lastSeenTime > (self.session.agent_heartbeat_miss * self.heartbeatInterval): return True return None |
