diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-01 16:44:56 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-01 16:44:56 +0000 |
| commit | 2329e7ce20218e25cfcd987487527cd345a89d35 (patch) | |
| tree | 6795d41fb422a4312b53103123086b6069f465af | |
| parent | 0f6041de53251d411e2f8032aa27d9d7f4b14244 (diff) | |
| download | qpid-python-2329e7ce20218e25cfcd987487527cd345a89d35.tar.gz | |
QPID-2261: object subscriptions, and tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@917584 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/agent.py | 234 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/common.py | 17 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/console.py | 49 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py | 334 |
4 files changed, 464 insertions, 170 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/agent.py b/qpid/extras/qmf/src/py/qmf2/agent.py index d884325071..6d4b3ea46c 100644 --- a/qpid/extras/qmf/src/py/qmf2/agent.py +++ b/qpid/extras/qmf/src/py/qmf2/agent.py @@ -83,37 +83,6 @@ class MethodCallParams(object): ## SUBSCRIPTIONS ##============================================================================== -class _ConsoleHandle(object): - """ - """ - def __init__(self, handle, reply_to): - self.console_handle = handle - self.reply_to = reply_to - -class SubscriptionParams(object): - """ - """ - def __init__(self, console_handle, query, interval, duration, user_id): - self._console_handle = console_handle - self._query = query - self._interval = interval - self._duration = duration - self._user_id = user_id - - def get_console_handle(self): - return self._console_handle - - def get_query(self): - return self._query - - def get_interval(self): - return self._interval - - def get_duration(self): - return self._duration - - def get_user_id(self): - return self._user_id class _SubscriptionState(object): """ @@ -128,6 +97,7 @@ class _SubscriptionState(object): now = datetime.datetime.utcnow() self.next_update = now # do an immediate update self.expiration = now + datetime.timedelta(seconds=duration) + self.last_update = None self.id = 0 def resubscribe(self, now, _duration=None): @@ -135,9 +105,9 @@ class _SubscriptionState(object): self.duration = _duration self.expiration = now + datetime.timedelta(seconds=self.duration) - def reset_interval(self, now): + def published(self, now): self.next_update = now + datetime.timedelta(seconds=self.interval) - + self.last_update = now ##============================================================================== @@ -193,6 +163,9 @@ class Agent(Thread): self._subscriptions = {} self._next_subscribe_event = None + # prevents multiple _wake_thread() calls + self._noop_pending = False + def destroy(self, timeout=None): """ @@ -264,18 +237,7 @@ class Agent(Thread): self._running = False if self.isAlive(): # kick my thread to wake it up - try: - msg = Message(id=QMF_APP_ID, - subject=self.name, - properties={ "method":"request", - "qmf.opcode":OpCode.noop}, - content={}) - - # TRACE - #logging.error("!!! sending wakeup to myself: %s" % msg) - self._direct_sender.send( msg, sync=True ) - except SendError, e: - logging.error(str(e)) + self._wake_thread() logging.debug("waiting for agent receiver thread to exit") self.join(timeout) if self.isAlive(): @@ -349,7 +311,6 @@ class Agent(Thread): """ Register an instance of a QmfAgentData object. """ - # @todo: need to update subscriptions # @todo: need to mark schema as "non-const" if not isinstance(data, QmfAgentData): raise TypeError("QmfAgentData instance expected") @@ -369,6 +330,18 @@ class Agent(Thread): self._described_data[sid][oid] = data else: self._undescribed_data[oid] = data + + # does the new object match any subscriptions? + now = datetime.datetime.utcnow() + for sid,sub in self._subscriptions.iteritems(): + if sub.query.evaluate(data): + # matched. Mark the subscription as needing to be + # serviced. The _publish() method will notice the new + # object and will publish it next time it runs. + sub.next_update = now + self._next_subscribe_event = None + # @todo: should we immediately publish? + finally: self._lock.release() @@ -387,7 +360,7 @@ class Agent(Thread): return data - def method_response(self, handle, _out_args=None, _error=None): + def method_response(self, handle, _out_args=None, _error=None): """ """ if not isinstance(handle, _MethodCallHandle): @@ -489,50 +462,37 @@ class Agent(Thread): logging.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) - - # # Monitor Subscriptions # - if (self._next_subscribe_event is None or - now >= self._next_subscribe_event): - - logging.debug("%s polling subscriptions..." % self.name) - self._next_subscribe_event = now + datetime.timedelta(seconds= + self._lock.acquire() + try: + now = datetime.datetime.utcnow() + if (self._next_subscribe_event is None or + now >= self._next_subscribe_event): + logging.debug("%s polling subscriptions..." % self.name) + self._next_subscribe_event = now + datetime.timedelta(seconds= self._max_duration) - self._lock.acquire() - try: - dead_ss = [] + dead_ss = {} for sid,ss in self._subscriptions.iteritems(): if now >= ss.expiration: - dead_ss.append(sid) + dead_ss[sid] = ss continue if now >= ss.next_update: - response = [] - objs = self._queryData(ss.query) - if objs: - for obj in objs: - response.append(obj.map_encode()) - logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id)) - self._send_query_response( ContentType.data, - ss.correlation_id, - ss.reply_to, - response) - ss.reset_interval(now) - + self._publish(ss) next_timeout = min(ss.expiration, ss.next_update) if next_timeout < self._next_subscribe_event: self._next_subscribe_event = next_timeout - for sid in dead_ss: + for sid,ss in dead_ss.iteritems(): del self._subscriptions[sid] - finally: - self._lock.release() + self._unpublish(ss) + finally: + self._lock.release() # # notify application of pending WorkItems # - if self._work_q_put and self._notifier: logging.debug("%s notifying application..." % self.name) # new stuff on work queue, kick the the application... @@ -545,10 +505,22 @@ class Agent(Thread): # # Sleep until messages arrive or something times out # - next_timeout = min(next_heartbeat, self._next_subscribe_event) - timeout = timedelta_to_secs(next_timeout - - datetime.datetime.utcnow()) - if timeout > 0.0: + now = datetime.datetime.utcnow() + next_timeout = next_heartbeat + self._lock.acquire() + try: + # the mailbox expire flag may be cleared by the + # app thread(s) in order to force an immediate publish + if self._next_subscribe_event is None: + next_timeout = now + elif self._next_subscribe_event < next_timeout: + next_timeout = self._next_subscribe_event + finally: + self._lock.release() + + timeout = timedelta_to_secs(next_timeout - now) + + if self._running and timeout > 0.0: logging.debug("%s sleeping %s seconds..." % (self.name, timeout)) try: @@ -557,7 +529,7 @@ class Agent(Thread): pass - + logging.debug("Shutting down Agent %s thread" % self.name) # # Private: @@ -667,6 +639,7 @@ class Agent(Thread): elif opcode == OpCode.subscribe_cancel_ind: self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.noop: + self._noop_pending = False logging.debug("No-op msg received.") else: logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" @@ -950,10 +923,13 @@ class Agent(Thread): self._lock.acquire() try: if sid in self._subscriptions: + dead_sub = self._subscriptions[sid] del self._subscriptions[sid] finally: self._lock.release() + self._unpublish(dead_sub) + def _queryPackagesReply(self, msg, query): """ @@ -1100,6 +1076,70 @@ class Agent(Thread): return data_objs + def _publish(self, sub): + """ Publish a subscription. + """ + response = [] + now = datetime.datetime.utcnow() + objs = self._queryData(sub.query) + if objs: + for obj in objs: + if sub.id not in obj._subscriptions: + # new to subscription - publish it + obj._subscriptions[sub.id] = sub + response.append(obj.map_encode()) + elif obj._dtime: + # obj._dtime is millisec since utc. Convert to datetime + utcdt = datetime.datetime.utcfromtimestamp(obj._dtime/1000.0) + if utcdt > sub.last_update: + response.append(obj.map_encode()) + else: + # obj._utime is millisec since utc. Convert to datetime + utcdt = datetime.datetime.utcfromtimestamp(obj._utime/1000.0) + if utcdt > sub.last_update: + response.append(obj.map_encode()) + + if response: + logging.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id)) + self._send_query_response( ContentType.data, + sub.correlation_id, + sub.reply_to, + response) + sub.published(now) + + def _unpublish(self, sub): + """ This subscription is about to be deleted, remove it from any + referencing objects. + """ + objs = self._queryData(sub.query) + if objs: + for obj in objs: + if sub.id in obj._subscriptions: + del obj._subscriptions[sub.id] + + + + def _wake_thread(self): + """ + Make the agent management thread loop wakeup from its next_receiver + sleep. + """ + self._lock.acquire() + try: + if not self._noop_pending: + logging.debug("Sending noop to wake up [%s]" % self._address) + msg = Message(id=QMF_APP_ID, + subject=self.name, + properties={"method":"indication", + "qmf.opcode":OpCode.noop}, + content={}) + try: + self._direct_sender.send( msg, sync=True ) + self._noop_pending = True + except SendError, e: + logging.error(str(e)) + finally: + self._lock.release() ##============================================================================== @@ -1164,6 +1204,7 @@ class QmfAgentData(QmfData): self._agent = agent self._validated = False self._modified = True + self._subscriptions = {} def destroy(self): self._dtime = long(time.time() * 1000) @@ -1175,7 +1216,8 @@ class QmfAgentData(QmfData): def set_value(self, _name, _value, _subType=None): super(QmfAgentData, self).set_value(_name, _value, _subType) - self._touch() + self._utime = long(time.time() * 1000) + self._touch(_name) # @todo: publish change def inc_value(self, name, delta=1): @@ -1191,8 +1233,12 @@ class QmfAgentData(QmfData): def dec_value(self, name, delta=1): """ subtract the delta from the property """ # @todo: need to take write-lock - logging.error(" TBD!!!") - self._touch() + val = self.get_value(name) + try: + val -= delta + except: + raise + self.set_value(name, val) def validate(self): """ @@ -1212,12 +1258,32 @@ class QmfAgentData(QmfData): raise Exception("Required property '%s' not present." % name) self._validated = True - def _touch(self): + def _touch(self, field=None): """ Mark this object as modified. Used to force a publish of this object if on subscription. """ - self._modified = True + now = datetime.datetime.utcnow() + publish = False + if field: + # if the named field is not continuous, mark any subscriptions as + # needing to be published. + sid = self.get_schema_class_id() + if sid: + self._agent._lock.acquire() + try: + schema = self._agent._schema.get(sid) + if schema: + prop = schema.get_property(field) + if prop and not prop.is_continuous(): + for sid,sub in self._subscriptions.iteritems(): + sub.next_update = now + publish = True + if publish: + self._agent._next_subscribe_event = None + self._agent._wake_thread() + finally: + self._agent._lock.release() diff --git a/qpid/extras/qmf/src/py/qmf2/common.py b/qpid/extras/qmf/src/py/qmf2/common.py index 8070add806..548cebbf31 100644 --- a/qpid/extras/qmf/src/py/qmf2/common.py +++ b/qpid/extras/qmf/src/py/qmf2/common.py @@ -1447,12 +1447,17 @@ class SchemaProperty(_mapEncoder): map["unit"] = str, describes units used map["min"] = int, minimum allowed value map["max"] = int, maximun allowed value - map["maxlen"] = int, if string type, this is the maximum length in bytes + map["maxlen"] = int, if string type, this is the maximum length in bytes required to represent the longest instance of this string. map["desc"] = str, human-readable description of this argument map["reference"] = str, ??? map["parent_ref"] = bool, true if this property references an object in which this object is in a child-parent relationship. Default False + map["continuous"] = bool, true if the value potentially changes too fast to + be directly monitorable. Example: fast changing statistic or random + number. Subscriptions to objects containing continuous data will publish + only on an interval basis, rather than every time the data changes. Default + False. """ __hash__ = None _access_strings = ["RO","RW","RC"] @@ -1479,6 +1484,7 @@ class SchemaProperty(_mapEncoder): self._isParentRef = False self._dir = None self._default = None + self._is_continuous = False for key, value in kwargs.items(): if key == "access": @@ -1495,6 +1501,8 @@ class SchemaProperty(_mapEncoder): elif key == "desc" : self._desc = value elif key == "reference" : self._reference = value elif key == "parent_ref" : self._isParentRef = _to_bool(value) + elif key == "parent_ref" : self._isParentRef = _to_bool(value) + elif key == "continuous" : self._is_continuous = _to_bool(value) elif key == "dir": value = str(value).upper() if value not in self._dir_strings: @@ -1503,7 +1511,7 @@ class SchemaProperty(_mapEncoder): elif key == "default" : self._default = value # constructor - def _create(cls, type_code, kwargs={}): + def _create(cls, type_code, **kwargs): return cls(_type_code=type_code, kwargs=kwargs) create = classmethod(_create) @@ -1538,6 +1546,8 @@ class SchemaProperty(_mapEncoder): def get_default(self): return self._default + def is_continuous(self): return self._is_continuous + def map_encode(self): """ Return the map encoding of this schema. @@ -1556,6 +1566,7 @@ class SchemaProperty(_mapEncoder): _map["parent_ref"] = self._isParentRef if self._dir: _map["dir"] = self._dir if self._default: _map["default"] = self._default + if self._is_continuous: _map["continuous"] = self._is_continuous return _map def __repr__(self): @@ -1568,6 +1579,7 @@ class SchemaProperty(_mapEncoder): hasher.update(str(self._type)) hasher.update(str(self._isIndex)) hasher.update(str(self._isOptional)) + hasher.update(str(self._is_continuous)) if self._access: hasher.update(self._access) if self._unit: hasher.update(self._unit) if self._desc: hasher.update(self._desc) @@ -1575,7 +1587,6 @@ class SchemaProperty(_mapEncoder): if self._default: hasher.update(self._default) - class SchemaMethod(_mapEncoder): """ The SchemaMethod class describes the method's structure, and contains a diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py index afd20c3655..5c4a2eac85 100644 --- a/qpid/extras/qmf/src/py/qmf2/console.py +++ b/qpid/extras/qmf/src/py/qmf2/console.py @@ -1456,7 +1456,7 @@ class Console(Thread): logging.debug("Sending noop to wake up [%s]" % self._address) msg = Message(id=QMF_APP_ID, subject=self._name, - properties={"method":"request", + properties={"method":"indication", "qmf.opcode":OpCode.noop}, content={}) try: @@ -1510,30 +1510,31 @@ class Console(Thread): self._notifier.indication() _callback_thread = None - if self._operational: - # wait for a message to arrive, or an agent - # to expire, or a mailbox requrest to time out - now = datetime.datetime.utcnow() - next_expire = self._next_agent_expire - # the mailbox expire flag may be cleared by the - # app thread(s) - self._lock.acquire() - try: - if (self._next_mbox_expire and - self._next_mbox_expire < next_expire): - next_expire = self._next_mbox_expire - finally: - self._lock.release() + # wait for a message to arrive, or an agent + # to expire, or a mailbox requrest to time out + now = datetime.datetime.utcnow() + next_expire = self._next_agent_expire - if next_expire > now: - timeout = timedelta_to_secs(next_expire - now) - try: - logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) - xxx = self._session.next_receiver(timeout = timeout) - except Empty: - pass + self._lock.acquire() + try: + # the mailbox expire flag may be cleared by the + # app thread(s) to force an immedate mailbox scan + if self._next_mbox_expire is None: + next_expire = now + elif self._next_mbox_expire < next_expire: + next_expire = self._next_mbox_expire + finally: + self._lock.release() + timeout = timedelta_to_secs(next_expire - now) + + if self._operational and timeout > 0.0: + try: + logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) + self._session.next_receiver(timeout = timeout) + except Empty: + pass logging.debug("Shutting down Console thread") @@ -1742,8 +1743,8 @@ class Console(Thread): mbox = self._get_mailbox(msg.correlation_id) if not mbox: - logging.debug("Response msg received with unknown correlation_id" - " msg='%s'" % str(msg)) + logging.warning("Response msg received with unknown correlation_id" + " msg='%s'" % str(msg)) return # wake up all waiters diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py index 750952df46..1f73865f30 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py @@ -75,7 +75,13 @@ class _agentApp(Thread): _object_id_names=["key"] ) _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # note: count1 is continuous, count2 is not + count1_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32, + continuous=True) + _schema.add_property( "count1", count1_prop) + count2_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32, + continuous=False) _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32)) self.agent.register_object_class(_schema) @@ -224,7 +230,7 @@ class BaseTest(unittest.TestCase): # create console # find all agents # subscribe to changes to any object in package1/class1 - # should succeed + # should succeed - verify 1 publish self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) @@ -288,10 +294,10 @@ class BaseTest(unittest.TestCase): wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 5 publish per subscription - self.assertTrue(r_count == 5 * len(subscriptions)) + # expect 1 publish per subscription + self.assertTrue(r_count == 5) for ii in range(len(subscriptions)): - self.assertTrue(subscriptions[ii][1] == 5) + self.assertTrue(subscriptions[ii][1] == 1) self.console.destroy(10) @@ -349,21 +355,17 @@ class BaseTest(unittest.TestCase): self.assertTrue(len(reply) == 1) self.assertTrue(isinstance(reply[0], QmfData)) self.assertTrue(reply[0].get_object_id() == "undesc-2") - # print("!!! get_params() = %s" % wi.get_params()) self.assertTrue(wi.get_handle() < len(subscriptions)) subscriptions[wi.get_handle()][1] += 1 - # self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) - # self.assertTrue(reply.succeeded()) - # self.assertTrue(reply.get_argument("cookie") == - # wi.get_handle()) + self.console.release_workitem(wi) wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 5 publish per subscription - self.assertTrue(r_count == 5 * len(subscriptions)) - #for ii in range(len(subscriptions)): - # self.assertTrue(subscriptions[ii][1] == 5) + # expect 1 publish per subscription + self.assertTrue(r_count == 5) + for ii in range(len(subscriptions)): + self.assertTrue(subscriptions[ii][1] == 1) self.console.destroy(10) @@ -426,18 +428,15 @@ class BaseTest(unittest.TestCase): self.assertTrue(sid.get_class_name() == "class1") self.assertTrue(wi.get_handle() < len(subscriptions)) subscriptions[wi.get_handle()][1] += 1 - # self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) - # self.assertTrue(reply.succeeded()) - # self.assertTrue(reply.get_argument("cookie") == - # wi.get_handle()) + self.console.release_workitem(wi) wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 5 publish per subscription - self.assertTrue(r_count == 5 * len(subscriptions)) - #for ii in range(len(subscriptions)): - # self.assertTrue(subscriptions[ii][1] == 5) + # expect 1 publish per subscription + self.assertTrue(r_count == 5) + for ii in range(len(subscriptions)): + self.assertTrue(subscriptions[ii][1] == 1) self.console.destroy(10) @@ -459,9 +458,9 @@ class BaseTest(unittest.TestCase): self.conn.connect() self.console.add_connection(self.conn) - # query to match object "p2c1_key2" in schema package2/class1 - sid = SchemaClassId.create("package2", "class1") - query = QmfQuery.create_id_object("p2c1_key2", sid) + # query to match object "p1c1_key2" in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + query = QmfQuery.create_id_object("p1c1_key2", sid) agent_app = self.agents[0] aname = agent_app.agent.get_name() @@ -489,13 +488,20 @@ class BaseTest(unittest.TestCase): self.assertTrue(isinstance(reply, type([]))) self.assertTrue(len(reply) == 1) self.assertTrue(isinstance(reply[0], QmfData)) - self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + self.assertTrue(reply[0].get_object_id() == "p1c1_key2") sid = reply[0].get_schema_class_id() self.assertTrue(isinstance(sid, SchemaClassId)) - self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_package_name() == "package1") self.assertTrue(sid.get_class_name() == "class1") self.assertTrue(wi.get_handle() == "my-handle") + # count1 is continuous, touching it will force a + # publish on the interval + self.assertTrue(sid is not None) + test_obj = agent_app.agent.get_object("p1c1_key2", sid) + self.assertTrue(test_obj is not None) + test_obj.set_value("count1", r_count) + self.console.release_workitem(wi) if r_count == 3: @@ -504,11 +510,8 @@ class BaseTest(unittest.TestCase): wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 5 publish per subscription + # expect 5 publish per subscription, more if refreshed self.assertTrue(r_count > 5) - # print("!!! total r_count=%d" % r_count) - #for ii in range(len(subscriptions)): - # self.assertTrue(subscriptions[ii][1] == 5) self.console.destroy(10) @@ -530,9 +533,9 @@ class BaseTest(unittest.TestCase): self.conn.connect() self.console.add_connection(self.conn) - # query to match object "p2c1_key2" in schema package2/class1 - sid = SchemaClassId.create("package2", "class1") - query = QmfQuery.create_id_object("p2c1_key2", sid) + # query to match object "p1c1_key2" in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + query = QmfQuery.create_id_object("p1c1_key2", sid) agent_app = self.agents[0] aname = agent_app.agent.get_name() @@ -560,13 +563,20 @@ class BaseTest(unittest.TestCase): self.assertTrue(isinstance(reply, type([]))) self.assertTrue(len(reply) == 1) self.assertTrue(isinstance(reply[0], QmfData)) - self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + self.assertTrue(reply[0].get_object_id() == "p1c1_key2") sid = reply[0].get_schema_class_id() self.assertTrue(isinstance(sid, SchemaClassId)) - self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_package_name() == "package1") self.assertTrue(sid.get_class_name() == "class1") self.assertTrue(wi.get_handle() == "my-handle") + # count1 is continuous, touching it will force a + # publish on the interval + self.assertTrue(sid is not None) + test_obj = agent_app.agent.get_object("p1c1_key2", sid) + self.assertTrue(test_obj is not None) + test_obj.set_value("count1", r_count) + self.console.release_workitem(wi) if r_count == 3: @@ -574,10 +584,8 @@ class BaseTest(unittest.TestCase): wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 5 publish per subscription full duration - self.assertTrue(r_count < 5) - #for ii in range(len(subscriptions)): - # self.assertTrue(subscriptions[ii][1] == 5) + # expect only 3 publish received before cancel + self.assertTrue(r_count == 3) self.console.destroy(10) @@ -645,8 +653,8 @@ class BaseTest(unittest.TestCase): wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 5 publish per subscription - self.assertTrue(r_count == 6) + # one response + one publish = 2 + self.assertTrue(r_count == 2) self.console.destroy(10) @@ -665,9 +673,9 @@ class BaseTest(unittest.TestCase): self.conn.connect() self.console.add_connection(self.conn) - # query to match object "p2c1_key2" in schema package2/class1 - sid = SchemaClassId.create("package2", "class1") - query = QmfQuery.create_id_object("p2c1_key2", sid) + # query to match object "p1c1_key2" in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + query = QmfQuery.create_id_object("p1c1_key2", sid) agent_app = self.agents[0] aname = agent_app.agent.get_name() @@ -685,6 +693,7 @@ class BaseTest(unittest.TestCase): # refresh after three subscribe indications, count all # indications to verify refresh worked r_count = 0 + i_count = 0 sp = None rp = None while self.notifier.wait_for_work(4): @@ -706,20 +715,28 @@ class BaseTest(unittest.TestCase): else: self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + i_count += 1 # sp better be set up by now! self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) reply = wi.get_params() self.assertTrue(isinstance(reply, type([]))) self.assertTrue(len(reply) == 1) self.assertTrue(isinstance(reply[0], QmfData)) - self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + self.assertTrue(reply[0].get_object_id() == "p1c1_key2") sid = reply[0].get_schema_class_id() self.assertTrue(isinstance(sid, SchemaClassId)) - self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_package_name() == "package1") self.assertTrue(sid.get_class_name() == "class1") self.assertTrue(wi.get_handle() == "my-handle") - if r_count == 4: # + 1 for subscribe reply + # count1 is continuous, touching it will force a + # publish on the interval + self.assertTrue(sid is not None) + test_obj = agent_app.agent.get_object("p1c1_key2", sid) + self.assertTrue(test_obj is not None) + test_obj.set_value("count1", r_count) + + if r_count == 4: # 3 data + 1 subscribe reply rp = self.console.refresh_subscription(sp.get_subscription_id()) self.assertTrue(rp) @@ -727,8 +744,9 @@ class BaseTest(unittest.TestCase): wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 5 publish per subscription, + 2 replys - self.assertTrue(r_count > 7) + # expect 5 publish per subscription, more if refreshed + self.assertTrue(sp is not None and rp is not None) + self.assertTrue(i_count > 5) self.console.destroy(10) @@ -748,9 +766,9 @@ class BaseTest(unittest.TestCase): self.conn.connect() self.console.add_connection(self.conn) - # query to match object "p2c1_key2" in schema package2/class1 - sid = SchemaClassId.create("package2", "class1") - query = QmfQuery.create_id_object("p2c1_key2", sid) + # query to match object "p1c1_key2" in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + query = QmfQuery.create_id_object("p1c1_key2", sid) agent_app = self.agents[0] aname = agent_app.agent.get_name() @@ -765,8 +783,6 @@ class BaseTest(unittest.TestCase): _blocking=False) self.assertTrue(rc) - # refresh after three subscribe indications, count all - # indications to verify refresh worked r_count = 0 sp = None rp = None @@ -789,20 +805,220 @@ class BaseTest(unittest.TestCase): self.assertTrue(isinstance(reply, type([]))) self.assertTrue(len(reply) == 1) self.assertTrue(isinstance(reply[0], QmfData)) - self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + self.assertTrue(reply[0].get_object_id() == "p1c1_key2") sid = reply[0].get_schema_class_id() self.assertTrue(isinstance(sid, SchemaClassId)) - self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_package_name() == "package1") self.assertTrue(sid.get_class_name() == "class1") self.assertTrue(wi.get_handle() == "my-handle") + # count1 is continuous, touching it will force a + # publish on the interval + self.assertTrue(sid is not None) + test_obj = agent_app.agent.get_object("p1c1_key2", sid) + self.assertTrue(test_obj is not None) + test_obj.set_value("count1", r_count) + + if r_count == 3: self.console.cancel_subscription(sp.get_subscription_id()) self.console.release_workitem(wi) wi = self.console.get_next_workitem(timeout=0) - # for now, I expect 1 subscribe reply and 1 data_indication - self.assertTrue(r_count == 2) + # expect cancel after 3 replies + self.assertTrue(r_count == 3) + + self.console.destroy(10) + + + + + def test_sync_periodic_publish_continuous(self): + # create console + # find all agents + # subscribe to changes to any object in package1/class1 + # should succeed - verify 1 publish + # Change continuous property on each publish, + # should only see 1 publish per interval + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match all objects in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + _target_params=t_params) + # find an agent + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + sp = self.console.create_subscription(agent, + query, + "some-handle") + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + self.assertTrue(sp.get_duration() == 10) + self.assertTrue(sp.get_publish_interval() == 2) + + # now wait for the (2 * interval) and count the updates + r_count = 0 + sid = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + self.assertTrue(wi.get_handle() == "some-handle") + if r_count == 1: + # first indication - returns all matching objects + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 2) + for obj in reply: + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == "p1c1_key2" or + obj.get_object_id() == "p1c1_key1") + sid = obj.get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package1") + self.assertTrue(sid.get_class_name() == "class1") + + else: + # verify publish of modified object only! + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + obj = reply[0] + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == "p1c1_key2") + self.assertTrue(obj.get_value("count1") == r_count - 1) + # fail test if we receive more than expected + self.assertTrue(r_count < 10) + + + # now update one of the objects! + self.assertTrue(sid is not None) + test_obj = agent_app.agent.get_object("p1c1_key2", sid) + self.assertTrue(test_obj is not None) + test_obj.set_value("count1", r_count) + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # expect at most 1 publish per interval seen + self.assertTrue(r_count < 10) + + self.console.destroy(10) + + + + + def test_sync_periodic_publish_noncontinuous(self): + # create console, find agent + # subscribe to changes to any object in package1/class1 + # should succeed - verify 1 publish + # Change noncontinuous property on each publish, + # should only see 1 publish per each update + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match all objects in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + _target_params=t_params) + # find an agent + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + sp = self.console.create_subscription(agent, + query, + "some-handle") + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + self.assertTrue(sp.get_duration() == 10) + self.assertTrue(sp.get_publish_interval() == 2) + + # now wait for the (2 * interval) and count the updates + r_count = 0 + sid = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + self.assertTrue(wi.get_handle() == "some-handle") + if r_count == 1: + # first indication - returns all matching objects + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 2) + for obj in reply: + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == "p1c1_key2" or + obj.get_object_id() == "p1c1_key1") + sid = obj.get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package1") + self.assertTrue(sid.get_class_name() == "class1") + + else: + # verify publish of modified object only! + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + obj = reply[0] + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == "p1c1_key2") + self.assertTrue(obj.get_value("count2") == r_count - 1) + # fail test if we receive more than expected + self.assertTrue(r_count < 30) + + + # now update the noncontinuous field of one of the objects! + if r_count < 20: + self.assertTrue(sid is not None) + test_obj = agent_app.agent.get_object("p1c1_key2", sid) + self.assertTrue(test_obj is not None) + test_obj.set_value("count2", r_count) + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # expect at least 1 publish per update + self.assertTrue(r_count > 10) self.console.destroy(10) |
