diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-31 22:52:13 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-31 22:52:13 +0000 |
| commit | 4bc0aace4fedb76a055fa746fb3999edb106212c (patch) | |
| tree | aef9dd346fca0df857d1003a851ff9f4b0c04dc0 | |
| parent | 30ff4dcc85eaf5a2ea52cad9d965086c8062a4ce (diff) | |
| download | qpid-python-4bc0aace4fedb76a055fa746fb3999edb106212c.tar.gz | |
QPID-2261: update addressing, fix subscription refresh, remove old comments
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@929740 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/agent.py | 16 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/common.py | 3 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/console.py | 103 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py | 12 |
4 files changed, 46 insertions, 88 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/agent.py b/qpid/extras/qmf/src/py/qmf2/agent.py index cb0cc36ab1..4ec00bd288 100644 --- a/qpid/extras/qmf/src/py/qmf2/agent.py +++ b/qpid/extras/qmf/src/py/qmf2/agent.py @@ -193,9 +193,9 @@ class Agent(Thread): # for messages directly addressed to me self._direct_receiver = self._session.receiver(str(self._address) + ";{create:always," - " node-properties:" + " node:" " {type:topic," - " x-properties:" + " x-declare:" " {type:direct}}}", capacity=self._capacity) trace.debug("my direct addr=%s" % self._direct_receiver.source) @@ -203,9 +203,9 @@ class Agent(Thread): # for sending directly addressed messages. self._direct_sender = self._session.sender(str(self._address.get_node()) + ";{create:always," - " node-properties:" + " node:" " {type:topic," - " x-properties:" + " x-declare:" " {type:direct}}}") trace.debug("my default direct send addr=%s" % self._direct_sender.target) @@ -214,7 +214,7 @@ class Agent(Thread): self._domain) self._topic_receiver = self._session.receiver(str(default_addr) + ";{create:always," - " node-properties:" + " node:" " {type:topic}}", capacity=self._capacity) trace.debug("console.ind addr=%s" % self._topic_receiver.source) @@ -224,7 +224,7 @@ class Agent(Thread): self._domain) self._topic_sender = self._session.sender(str(ind_addr) + ";{create:always," - " node-properties:" + " node:" " {type:topic}}") trace.debug("agent.ind addr=%s" % self._topic_sender.target) @@ -635,7 +635,7 @@ class Agent(Thread): self._handleMethodReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.subscribe_req: self._handleSubscribeReqMsg(msg, cmap, props, version, _direct) - elif opcode == OpCode.subscribe_refresh_req: + elif opcode == OpCode.subscribe_refresh_ind: self._handleResubscribeReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.subscribe_cancel_ind: self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct) @@ -905,7 +905,7 @@ class Agent(Thread): "_duration": new_duration} m = Message(id=QMF_APP_ID, properties={"method":"response", - "qmf.opcode":OpCode.subscribe_refresh_rsp}, + "qmf.opcode":OpCode.subscribe_rsp}, correlation_id = msg.correlation_id, content=sr_map) self._send_reply(m, msg.reply_to) diff --git a/qpid/extras/qmf/src/py/qmf2/common.py b/qpid/extras/qmf/src/py/qmf2/common.py index 7ca7bb5947..2e5367f54f 100644 --- a/qpid/extras/qmf/src/py/qmf2/common.py +++ b/qpid/extras/qmf/src/py/qmf2/common.py @@ -57,7 +57,7 @@ class OpCode(object): agent_locate_req = "_agent_locate_request" subscribe_req = "_subscribe_request" subscribe_cancel_ind = "_subscribe_cancel_indication" - subscribe_refresh_req = "_subscribe_refresh_indication" + subscribe_refresh_ind = "_subscribe_refresh_indication" query_req = "_query_request" method_req = "_method_request" @@ -67,7 +67,6 @@ class OpCode(object): agent_heartbeat_ind = "_agent_heartbeat_indication" query_rsp = "_query_response" subscribe_rsp = "_subscribe_response" - subscribe_refresh_rsp = "_subscribe_refresh_response" data_ind = "_data_indication" method_rsp = "_method_response" diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py index ec85aec0de..9227835b3f 100644 --- a/qpid/extras/qmf/src/py/qmf2/console.py +++ b/qpid/extras/qmf/src/py/qmf2/console.py @@ -406,16 +406,16 @@ class _SubscriptionMailbox(_AsyncMailbox): return False return True - def resubscribe(self, duration): + def resubscribe(self): agent = self.console.get_agent(self.agent_name) if not agent: - log.warning("resubscribed failed - unknown agent '%s'" % - self.agent_name) + log.warning("resubscribed failed - unknown agent '%s'", + self.agent_name) return False try: - trace.debug("Sending resubscribe to Agent (%s)" % self.agent_name) + trace.debug("Sending resubscribe to Agent %s", self.agent_name) agent._send_resubscribe_req(self.get_address(), - self.agent_subscription_id, duration) + self.agent_subscription_id) except SendError, e: log.error(str(e)) return False @@ -425,8 +425,7 @@ class _SubscriptionMailbox(_AsyncMailbox): """ """ opcode = msg.properties.get("qmf.opcode") - if (opcode == OpCode.subscribe_rsp or - opcode == OpCode.subscribe_refresh_rsp): + if (opcode == OpCode.subscribe_rsp): error = msg.content.get("_error") if error: @@ -519,8 +518,6 @@ class _AsyncSubscriptionMailbox(_SubscriptionMailbox): agent, duration, interval) self.subscribe_pending = False - self.resubscribe_pending = False - def subscribe(self, query, reply_timeout): if super(_AsyncSubscriptionMailbox, self).subscribe(query): @@ -529,34 +526,18 @@ class _AsyncSubscriptionMailbox(_SubscriptionMailbox): return True return False - def resubscribe(self, duration, reply_timeout): - if super(_AsyncSubscriptionMailbox, self).resubscribe(duration): - self.resubscribe_pending = True - self.reset_timeout(reply_timeout) - return True - return False - def deliver(self, msg): """ """ super(_AsyncSubscriptionMailbox, self).deliver(msg) sp = self.fetch(0) - if sp: - # if the message was a reply to a subscribe or - # re-subscribe, then we get here. - if self.subscribe_pending: - wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, - self.context, sp) - else: - wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, - self.context, sp) - - self.subscribe_pending = False - self.resubscribe_pending = False - + if sp and self.subscribe_pending: + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, sp) self.console._work_q.put(wi) self.console._work_q_put = True + self.subscribe_pending = False + if not sp.succeeded(): self.destroy() @@ -565,11 +546,9 @@ class _AsyncSubscriptionMailbox(_SubscriptionMailbox): """ Either the subscription expired, or a request timedout. """ if self.subscribe_pending: - wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, - self.context, None) - elif self.resubscribe_pending: - wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, - self.context, None) + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, None) + self.console._work_q.put(wi) + self.console._work_q_put = True self.destroy() @@ -928,17 +907,14 @@ class Agent(object): def _send_resubscribe_req(self, correlation_id, - subscription_id, - _lifetime=None): + subscription_id): """ """ sr_map = {"_subscription_id":subscription_id} - if _lifetime is not None: - sr_map["_duration"] = _lifetime msg = Message(id=QMF_APP_ID, properties={"method":"request", - "qmf.opcode":OpCode.subscribe_refresh_req}, + "qmf.opcode":OpCode.subscribe_refresh_ind}, content=sr_map) self._send_msg(msg, correlation_id) @@ -1101,18 +1077,18 @@ class Console(Thread): # for messages directly addressed to me self._direct_recvr = self._session.receiver(str(self._address) + ";{create:always," - " node-properties:" + " node:" " {type:topic," - " x-properties:" + " x-declare:" " {type:direct}}}", capacity=1) trace.debug("my direct addr=%s" % self._direct_recvr.source) self._direct_sender = self._session.sender(str(self._address.get_node()) + ";{create:always," - " node-properties:" + " node:" " {type:topic," - " x-properties:" + " x-declare:" " {type:direct}}}") trace.debug("my direct sender=%s" % self._direct_sender.target) @@ -1121,7 +1097,7 @@ class Console(Thread): self._domain) self._topic_recvr = self._session.receiver(str(default_addr) + ";{create:always," - " node-properties:{type:topic}}", + " node:{type:topic}}", capacity=1) trace.debug("default topic recv addr=%s" % self._topic_recvr.source) @@ -1130,7 +1106,7 @@ class Console(Thread): topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain) self._topic_sender = self._session.sender(str(topic_addr) + ";{create:always," - " node-properties:{type:topic}}") + " node:{type:topic}}") trace.debug("default topic send addr=%s" % self._topic_sender.target) # @@ -1410,10 +1386,10 @@ class Console(Thread): return None if isinstance(mbox, _AsyncSubscriptionMailbox): - return mbox.resubscribe(_duration, _timeout) + return mbox.resubscribe() else: # synchronous - wait for reply - if not mbox.resubscribe(_duration): + if not mbox.resubscribe(): # @todo ???? mbox.destroy() return None @@ -1641,11 +1617,11 @@ class Console(Thread): """ PRIVATE: Process a message received from an Agent """ - trace.debug( "Message received from Agent! [%s]" % msg ) + trace.debug( "Message received from Agent! [%s]", msg ) opcode = msg.properties.get("qmf.opcode") if not opcode: - log.error("Ignoring unrecognized message '%s'" % msg) + log.error("Ignoring unrecognized message '%s'", msg) return version = 2 # @todo: fix me @@ -1659,23 +1635,14 @@ class Console(Thread): self._handle_agent_ind_msg( msg, cmap, version, _direct ) elif opcode == OpCode.agent_locate_rsp: self._handle_agent_ind_msg( msg, cmap, version, _direct ) - elif opcode == OpCode.query_rsp: - self._handle_response_msg(msg, cmap, version, _direct) - elif opcode == OpCode.subscribe_rsp: - self._handle_response_msg(msg, cmap, version, _direct) - elif opcode == OpCode.subscribe_refresh_rsp: - self._handle_response_msg(msg, cmap, version, _direct) - elif opcode == OpCode.method_rsp: + elif msg.correlation_id: self._handle_response_msg(msg, cmap, version, _direct) elif opcode == OpCode.data_ind: - if msg.correlation_id: - self._handle_response_msg(msg, cmap, version, _direct) - else: - self._handle_indication_msg(msg, cmap, version, _direct) + self._handle_indication_msg(msg, cmap, version, _direct) elif opcode == OpCode.noop: trace.debug("No-op msg received.") else: - log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) + log.warning("Ignoring message with unrecognized 'opcode' value: '%s'", opcode) def _handle_agent_ind_msg(self, msg, cmap, version, direct): @@ -1742,12 +1709,12 @@ class Console(Thread): """ Process a received data-ind message. """ - trace.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) + trace.debug("%s _handle_response_msg '%s'", self._name, str(msg)) mbox = self._get_mailbox(msg.correlation_id) if not mbox: - log.warning("Response msg received with unknown correlation_id" - " msg='%s'" % str(msg)) + log.warning("%s Response msg received with unknown correlation_id" + " msg='%s'", self._name, str(msg)) return # wake up all waiters @@ -1764,12 +1731,12 @@ class Console(Thread): content_type = msg.properties.get("qmf.content") if (content_type != ContentType.event or not isinstance(msg.content, type([]))): - log.warning("Bad event indication message received: '%s'" % msg) + log.warning("Bad event indication message received: '%s'", msg) return emap = msg.content[0] if not isinstance(emap, type({})): - trace.debug("Invalid event body in indication message: '%s'" % msg) + trace.debug("Invalid event body in indication message: '%s'", msg) return agent = None @@ -1877,9 +1844,9 @@ class Console(Thread): try: agent._sender = self._session.sender(str(agent._address) + ";{create:always," - " node-properties:" + " node:" " {type:topic," - " x-properties:" + " x-declare:" " {type:direct}}}") except: log.warning("Unable to create sender for %s" % name) diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py index 1f73865f30..1f8a31dc22 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py @@ -506,7 +506,7 @@ class BaseTest(unittest.TestCase): if r_count == 3: rp = self.console.refresh_subscription(sp.get_subscription_id()) - self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams)) + self.assertTrue(rp) wi = self.console.get_next_workitem(timeout=0) @@ -695,7 +695,6 @@ class BaseTest(unittest.TestCase): r_count = 0 i_count = 0 sp = None - rp = None while self.notifier.wait_for_work(4): wi = self.console.get_next_workitem(timeout=0) while wi is not None: @@ -706,12 +705,6 @@ class BaseTest(unittest.TestCase): self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) self.assertTrue(sp.succeeded()) self.assertTrue(sp.get_error() == None) - elif wi.get_type() == WorkItem.RESUBSCRIBE_RESPONSE: - self.assertTrue(wi.get_handle() == "my-handle") - rp = wi.get_params() - self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams)) - self.assertTrue(rp.succeeded()) - self.assertTrue(rp.get_error() == None) else: self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) @@ -745,7 +738,7 @@ class BaseTest(unittest.TestCase): wi = self.console.get_next_workitem(timeout=0) # expect 5 publish per subscription, more if refreshed - self.assertTrue(sp is not None and rp is not None) + self.assertTrue(sp is not None) self.assertTrue(i_count > 5) self.console.destroy(10) @@ -785,7 +778,6 @@ class BaseTest(unittest.TestCase): r_count = 0 sp = None - rp = None while self.notifier.wait_for_work(4): wi = self.console.get_next_workitem(timeout=0) while wi is not None: |
