summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-31 22:52:13 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-31 22:52:13 +0000
commit4bc0aace4fedb76a055fa746fb3999edb106212c (patch)
treeaef9dd346fca0df857d1003a851ff9f4b0c04dc0
parent30ff4dcc85eaf5a2ea52cad9d965086c8062a4ce (diff)
downloadqpid-python-4bc0aace4fedb76a055fa746fb3999edb106212c.tar.gz
QPID-2261: update addressing, fix subscription refresh, remove old comments
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@929740 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/qmf/src/py/qmf2/agent.py16
-rw-r--r--qpid/extras/qmf/src/py/qmf2/common.py3
-rw-r--r--qpid/extras/qmf/src/py/qmf2/console.py103
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py12
4 files changed, 46 insertions, 88 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/agent.py b/qpid/extras/qmf/src/py/qmf2/agent.py
index cb0cc36ab1..4ec00bd288 100644
--- a/qpid/extras/qmf/src/py/qmf2/agent.py
+++ b/qpid/extras/qmf/src/py/qmf2/agent.py
@@ -193,9 +193,9 @@ class Agent(Thread):
# for messages directly addressed to me
self._direct_receiver = self._session.receiver(str(self._address) +
";{create:always,"
- " node-properties:"
+ " node:"
" {type:topic,"
- " x-properties:"
+ " x-declare:"
" {type:direct}}}",
capacity=self._capacity)
trace.debug("my direct addr=%s" % self._direct_receiver.source)
@@ -203,9 +203,9 @@ class Agent(Thread):
# for sending directly addressed messages.
self._direct_sender = self._session.sender(str(self._address.get_node()) +
";{create:always,"
- " node-properties:"
+ " node:"
" {type:topic,"
- " x-properties:"
+ " x-declare:"
" {type:direct}}}")
trace.debug("my default direct send addr=%s" % self._direct_sender.target)
@@ -214,7 +214,7 @@ class Agent(Thread):
self._domain)
self._topic_receiver = self._session.receiver(str(default_addr) +
";{create:always,"
- " node-properties:"
+ " node:"
" {type:topic}}",
capacity=self._capacity)
trace.debug("console.ind addr=%s" % self._topic_receiver.source)
@@ -224,7 +224,7 @@ class Agent(Thread):
self._domain)
self._topic_sender = self._session.sender(str(ind_addr) +
";{create:always,"
- " node-properties:"
+ " node:"
" {type:topic}}")
trace.debug("agent.ind addr=%s" % self._topic_sender.target)
@@ -635,7 +635,7 @@ class Agent(Thread):
self._handleMethodReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.subscribe_req:
self._handleSubscribeReqMsg(msg, cmap, props, version, _direct)
- elif opcode == OpCode.subscribe_refresh_req:
+ elif opcode == OpCode.subscribe_refresh_ind:
self._handleResubscribeReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.subscribe_cancel_ind:
self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
@@ -905,7 +905,7 @@ class Agent(Thread):
"_duration": new_duration}
m = Message(id=QMF_APP_ID,
properties={"method":"response",
- "qmf.opcode":OpCode.subscribe_refresh_rsp},
+ "qmf.opcode":OpCode.subscribe_rsp},
correlation_id = msg.correlation_id,
content=sr_map)
self._send_reply(m, msg.reply_to)
diff --git a/qpid/extras/qmf/src/py/qmf2/common.py b/qpid/extras/qmf/src/py/qmf2/common.py
index 7ca7bb5947..2e5367f54f 100644
--- a/qpid/extras/qmf/src/py/qmf2/common.py
+++ b/qpid/extras/qmf/src/py/qmf2/common.py
@@ -57,7 +57,7 @@ class OpCode(object):
agent_locate_req = "_agent_locate_request"
subscribe_req = "_subscribe_request"
subscribe_cancel_ind = "_subscribe_cancel_indication"
- subscribe_refresh_req = "_subscribe_refresh_indication"
+ subscribe_refresh_ind = "_subscribe_refresh_indication"
query_req = "_query_request"
method_req = "_method_request"
@@ -67,7 +67,6 @@ class OpCode(object):
agent_heartbeat_ind = "_agent_heartbeat_indication"
query_rsp = "_query_response"
subscribe_rsp = "_subscribe_response"
- subscribe_refresh_rsp = "_subscribe_refresh_response"
data_ind = "_data_indication"
method_rsp = "_method_response"
diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py
index ec85aec0de..9227835b3f 100644
--- a/qpid/extras/qmf/src/py/qmf2/console.py
+++ b/qpid/extras/qmf/src/py/qmf2/console.py
@@ -406,16 +406,16 @@ class _SubscriptionMailbox(_AsyncMailbox):
return False
return True
- def resubscribe(self, duration):
+ def resubscribe(self):
agent = self.console.get_agent(self.agent_name)
if not agent:
- log.warning("resubscribed failed - unknown agent '%s'" %
- self.agent_name)
+ log.warning("resubscribed failed - unknown agent '%s'",
+ self.agent_name)
return False
try:
- trace.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+ trace.debug("Sending resubscribe to Agent %s", self.agent_name)
agent._send_resubscribe_req(self.get_address(),
- self.agent_subscription_id, duration)
+ self.agent_subscription_id)
except SendError, e:
log.error(str(e))
return False
@@ -425,8 +425,7 @@ class _SubscriptionMailbox(_AsyncMailbox):
"""
"""
opcode = msg.properties.get("qmf.opcode")
- if (opcode == OpCode.subscribe_rsp or
- opcode == OpCode.subscribe_refresh_rsp):
+ if (opcode == OpCode.subscribe_rsp):
error = msg.content.get("_error")
if error:
@@ -519,8 +518,6 @@ class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
agent, duration,
interval)
self.subscribe_pending = False
- self.resubscribe_pending = False
-
def subscribe(self, query, reply_timeout):
if super(_AsyncSubscriptionMailbox, self).subscribe(query):
@@ -529,34 +526,18 @@ class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
return True
return False
- def resubscribe(self, duration, reply_timeout):
- if super(_AsyncSubscriptionMailbox, self).resubscribe(duration):
- self.resubscribe_pending = True
- self.reset_timeout(reply_timeout)
- return True
- return False
-
def deliver(self, msg):
"""
"""
super(_AsyncSubscriptionMailbox, self).deliver(msg)
sp = self.fetch(0)
- if sp:
- # if the message was a reply to a subscribe or
- # re-subscribe, then we get here.
- if self.subscribe_pending:
- wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
- self.context, sp)
- else:
- wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
- self.context, sp)
-
- self.subscribe_pending = False
- self.resubscribe_pending = False
-
+ if sp and self.subscribe_pending:
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, sp)
self.console._work_q.put(wi)
self.console._work_q_put = True
+ self.subscribe_pending = False
+
if not sp.succeeded():
self.destroy()
@@ -565,11 +546,9 @@ class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
""" Either the subscription expired, or a request timedout.
"""
if self.subscribe_pending:
- wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
- self.context, None)
- elif self.resubscribe_pending:
- wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
- self.context, None)
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, None)
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
self.destroy()
@@ -928,17 +907,14 @@ class Agent(object):
def _send_resubscribe_req(self, correlation_id,
- subscription_id,
- _lifetime=None):
+ subscription_id):
"""
"""
sr_map = {"_subscription_id":subscription_id}
- if _lifetime is not None:
- sr_map["_duration"] = _lifetime
msg = Message(id=QMF_APP_ID,
properties={"method":"request",
- "qmf.opcode":OpCode.subscribe_refresh_req},
+ "qmf.opcode":OpCode.subscribe_refresh_ind},
content=sr_map)
self._send_msg(msg, correlation_id)
@@ -1101,18 +1077,18 @@ class Console(Thread):
# for messages directly addressed to me
self._direct_recvr = self._session.receiver(str(self._address) +
";{create:always,"
- " node-properties:"
+ " node:"
" {type:topic,"
- " x-properties:"
+ " x-declare:"
" {type:direct}}}",
capacity=1)
trace.debug("my direct addr=%s" % self._direct_recvr.source)
self._direct_sender = self._session.sender(str(self._address.get_node()) +
";{create:always,"
- " node-properties:"
+ " node:"
" {type:topic,"
- " x-properties:"
+ " x-declare:"
" {type:direct}}}")
trace.debug("my direct sender=%s" % self._direct_sender.target)
@@ -1121,7 +1097,7 @@ class Console(Thread):
self._domain)
self._topic_recvr = self._session.receiver(str(default_addr) +
";{create:always,"
- " node-properties:{type:topic}}",
+ " node:{type:topic}}",
capacity=1)
trace.debug("default topic recv addr=%s" % self._topic_recvr.source)
@@ -1130,7 +1106,7 @@ class Console(Thread):
topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain)
self._topic_sender = self._session.sender(str(topic_addr) +
";{create:always,"
- " node-properties:{type:topic}}")
+ " node:{type:topic}}")
trace.debug("default topic send addr=%s" % self._topic_sender.target)
#
@@ -1410,10 +1386,10 @@ class Console(Thread):
return None
if isinstance(mbox, _AsyncSubscriptionMailbox):
- return mbox.resubscribe(_duration, _timeout)
+ return mbox.resubscribe()
else:
# synchronous - wait for reply
- if not mbox.resubscribe(_duration):
+ if not mbox.resubscribe():
# @todo ???? mbox.destroy()
return None
@@ -1641,11 +1617,11 @@ class Console(Thread):
"""
PRIVATE: Process a message received from an Agent
"""
- trace.debug( "Message received from Agent! [%s]" % msg )
+ trace.debug( "Message received from Agent! [%s]", msg )
opcode = msg.properties.get("qmf.opcode")
if not opcode:
- log.error("Ignoring unrecognized message '%s'" % msg)
+ log.error("Ignoring unrecognized message '%s'", msg)
return
version = 2 # @todo: fix me
@@ -1659,23 +1635,14 @@ class Console(Thread):
self._handle_agent_ind_msg( msg, cmap, version, _direct )
elif opcode == OpCode.agent_locate_rsp:
self._handle_agent_ind_msg( msg, cmap, version, _direct )
- elif opcode == OpCode.query_rsp:
- self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.subscribe_rsp:
- self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.subscribe_refresh_rsp:
- self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.method_rsp:
+ elif msg.correlation_id:
self._handle_response_msg(msg, cmap, version, _direct)
elif opcode == OpCode.data_ind:
- if msg.correlation_id:
- self._handle_response_msg(msg, cmap, version, _direct)
- else:
- self._handle_indication_msg(msg, cmap, version, _direct)
+ self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
trace.debug("No-op msg received.")
else:
- log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
+ log.warning("Ignoring message with unrecognized 'opcode' value: '%s'", opcode)
def _handle_agent_ind_msg(self, msg, cmap, version, direct):
@@ -1742,12 +1709,12 @@ class Console(Thread):
"""
Process a received data-ind message.
"""
- trace.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
+ trace.debug("%s _handle_response_msg '%s'", self._name, str(msg))
mbox = self._get_mailbox(msg.correlation_id)
if not mbox:
- log.warning("Response msg received with unknown correlation_id"
- " msg='%s'" % str(msg))
+ log.warning("%s Response msg received with unknown correlation_id"
+ " msg='%s'", self._name, str(msg))
return
# wake up all waiters
@@ -1764,12 +1731,12 @@ class Console(Thread):
content_type = msg.properties.get("qmf.content")
if (content_type != ContentType.event or
not isinstance(msg.content, type([]))):
- log.warning("Bad event indication message received: '%s'" % msg)
+ log.warning("Bad event indication message received: '%s'", msg)
return
emap = msg.content[0]
if not isinstance(emap, type({})):
- trace.debug("Invalid event body in indication message: '%s'" % msg)
+ trace.debug("Invalid event body in indication message: '%s'", msg)
return
agent = None
@@ -1877,9 +1844,9 @@ class Console(Thread):
try:
agent._sender = self._session.sender(str(agent._address) +
";{create:always,"
- " node-properties:"
+ " node:"
" {type:topic,"
- " x-properties:"
+ " x-declare:"
" {type:direct}}}")
except:
log.warning("Unable to create sender for %s" % name)
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
index 1f73865f30..1f8a31dc22 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
@@ -506,7 +506,7 @@ class BaseTest(unittest.TestCase):
if r_count == 3:
rp = self.console.refresh_subscription(sp.get_subscription_id())
- self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
+ self.assertTrue(rp)
wi = self.console.get_next_workitem(timeout=0)
@@ -695,7 +695,6 @@ class BaseTest(unittest.TestCase):
r_count = 0
i_count = 0
sp = None
- rp = None
while self.notifier.wait_for_work(4):
wi = self.console.get_next_workitem(timeout=0)
while wi is not None:
@@ -706,12 +705,6 @@ class BaseTest(unittest.TestCase):
self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
self.assertTrue(sp.succeeded())
self.assertTrue(sp.get_error() == None)
- elif wi.get_type() == WorkItem.RESUBSCRIBE_RESPONSE:
- self.assertTrue(wi.get_handle() == "my-handle")
- rp = wi.get_params()
- self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
- self.assertTrue(rp.succeeded())
- self.assertTrue(rp.get_error() == None)
else:
self.assertTrue(wi.get_type() ==
WorkItem.SUBSCRIBE_INDICATION)
@@ -745,7 +738,7 @@ class BaseTest(unittest.TestCase):
wi = self.console.get_next_workitem(timeout=0)
# expect 5 publish per subscription, more if refreshed
- self.assertTrue(sp is not None and rp is not None)
+ self.assertTrue(sp is not None)
self.assertTrue(i_count > 5)
self.console.destroy(10)
@@ -785,7 +778,6 @@ class BaseTest(unittest.TestCase):
r_count = 0
sp = None
- rp = None
while self.notifier.wait_for_work(4):
wi = self.console.get_next_workitem(timeout=0)
while wi is not None: