diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:21 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:21 +0000 |
commit | 3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (patch) | |
tree | 197c810e5f5bce17b1233a7cb8d7b50c0bcd25e2 /TAO/orbsvcs/tests/Notify/Ordering | |
parent | 6b846cf03c0bcbd8c276cb0af61a181e5f98eaae (diff) | |
download | ATCD-3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c.tar.gz |
Repo restructuring
Diffstat (limited to 'TAO/orbsvcs/tests/Notify/Ordering')
15 files changed, 1934 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Makefile.am b/TAO/orbsvcs/tests/Notify/Ordering/Makefile.am new file mode 100644 index 00000000000..dfe5e1ea684 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Makefile.am @@ -0,0 +1,254 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ../bin/mwc.pl -type automake -noreldefs TAO.mwc + +ACE_BUILDDIR = $(top_builddir)/.. +ACE_ROOT = $(top_srcdir)/.. +TAO_BUILDDIR = $(top_builddir) +TAO_IDL = ACE_ROOT=$(ACE_ROOT) TAO_ROOT=$(TAO_ROOT) $(TAO_BUILDDIR)/TAO_IDL/tao_idl +TAO_IDL_DEP = $(TAO_BUILDDIR)/TAO_IDL/tao_idl +TAO_IDLFLAGS = -Ge 1 -Wb,pre_include=ace/pre.h -Wb,post_include=ace/post.h -I$(TAO_ROOT) -I$(srcdir) -g $(ACE_BUILDDIR)/apps/gperf/src/gperf +TAO_ROOT = $(top_srcdir) + +noinst_PROGRAMS = + +## Makefile.Ordering_Idl.am + +BUILT_SOURCES = \ + goC.cpp \ + goC.h \ + goC.inl \ + goS.cpp \ + goS.h \ + goS.inl + +CLEANFILES = \ + go-stamp \ + goC.cpp \ + goC.h \ + goC.inl \ + goS.cpp \ + goS.h \ + goS.inl + +goC.cpp goC.h goC.inl goS.cpp goS.h goS.inl: go-stamp + +go-stamp: $(srcdir)/go.idl $(TAO_IDL_DEP) + $(TAO_IDL) $(TAO_IDLFLAGS) -Sa -St $(srcdir)/go.idl + @touch $@ + + +noinst_HEADERS = \ + go.idl + +## Makefile.Ordering_Ntf_Seq_Cons.am + +if BUILD_CORBA_MESSAGING +if !BUILD_MINIMUM_CORBA + +noinst_PROGRAMS += Sequence_Consumer + +Sequence_Consumer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs \ + -I$(TAO_ROOT)/orbsvcs/tests/Notify/lib \ + -DTAO_HAS_TYPED_EVENT_CHANNEL + +Sequence_Consumer_SOURCES = \ + Notify_Sequence_Push_Consumer.cpp \ + Sequence_Consumer.cpp \ + goC.cpp \ + Notify_Sequence_Push_Consumer.h + +Sequence_Consumer_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/tests/Notify/lib/libTAO_NotifyTests.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_IFR_Client.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicInterface.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicAny.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_ETCL.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_MINIMUM_CORBA +endif BUILD_CORBA_MESSAGING + +## Makefile.Ordering_Ntf_Seq_Supp.am + +if BUILD_CORBA_MESSAGING +if !BUILD_MINIMUM_CORBA + +noinst_PROGRAMS += Sequence_Supplier + +Sequence_Supplier_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs \ + -I$(TAO_ROOT)/orbsvcs/tests/Notify/lib \ + -DTAO_HAS_TYPED_EVENT_CHANNEL + +Sequence_Supplier_SOURCES = \ + Sequence_Supplier.cpp \ + goC.cpp \ + goS.cpp \ + Notify_Sequence_Push_Consumer.h \ + Notify_Structured_Push_Consumer.h + +Sequence_Supplier_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/tests/Notify/lib/libTAO_NotifyTests.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_IFR_Client.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicInterface.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicAny.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_ETCL.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_MINIMUM_CORBA +endif BUILD_CORBA_MESSAGING + +## Makefile.Ordering_Ntf_Struct_Cons.am + +if BUILD_CORBA_MESSAGING +if !BUILD_MINIMUM_CORBA + +noinst_PROGRAMS += Structured_Consumer + +Structured_Consumer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs \ + -I$(TAO_ROOT)/orbsvcs/tests/Notify/lib \ + -DTAO_HAS_TYPED_EVENT_CHANNEL + +Structured_Consumer_SOURCES = \ + Notify_Structured_Push_Consumer.cpp \ + Structured_Consumer.cpp \ + goC.cpp \ + Notify_Structured_Push_Consumer.h + +Structured_Consumer_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/tests/Notify/lib/libTAO_NotifyTests.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_IFR_Client.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicInterface.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicAny.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_ETCL.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_MINIMUM_CORBA +endif BUILD_CORBA_MESSAGING + +## Makefile.Ordering_Ntf_Struct_Supp.am + +if BUILD_CORBA_MESSAGING +if !BUILD_MINIMUM_CORBA + +noinst_PROGRAMS += Structured_Supplier + +Structured_Supplier_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(TAO_ROOT) \ + -I$(TAO_BUILDDIR) \ + -I$(TAO_ROOT)/orbsvcs \ + -I$(TAO_BUILDDIR)/orbsvcs \ + -I$(TAO_ROOT)/orbsvcs/tests/Notify/lib \ + -DTAO_HAS_TYPED_EVENT_CHANNEL + +Structured_Supplier_SOURCES = \ + Structured_Supplier.cpp \ + goC.cpp \ + goS.cpp \ + Notify_Sequence_Push_Consumer.h \ + Notify_Structured_Push_Consumer.h + +Structured_Supplier_LDADD = \ + $(TAO_BUILDDIR)/orbsvcs/tests/Notify/lib/libTAO_NotifyTests.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_IFR_Client.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicInterface.la \ + $(TAO_BUILDDIR)/tao/libTAO_Messaging.la \ + $(TAO_BUILDDIR)/tao/libTAO_PI.la \ + $(TAO_BUILDDIR)/tao/libTAO_CodecFactory.la \ + $(TAO_BUILDDIR)/tao/libTAO_Valuetype.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNaming.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Serv.la \ + $(TAO_BUILDDIR)/tao/libTAO_DynamicAny.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_ETCL.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_Svc_Utils.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification_Skel.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent_Skel.la \ + $(TAO_BUILDDIR)/tao/libTAO_PortableServer.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosNotification.la \ + $(TAO_BUILDDIR)/orbsvcs/orbsvcs/libTAO_CosEvent.la \ + $(TAO_BUILDDIR)/tao/libTAO_AnyTypeCode.la \ + $(TAO_BUILDDIR)/tao/libTAO.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_MINIMUM_CORBA +endif BUILD_CORBA_MESSAGING + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Notify_Sequence_Push_Consumer.cpp b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Sequence_Push_Consumer.cpp new file mode 100644 index 00000000000..d8cb3fe8a76 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Sequence_Push_Consumer.cpp @@ -0,0 +1,158 @@ +// $Id$ + +#include "Notify_Sequence_Push_Consumer.h" +#include "Notify_Test_Client.h" +#include "common.h" +#include "orbsvcs/TimeBaseC.h" +#include "tao/debug.h" +#include "ace/OS_NS_unistd.h" + +const int BATCH_SIZE = 5; + +Notify_Sequence_Push_Consumer::Notify_Sequence_Push_Consumer ( + const char* name, + CORBA::Short policy, + bool use_ordering, + int expected, + Notify_Test_Client& client) + : name_ (name), + order_policy_ (policy), + use_ordering_ (use_ordering), + expected_ (expected), + count_ (0), + previous_first_ (0), + client_ (client) +{ + this->client_.consumer_start (this); +} + + +void +Notify_Sequence_Push_Consumer::_connect ( + CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + CosNotifyComm::SequencePushConsumer_var objref = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + CosNotifyChannelAdmin::ProxySupplier_var proxysupplier = + consumer_admin->obtain_notification_push_supplier ( + CosNotifyChannelAdmin::SEQUENCE_EVENT, + proxy_id_ + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->proxy_ = + CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow ( + proxysupplier.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + CosNotification::QoSProperties properties (3); + properties.length (3); + properties[0].name = CORBA::string_dup (CosNotification::MaximumBatchSize); + properties[0].value <<= (CORBA::Long) BATCH_SIZE; + properties[1].name = CORBA::string_dup (CosNotification::PacingInterval); + properties[1].value <<= (TimeBase::TimeT) (1000 * 10000); // 1 secs + if (use_ordering_) + { + properties[2].name = CORBA::string_dup (CosNotification::OrderPolicy); + properties[2].value <<= this->order_policy_; + } + else + { + properties.length(2); + } + + this->proxy_->set_qos (properties); + this->proxy_->connect_sequence_push_consumer (objref.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // give ownership to POA + this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; +} + + +void +Notify_Sequence_Push_Consumer::push_structured_events ( + const CosNotification::EventBatch& events + ACE_ENV_ARG_DECL_NOT_USED /*ACE_ENV_SINGLE_ARG_PARAMETER*/) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (count_ == 0) + { + // Sleep long enough to force the channel to back up, otherwise + // there will be no ordering. + ACE_OS::sleep(2); + } + + ACE_ASSERT(events.length() == static_cast<CORBA::ULong>(BATCH_SIZE)); + + count_ += events.length(); + + if (this->count_ > this->expected_) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: too many events received.\n"))); + } + + if (this->count_ >= this->expected_) + { + this->client_.consumer_done (this); + } + + ACE_ASSERT(events[0].header.variable_header.length() == 3); + ACE_ASSERT(ACE_OS::strcmp(events[0].header.variable_header[0].name.in(), "id") == 0); + CORBA::Long first_id = 0; + events[0].header.variable_header[0].value >>= first_id; + CORBA::Long last_id = 0; + events[events.length() - 1].header.variable_header[0].value >>= last_id; + + ACE_DEBUG((LM_DEBUG, "\n Received id %d-%d\n", first_id, last_id)); + + int events_length = static_cast<int>(events.length()); + + CORBA::Long previous_id = first_id; + + if (count_ > events_length) // Ignore the very first batch + { + // First check that the sequences are ordered correctly + for (CORBA::ULong idx = 1; idx < events.length(); ++idx) + { + CORBA::Long id = 0; + + events[idx].header.variable_header[0].value >>= id; + CORBA::Long expected_id = previous_id + 1; + if (order_policy_ == CosNotification::PriorityOrder + || order_policy_ == CosNotification::DeadlineOrder) + { + expected_id = previous_id - 1; + } + if (id != expected_id) + { + ACE_ERROR((LM_ERROR, "Error: Expected:%d Received:%d\n", expected_id, id)); + return; + } + previous_id = id; + } + + // Next check that the first id in the sequence is ordered + // relative to the previously retrieved sequence. + if (previous_first_ != 0) + { + CORBA::Long expected_id = previous_first_ + BATCH_SIZE; + if (order_policy_ == CosNotification::PriorityOrder + || order_policy_ == CosNotification::DeadlineOrder) + { + expected_id = previous_first_ - BATCH_SIZE; + } + if (first_id != expected_id) + { + ACE_ERROR((LM_ERROR, "Error: Expected:%d Received:%d\n", expected_id, first_id)); + return; + } + } + previous_first_ = first_id; + } +} diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Notify_Sequence_Push_Consumer.h b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Sequence_Push_Consumer.h new file mode 100644 index 00000000000..a7fe36f0288 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Sequence_Push_Consumer.h @@ -0,0 +1,52 @@ +/* -*- C++ -*- */ +// $Id$ +// ========================================================================== +// +// = LIBRARY +// TAO/orbsvcs/tests/Notify/Ordering +// +// = FILENAME +// Notify_Sequence_Push_Consumer.h +// +// = DESCRIPTION +// A sequence push consumer implementation. +// +// = AUTHOR +// Chad Elliott <elliott_c@ociweb.com> +// +// ========================================================================== +#ifndef TAO_NOTIFY_SEQUENCE_PUSH_CONSUMER_H +#define TAO_NOTIFY_SEQUENCE_PUSH_CONSUMER_H + +#include "Notify_SequencePushConsumer.h" + +class Notify_Test_Client; + +class Notify_Sequence_Push_Consumer: public TAO_Notify_Tests_SequencePushConsumer +{ +public: + Notify_Sequence_Push_Consumer (const char* name, + CORBA::Short policy, + bool use_ordering, + int expected_count, + Notify_Test_Client& client); + + void _connect (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +protected: + void push_structured_events (const CosNotification::EventBatch& + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + + ACE_CString name_; + CORBA::Short order_policy_; + bool use_ordering_; + int expected_; + int count_; + int previous_first_; + Notify_Test_Client& client_; +}; + +#endif /* TAO_NOTIFY_SEQUENCE_PUSH_CONSUMER_H */ diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Notify_Structured_Push_Consumer.cpp b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Structured_Push_Consumer.cpp new file mode 100644 index 00000000000..d8e9f5a6552 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Structured_Push_Consumer.cpp @@ -0,0 +1,131 @@ +// $Id$ + +#include "Notify_Structured_Push_Consumer.h" +#include "Notify_Test_Client.h" +#include "common.h" + +#include "orbsvcs/TimeBaseC.h" + +#include "tao/debug.h" + +#include "ace/OS_NS_unistd.h" + +Notify_Structured_Push_Consumer::Notify_Structured_Push_Consumer ( + const char* name, + CORBA::Short policy, + bool use_ordering, + int expected, + Notify_Test_Client& client) + : name_ (name), + order_policy_ (policy), + use_ordering_ (use_ordering), + expected_ (expected), + count_ (0), + first_(0), + client_ (client) +{ + this->client_.consumer_start (this); +} + + +void +Notify_Structured_Push_Consumer::_connect ( + CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + CosNotifyComm::StructuredPushConsumer_var objref = + this->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + CosNotifyChannelAdmin::ProxySupplier_var proxysupplier = + consumer_admin->obtain_notification_push_supplier ( + CosNotifyChannelAdmin::STRUCTURED_EVENT, + proxy_id_ + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->proxy_ = + CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow ( + proxysupplier.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + if (use_ordering_) + { + CosNotification::QoSProperties properties (1); + properties.length (1); + properties[0].name = CORBA::string_dup (CosNotification::OrderPolicy); + properties[0].value <<= this->order_policy_; + + this->proxy_->set_qos (properties); + } + + this->proxy_->connect_structured_push_consumer (objref.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // give ownership to POA + this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; +} + + +void +Notify_Structured_Push_Consumer::push_structured_event ( + const CosNotification::StructuredEvent& event + ACE_ENV_ARG_DECL_NOT_USED /*ACE_ENV_SINGLE_ARG_PARAMETER*/) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_DEBUG((LM_DEBUG, "-")); + if (count_ == 0) + { + // Sleep long enough to force the channel to back up, otherwise + // there will be no ordering. + ACE_OS::sleep(2); + } + + ++count_; + + if (this->count_ > this->expected_) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: too many events received.\n"))); + } + + if (this->count_ >= this->expected_) + { + this->client_.consumer_done (this); + } + + CORBA::Long id = 0; + + ACE_ASSERT(event.header.variable_header.length() == 3); + ACE_ASSERT(ACE_OS::strcmp(event.header.variable_header[0].name.in(), "id") == 0); + event.header.variable_header[0].value >>= id; + + // The first event won't necessarilly be in order, because we hadn't yet forced + // the channel to queue events. + if (count_ > 1) + { + if (order_policy_ == CosNotification::PriorityOrder + || order_policy_ == CosNotification::DeadlineOrder) + { + int eid = expected_ - count_ + 1; + if (eid <= first_) + --eid; + + if (id != eid) + ACE_ERROR((LM_ERROR, "\nError: ")); + ACE_DEBUG((LM_DEBUG, "Expected id:%d Received id:%d\n", eid, id)); + } + else + { + if (id != count_ - 1) + ACE_ERROR((LM_ERROR, "\nError: Expected id:%d Received id:%d\n", count_ - 1, id)); + } + } + else + { + ACE_DEBUG((LM_DEBUG, "Ignoring first event. id=%d\n", id)); + first_ = id; + } +} diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Notify_Structured_Push_Consumer.h b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Structured_Push_Consumer.h new file mode 100644 index 00000000000..01b5b69c922 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Notify_Structured_Push_Consumer.h @@ -0,0 +1,53 @@ +/* -*- C++ -*- */ +// $Id$ +// ========================================================================== +// +// = LIBRARY +// TAO/orbsvcs/tests/Notify/Ordering +// +// = FILENAME +// Notify_Structured_Push_Consumer.h +// +// = DESCRIPTION +// A structured push consumer implementation. +// +// = AUTHOR +// Chad Elliott <elliott_c@ociweb.com> +// +// ========================================================================== +#ifndef TAO_NOTIFY_STRUCTURED_PUSH_CONSUMER_H +#define TAO_NOTIFY_STRUCTURED_PUSH_CONSUMER_H + +#include "Notify_StructuredPushConsumer.h" + +class Notify_Test_Client; + +class Notify_Structured_Push_Consumer: public TAO_Notify_Tests_StructuredPushConsumer +{ +public: + Notify_Structured_Push_Consumer ( + const char* name, + CORBA::Short policy, + bool use_ordering, + int expected, + Notify_Test_Client& client); + + void _connect (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +protected: + void push_structured_event (const CosNotification::StructuredEvent& + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + + ACE_CString name_; + CORBA::Short order_policy_; + bool use_ordering_; + int expected_; + int count_; + int first_; + Notify_Test_Client& client_; +}; + +#endif /* TAO_NOTIFY_STRUCTURED_PUSH_CONSUMER_H */ diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Ordering.mpc b/TAO/orbsvcs/tests/Notify/Ordering/Ordering.mpc new file mode 100644 index 00000000000..c57d14c1ddc --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Ordering.mpc @@ -0,0 +1,61 @@ +// -*- MPC -*- +// $Id$ + +project(*idl): taoidldefaults { + IDL_Files { + go.idl + } + custom_only = 1 +} + +project(*Ntf Struct Supp): notifytest { + exename = Structured_Supplier + + after += *idl + Source_Files { + Structured_Supplier.cpp + goS.cpp + goC.cpp + } + IDL_Files { + } +} + +project(*Ntf Struct Cons): notifytest { + exename = Structured_Consumer + + after += *idl + Source_Files { + goC.cpp + Notify_Structured_Push_Consumer.cpp + Structured_Consumer.cpp + } + IDL_Files { + } +} + +project(*Ntf Seq Supp): notifytest { + exename = Sequence_Supplier + + after += *idl + Source_Files { + Sequence_Supplier.cpp + goS.cpp + goC.cpp + } + IDL_Files { + } +} + +project(*Ntf Seq Cons): notifytest { + exename = Sequence_Consumer + + after += *idl + Source_Files { + goC.cpp + Notify_Sequence_Push_Consumer.cpp + Sequence_Consumer.cpp + } + IDL_Files { + } +} diff --git a/TAO/orbsvcs/tests/Notify/Ordering/README b/TAO/orbsvcs/tests/Notify/Ordering/README new file mode 100644 index 00000000000..9df80a17192 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/README @@ -0,0 +1,53 @@ +Notification Ordering Policy Test +================================ + +Description +----------- + +This test exercises the event discarding policies of the Notification +Service. Each of the implemented polices are tested (fifo, lifo and +priority) for both structured push consumers and sequence push consumers. + +A number of events are sent by the supplier and the consumer should receive +a lesser number of events due queue size overflow. Note that with the +sequence push consumer, it will actually receive more "events" than are sent +by the supplier. This is due to the fact that the supplier is sending a +sequence of events that is much larger than the maximum batch size (set by +the consumer) and therefore the notification service breaks the sequence +into smaller sets to honor the maximum batch size setting. + + +Usage +----- + +The test consists of a Structured_Supplier and Structured_Consumer as well +as a Sequence_Supplier and Sequence_Consumer. The usage for each as is +follows: + +$ Structured_Supplier -\? +usage: Structured_Supplier -o <iorfile> -e <# of events> + +$ Structured_Consumer -\? +usage: Structured_Consumer -k <ior> -e <expected events> + -d <fifo|priority|lifo> + +$ Sequence_Supplier -\? +usage: Sequence_Supplier -o <iorfile> -e <# of events> + +$ Sequence_Consumer -\? +usage: Sequence_Consumer -k <ior> -l <low expected events> + -h <high expected events> -d <fifo|priority|lifo> + + +To run this test, just run the run_test.pl perl script. It will run both +structured and sequence tests with each of the implemented discard policies. + + +Expected Results +---------------- +The consumers of each type expect to only receive a certain number of +events. If you see: + +ERROR: too many events received. + +then the test has failed. Otherwise, the test was ok. diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Sequence_Consumer.cpp b/TAO/orbsvcs/tests/Notify/Ordering/Sequence_Consumer.cpp new file mode 100644 index 00000000000..79f44545167 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Sequence_Consumer.cpp @@ -0,0 +1,185 @@ +// $Id$ +#include "Notify_Sequence_Push_Consumer.h" +#include "goC.h" + +#include "Notify_Test_Client.h" + +#include "orbsvcs/CosNotifyCommC.h" +#include "orbsvcs/CosNamingC.h" + +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_sys_stat.h" + +static const char* ior = "file://supplier.ior"; +static CORBA::Short order_policy = CosNotification::FifoOrder; +static int expected = 30; // Must match the amount sent by supplier +static Notify_Sequence_Push_Consumer* consumer_1; +static bool use_ordering = false; + +class Consumer_Client : public Notify_Test_Client +{ +public: + virtual int parse_args (int argc, char* argv[]); +}; + + +int +Consumer_Client::parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "ok:e:d:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'k': + ior = get_opts.optarg; + break; + + case 'e': + expected = ACE_OS::atoi (get_opts.optarg); + break; + + case 'o': + use_ordering = true; + break; + + case 'd': + { + const char* order = get_opts.optarg; + if (ACE_OS::strcmp (order, "fifo") == 0) + { + order_policy = CosNotification::FifoOrder; + } + else if (ACE_OS::strcmp (order, "priority") == 0) + { + order_policy = CosNotification::PriorityOrder; + } + else if (ACE_OS::strcmp (order, "deadline") == 0) + { + order_policy = CosNotification::DeadlineOrder; +#if !defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) + ACE_ERROR_RETURN ((LM_ERROR, + "This order policy requires timed message " + "blocks.\nPlease #define " + "ACE_HAS_TIMED_MESSAGE_BLOCKS in your " + "config.h\n"), -1); +#endif + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, + "Unknown ordering policy: %s\n", + order_policy), + -1); + } + break; + } + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-k <ior> " + "-e <expected events> " + "-d <fifo|priority|deadline> " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + + +static CosNotifyChannelAdmin::ConsumerAdmin_ptr +create_consumeradmin (CosNotifyChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + CosNotifyChannelAdmin::AdminID adminid = 0; + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + ec->new_for_consumers (CosNotifyChannelAdmin::OR_OP, + adminid + ACE_ENV_ARG_PARAMETER); + + ACE_CHECK_RETURN (0); + + return CosNotifyChannelAdmin::ConsumerAdmin::_duplicate (admin.in ()); +} + + +static void +create_consumers (CosNotifyChannelAdmin::ConsumerAdmin_ptr admin, + Notify_Test_Client* client + ACE_ENV_ARG_DECL) +{ + ACE_NEW_THROW_EX (consumer_1, + Notify_Sequence_Push_Consumer ("consumer1", + order_policy, + use_ordering, + expected, + *client), + CORBA::NO_MEMORY ()); + + consumer_1->init (client->root_poa () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + consumer_1->_connect (admin ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +int main (int argc, char* argv[]) +{ + ACE_TRY_NEW_ENV + { + Consumer_Client client; + + int status = client.init (argc, argv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_UNUSED_ARG(status); + ACE_ASSERT(status == 0); + + CosNotifyChannelAdmin::EventChannel_var ec = + client.create_event_channel ("MyEventChannel", 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::ORB_ptr orb = client.orb (); + CORBA::Object_var object = + orb->string_to_object (ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig_var sig = sig::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ASSERT(! CORBA::is_nil (sig.in ())); + + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + create_consumeradmin (ec.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ASSERT(!CORBA::is_nil (admin.in ())); + create_consumers (admin.in (), &client ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig->go (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + client.ORB_run( ACE_ENV_SINGLE_ARG_PARAMETER ); + ACE_TRY_CHECK; + + ACE_DEBUG((LM_DEBUG, "Consumer done.\n")); + consumer_1->disconnect(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig->done (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + return 0; + } + ACE_CATCH (CORBA::Exception, e) + { + ACE_PRINT_EXCEPTION (e, "Error: "); + } + ACE_ENDTRY; + + return 1; +} diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Sequence_Supplier.cpp b/TAO/orbsvcs/tests/Notify/Ordering/Sequence_Supplier.cpp new file mode 100644 index 00000000000..c1d572de8c2 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Sequence_Supplier.cpp @@ -0,0 +1,284 @@ +// $Id$ +#include "Notify_SequencePushSupplier.h" +#include "goS.h" +#include "Notify_Test_Client.h" + +#include "orbsvcs/CosNotifyChannelAdminS.h" +#include "orbsvcs/CosNotifyCommC.h" +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/TimeBaseC.h" + +#include "tao/ORB_Core.h" + +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_strings.h" +#include "ace/Auto_Ptr.h" + +static TAO_Notify_Tests_SequencePushSupplier* supplier_1 = 0; +static CORBA::Short order_policy = CosNotification::FifoOrder; +static int num_events = 30; +static const char* ior_output_file = "supplier.ior"; +static const int BATCH_SIZE = 5; + +class sig_i : public POA_sig +{ +public: + sig_i(CORBA::ORB_ptr orb) + : orb_(orb) + , started_(false) + { + } + + void go (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + started_ = true; + } + + void done (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + started_ = false; + } + + void wait_for_startup() + { + while (! started_) { + ACE_Time_Value tv(0, 100 * 1000); // 100ms + orb_->run(tv); + } + } + + void wait_for_completion() + { + while (started_) { + ACE_Time_Value tv(0, 100 * 1000); // 100ms + orb_->run(tv); + } + } + +private: + CORBA::ORB_ptr orb_; + bool started_; +}; + +class Supplier_Client : public Notify_Test_Client +{ +public: + virtual int parse_args (int argc, char* argv[]); +}; + +int +Supplier_Client::parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "o:e:d:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'd': + { + const char* order = get_opts.optarg; + if (ACE_OS::strcasecmp (order, "fifo") == 0) + { + order_policy = CosNotification::FifoOrder; + } + else if (ACE_OS::strcasecmp (order, "priority") == 0) + { + order_policy = CosNotification::PriorityOrder; + } + else if (ACE_OS::strcasecmp (order, "deadline") == 0) + { + order_policy = CosNotification::DeadlineOrder; +#if !defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) + ACE_ERROR_RETURN ((LM_ERROR, + "This order policy requires timed message " + "blocks.\nPlease #define " + "ACE_HAS_TIMED_MESSAGE_BLOCKS in your " + "config.h\n"), -1); +#endif + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, + "Unknown order policy: %s\n", + order_policy), + -1); + } + break; + } + + case 'e': + num_events = ACE_OS::atoi (get_opts.optarg); + break; + + case 'o': + ior_output_file = get_opts.optarg; + break; + + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile> -e <# of events> -d" + "\n", + argv [0]), + -1); + } + + // Indicates sucessful parsing of the command line + return 0; +} + +static CosNotifyChannelAdmin::SupplierAdmin_ptr +create_supplieradmin (CosNotifyChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + CosNotifyChannelAdmin::AdminID adminid = 0; + CosNotifyChannelAdmin::SupplierAdmin_var admin = + ec->new_for_suppliers (CosNotifyChannelAdmin::AND_OP, + adminid + ACE_ENV_ARG_PARAMETER); + + ACE_CHECK_RETURN (0); + + return CosNotifyChannelAdmin::SupplierAdmin::_duplicate (admin.in ()); +} + + +static void +SendBatch (int batch_id ACE_ENV_ARG_DECL) +{ + CosNotification::EventBatch events; + events.length(BATCH_SIZE); + for (CORBA::Long i = 0; i < BATCH_SIZE; ++i) + { + int id = batch_id * BATCH_SIZE + i; + + CosNotification::StructuredEvent event; + + event.header.fixed_header.event_type.domain_name = CORBA::string_dup ("a"); + event.header.fixed_header.event_type.type_name = CORBA::string_dup ("b"); + event.header.fixed_header.event_name = CORBA::string_dup ("c"); + + event.header.variable_header.length (3); + event.header.variable_header[0].name = + CORBA::string_dup ("id"); + event.header.variable_header[0].value <<= (CORBA::Long)id; + event.header.variable_header[1].name = + CORBA::string_dup (CosNotification::Priority); + event.header.variable_header[1].value <<= (CORBA::Short)(id); + event.header.variable_header[2].name = + CORBA::string_dup (CosNotification::Timeout); + event.header.variable_header[2].value <<= (TimeBase::TimeT) (num_events - id); + + events[i] = event; + } + supplier_1->send_events (events ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +static void +create_suppliers (CosNotifyChannelAdmin::SupplierAdmin_ptr admin, + PortableServer::POA_ptr poa + ACE_ENV_ARG_DECL) +{ + ACE_NEW_THROW_EX (supplier_1, + TAO_Notify_Tests_SequencePushSupplier (), + CORBA::NO_MEMORY ()); + + supplier_1->init (poa ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + supplier_1->connect (admin ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + + +// ****************************************************************** +// Main Section +// ****************************************************************** + +int main (int argc, char* argv[]) +{ + ACE_Auto_Ptr< sig_i > sig_impl; + ACE_TRY_NEW_ENV + { + Supplier_Client client; + int status = client.init (argc, argv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_UNUSED_ARG(status); + ACE_ASSERT(status == 0); + + CosNotifyChannelAdmin::EventChannel_var ec = + client.create_event_channel ("MyEventChannel", 0 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNotification::QoSProperties qos (1); + qos.length (1); + qos[0].name = CORBA::string_dup (CosNotification::OrderPolicy); + qos[0].value <<= order_policy; + ec->set_qos (qos ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::ORB_ptr orb = client.orb (); + + sig_impl.reset( new sig_i( orb ) ); + sig_var sig = sig_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (sig.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for " + "writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + CosNotifyChannelAdmin::SupplierAdmin_var admin = + create_supplieradmin (ec.in () ACE_ENV_ARG_PARAMETER); + ACE_ASSERT(!CORBA::is_nil (admin.in ())); + create_suppliers (admin.in (), client.root_poa () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig_impl->wait_for_startup(); + + ACE_DEBUG((LM_DEBUG, "1 supplier sending %d events...\n", num_events)); + for (int i = 0; i < num_events / BATCH_SIZE; ++i) + { + ACE_DEBUG((LM_DEBUG, "+")); + SendBatch (i ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_DEBUG((LM_DEBUG, "\nSupplier sent %d events.\n", num_events)); + + sig_impl->wait_for_completion(); + + ACE_OS::unlink (ior_output_file); + + supplier_1->disconnect(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ec->destroy(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + return 0; + } + ACE_CATCH (CORBA::Exception, e) + { + ACE_PRINT_EXCEPTION (e, "Error: "); + } + ACE_ENDTRY; + + return 1; +} diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Structured_Consumer.cpp b/TAO/orbsvcs/tests/Notify/Ordering/Structured_Consumer.cpp new file mode 100644 index 00000000000..d86df34f06b --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Structured_Consumer.cpp @@ -0,0 +1,181 @@ +// $Id$ +#include "Notify_Structured_Push_Consumer.h" +#include "goC.h" +#include "Notify_Test_Client.h" + +#include "orbsvcs/CosNotifyCommC.h" +#include "orbsvcs/CosNamingC.h" + +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" + +static const char* ior = "file://supplier.ior"; +static CORBA::Short order_policy = CosNotification::FifoOrder; +static int expected = 30; +static Notify_Structured_Push_Consumer* consumer_1; +static bool use_ordering = false; + +class Consumer_Client : public Notify_Test_Client +{ +public: + virtual int parse_args (int argc, char* argv[]); +}; + +int +Consumer_Client::parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "ok:e:d:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'k': + ior = get_opts.optarg; + break; + + case 'o': + use_ordering = true; + break; + + case 'e': + expected = ACE_OS::atoi (get_opts.optarg); + break; + + case 'd': + { + const char* order = get_opts.optarg; + if (ACE_OS::strcmp (order, "fifo") == 0) + { + order_policy = CosNotification::FifoOrder; + } + else if (ACE_OS::strcmp (order, "priority") == 0) + { + order_policy = CosNotification::PriorityOrder; + } + else if (ACE_OS::strcmp (order, "deadline") == 0) + { + order_policy = CosNotification::DeadlineOrder; +#if !defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) + ACE_ERROR_RETURN ((LM_ERROR, + "This order policy requires timed message" + "blocks.\nPlease #define " + "ACE_HAS_TIMED_MESSAGE_BLOCKS in your " + "config.h\n"), -1); +#endif + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, + "Unknown order policy: %s\n", + order_policy), + -1); + } + break; + } + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-k <ior> " + "-e <expected events> " + "-d <fifo|priority|deadline> " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +static CosNotifyChannelAdmin::ConsumerAdmin_ptr +create_consumeradmin (CosNotifyChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + CosNotifyChannelAdmin::AdminID adminid = 0; + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + ec->new_for_consumers (CosNotifyChannelAdmin::OR_OP, + adminid + ACE_ENV_ARG_PARAMETER); + + ACE_CHECK_RETURN (0); + + return CosNotifyChannelAdmin::ConsumerAdmin::_duplicate (admin.in ()); +} + +static void +create_consumers (CosNotifyChannelAdmin::ConsumerAdmin_ptr admin, + Notify_Test_Client* client + ACE_ENV_ARG_DECL) +{ + // startup the first consumer + ACE_NEW_THROW_EX (consumer_1, + Notify_Structured_Push_Consumer ("consumer1", + order_policy, + use_ordering, + expected, + *client), + CORBA::NO_MEMORY ()); + consumer_1->init (client->root_poa () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + consumer_1->_connect (admin ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +int main (int argc, char* argv[]) +{ + ACE_TRY_NEW_ENV + { + Consumer_Client client; + + int status = client.init (argc, argv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_UNUSED_ARG(status); + ACE_ASSERT(status == 0); + + CosNotifyChannelAdmin::EventChannel_var ec = + client.create_event_channel ("MyEventChannel", 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::ORB_ptr orb = client.orb (); + CORBA::Object_var object = + orb->string_to_object (ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig_var sig = sig::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ASSERT(! CORBA::is_nil (sig.in ())); + + CosNotifyChannelAdmin::ConsumerAdmin_var admin = + create_consumeradmin (ec.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_ASSERT(!CORBA::is_nil (admin.in ())); + + create_consumers (admin.in (), &client ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig->go (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + client.ORB_run( ACE_ENV_SINGLE_ARG_PARAMETER ); + ACE_TRY_CHECK; + + ACE_DEBUG((LM_DEBUG, "Consumer done.\n")); + consumer_1->disconnect(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig->done (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + return 0; + } + ACE_CATCH (CORBA::Exception, e) + { + ACE_PRINT_EXCEPTION (e, "Error: "); + } + ACE_ENDTRY; + + return 1; +} diff --git a/TAO/orbsvcs/tests/Notify/Ordering/Structured_Supplier.cpp b/TAO/orbsvcs/tests/Notify/Ordering/Structured_Supplier.cpp new file mode 100644 index 00000000000..bda45d89f6c --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/Structured_Supplier.cpp @@ -0,0 +1,281 @@ +// $Id$ +#include "Notify_StructuredPushSupplier.h" +#include "goS.h" + +#include "Notify_Test_Client.h" + +#include "orbsvcs/CosNotifyChannelAdminS.h" +#include "orbsvcs/CosNotifyCommC.h" +#include "orbsvcs/CosNamingC.h" +#include "orbsvcs/TimeBaseC.h" + +#include "tao/ORB_Core.h" + +#include "ace/Get_Opt.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Auto_Ptr.h" + +static TAO_Notify_Tests_StructuredPushSupplier* supplier_1 = 0; +static CORBA::Short order_policy = CosNotification::FifoOrder; +static int num_events = 30; +static const char* ior_output_file = "supplier.ior"; + +class sig_i : public POA_sig +{ +public: + sig_i(CORBA::ORB_ptr orb) + : orb_(orb) + , started_(false) + { + } + + void go (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + started_ = true; + } + + void done (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) + { + started_ = false; + } + + void wait_for_startup() + { + while (! started_) { + ACE_Time_Value tv(0, 100 * 1000); // 100ms + orb_->run(tv); + } + } + + void wait_for_completion() + { + while (started_) { + ACE_Time_Value tv(0, 100 * 1000); // 100ms + orb_->run(tv); + } + } + +private: + CORBA::ORB_ptr orb_; + bool started_; +}; + +class Supplier_Client : public Notify_Test_Client +{ +public: + virtual int parse_args (int argc, char* argv[]); +}; + + +int +Supplier_Client::parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "o:e:d:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'd': + { + const char* order = get_opts.optarg; + if (ACE_OS::strcmp (order, "fifo") == 0) + { + order_policy = CosNotification::FifoOrder; + } + else if (ACE_OS::strcmp (order, "priority") == 0) + { + order_policy = CosNotification::PriorityOrder; + } + else if (ACE_OS::strcmp (order, "deadline") == 0) + { + order_policy = CosNotification::DeadlineOrder; +#if !defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) + ACE_ERROR_RETURN ((LM_ERROR, + "This order policy requires timed message " + "blocks.\nPlease #define " + "ACE_HAS_TIMED_MESSAGE_BLOCKS in your " + "config.h\n"), -1); +#endif + } + else + { + ACE_ERROR_RETURN ((LM_ERROR, + "Unknown order policy: %s\n", + order_policy), + -1); + } + break; + } + + case 'e': + num_events = ACE_OS::atoi (get_opts.optarg); + break; + + case 'o': + ior_output_file = get_opts.optarg; + break; + + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile> -e <# of events> " + "-d <fifo|priority|deadline>" + "\n", + argv [0]), + -1); + } + + // Indicates sucessful parsing of the command line + return 0; +} + + +static CosNotifyChannelAdmin::SupplierAdmin_ptr +create_supplieradmin (CosNotifyChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + CosNotifyChannelAdmin::AdminID adminid = 0; + CosNotifyChannelAdmin::SupplierAdmin_var admin = + ec->new_for_suppliers (CosNotifyChannelAdmin::AND_OP, + adminid + ACE_ENV_ARG_PARAMETER); + + ACE_CHECK_RETURN (0); + + return CosNotifyChannelAdmin::SupplierAdmin::_duplicate (admin.in ()); +} + + +static void +SendEvent (int id ACE_ENV_ARG_DECL_NOT_USED) +{ + CosNotification::StructuredEvent event; + + event.header.fixed_header.event_type.domain_name = CORBA::string_dup ("a"); + event.header.fixed_header.event_type.type_name = CORBA::string_dup ("b"); + event.header.fixed_header.event_name = CORBA::string_dup ("test"); + + event.header.variable_header.length (3); + event.header.variable_header[0].name = CORBA::string_dup ("id"); + event.header.variable_header[0].value <<= (CORBA::Long) id; + event.header.variable_header[1].name = + CORBA::string_dup (CosNotification::Priority); + event.header.variable_header[1].value <<= (CORBA::Short) id; + event.header.variable_header[2].name = + CORBA::string_dup (CosNotification::Timeout); + event.header.variable_header[2].value <<= (TimeBase::TimeT) (num_events - id); + + ACE_TRY_NEW_ENV + { + supplier_1->send_event (event ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::Exception, e) + { + ACE_PRINT_EXCEPTION (e, "Error: "); + } + ACE_ENDTRY; +} + +static void +create_suppliers (CosNotifyChannelAdmin::SupplierAdmin_ptr admin, + PortableServer::POA_ptr poa + ACE_ENV_ARG_DECL) +{ + // startup the supplier + ACE_NEW_THROW_EX (supplier_1, + TAO_Notify_Tests_StructuredPushSupplier (), + CORBA::NO_MEMORY ()); + + supplier_1->init (poa ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + supplier_1->connect (admin ACE_ENV_ARG_PARAMETER); + ACE_CHECK; +} + +int main (int argc, char* argv[]) +{ + ACE_Auto_Ptr< sig_i > sig_impl; + ACE_TRY_NEW_ENV + { + Supplier_Client client; + int status = client.init (argc, argv ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + ACE_UNUSED_ARG(status); + ACE_ASSERT(status == 0); + + CosNotifyChannelAdmin::EventChannel_var ec = + client.create_event_channel ("MyEventChannel", 0 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CosNotification::QoSProperties qos (1); + qos.length (1); + qos[0].name = CORBA::string_dup (CosNotification::OrderPolicy); + qos[0].value <<= order_policy; + ec->set_qos (qos ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::ORB_ptr orb = client.orb (); + + sig_impl.reset( new sig_i( orb ) ); + sig_var sig = sig_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::String_var ior = + orb->object_to_string (sig.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (ior_output_file != 0) + { + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for " + "writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + } + + CosNotifyChannelAdmin::SupplierAdmin_var admin = + create_supplieradmin (ec.in () ACE_ENV_ARG_PARAMETER); + ACE_ASSERT(!CORBA::is_nil (admin.in ())); + create_suppliers (admin.in (), client.root_poa () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + sig_impl->wait_for_startup(); + + ACE_DEBUG((LM_DEBUG, "1 supplier sending %d events...\n", num_events)); + for (int i = 0; i < num_events; ++i) + { + ACE_DEBUG((LM_DEBUG, "+")); + SendEvent (i ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_DEBUG((LM_DEBUG, "\nSupplier sent %d events.\n", num_events)); + + sig_impl->wait_for_completion(); + + ACE_OS::unlink (ior_output_file); + + supplier_1->disconnect(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ec->destroy(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + return 0; + } + ACE_CATCH (CORBA::Exception, e) + { + ACE_PRINT_EXCEPTION (e, "Error: "); + } + ACE_ENDTRY; + + return 1; +} diff --git a/TAO/orbsvcs/tests/Notify/Ordering/go.idl b/TAO/orbsvcs/tests/Notify/Ordering/go.idl new file mode 100644 index 00000000000..e24bfd036c7 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/go.idl @@ -0,0 +1,10 @@ +// $Id$ + +interface sig +{ + // Tell the server to start + oneway void go (); + + // Tell the server the consumer is done + oneway void done (); +}; diff --git a/TAO/orbsvcs/tests/Notify/Ordering/notify.conf b/TAO/orbsvcs/tests/Notify/Ordering/notify.conf new file mode 100644 index 00000000000..1a52da7b7ec --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/notify.conf @@ -0,0 +1,6 @@ +## $Id$ +# +## Load the static Cos Notification Service +## DispatchThreads creates a queue for the consumer side +## SourceThreads creates a queue for the supplier side +static Notify_Default_Event_Manager_Objects_Factory "-DispatchingThreads 1 -SourceThreads 1" diff --git a/TAO/orbsvcs/tests/Notify/Ordering/notify.conf.xml b/TAO/orbsvcs/tests/Notify/Ordering/notify.conf.xml new file mode 100644 index 00000000000..5c15de81fdc --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/notify.conf.xml @@ -0,0 +1,6 @@ +<?xml version='1.0'?> +<!-- Converted from ./orbsvcs/tests/Notify/Ordering/notify.conf by svcconf-convert.pl --> +<ACE_Svc_Conf> + <!-- # $Id$ --> + <static id="Notify_Default_Event_Manager_Objects_Factory" params="-DispatchingThreads 1"/> +</ACE_Svc_Conf> diff --git a/TAO/orbsvcs/tests/Notify/Ordering/run_test.pl b/TAO/orbsvcs/tests/Notify/Ordering/run_test.pl new file mode 100755 index 00000000000..c48b9a42e42 --- /dev/null +++ b/TAO/orbsvcs/tests/Notify/Ordering/run_test.pl @@ -0,0 +1,219 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib "../../../../../bin"; +use PerlACE::Run_Test; + +$ior = PerlACE::LocalFile ("supplier.ior"); +$notifyior = PerlACE::LocalFile ("notify.ior"); +$naming_ior = PerlACE::LocalFile ("naming.ior"); +$notify_conf = PerlACE::LocalFile ("notify$PerlACE::svcconf_ext"); +$status = 0; +$deadline = 0; + +foreach my $arg (@ARGV) { + if ($arg eq "-d") { + $deadline = 1; + } + else { + print "Usage: $0 [-d]\n" . + " -d specifies that deadline discarding be tested.\n"; + exit(0); + } +} + +$port = PerlACE::uniqueid () + 10001; +$NS = new PerlACE::Process ("../../../Naming_Service/Naming_Service", + "-ORBEndpoint iiop://localhost:$port -o $naming_ior"); +$TS = new PerlACE::Process ("../../../Notify_Service/Notify_Service", + "-ORBInitRef NameService=iioploc://" . + "localhost:$port/NameService " . + "-IORoutput $notifyior -ORBSvcConf " . + "$notify_conf"); +$STS = new PerlACE::Process ("Structured_Supplier", + "-ORBInitRef NameService=iioploc://" . + "localhost:$port/NameService"); +$STC = new PerlACE::Process ("Structured_Consumer"); + +$SES = new PerlACE::Process ("Sequence_Supplier", + "-ORBInitRef NameService=iioploc://" . + "localhost:$port/NameService"); +$SEC = new PerlACE::Process ("Sequence_Consumer"); + +$client_args = "-ORBInitRef NameService=iioploc://localhost:" . + "$port/NameService"; + +unlink $notifyior; +unlink $naming_ior; + +$NS->Spawn (); +if (PerlACE::waitforfile_timed ($naming_ior, 20) == -1) { + print STDERR "ERROR: waiting for the naming service to start\n"; + $NS->Kill (); + exit 1; +} + +$TS->Spawn (); +if (PerlACE::waitforfile_timed ($notifyior, 20) == -1) { + print STDERR "ERROR: waiting for the notify service to start\n"; + $TS->Kill (); + $NS->Kill (); + exit 1; +} + +if ($deadline) { + ## @@todo : Add combinations of deadline ordering. +} + +# Although the TAO notify service supports OrderPolicy on the supplier side +# QoS, this setting typically serves no practical purpose, and is not testable. +# This is because we have no way to force the supplier-side queue to back up, and +# the OrderPolicy will have no affect. +# Therefore we don't test setting this policy on the supplier side. + +print "**** Structured Supplier (fifo) -> Structured Consumer (none) *****\n"; +unlink $ior; +$STS->Arguments($STS->Arguments() . " -d fifo"); +$STS->Spawn (); +if (PerlACE::waitforfile_timed ($ior, 20) == -1) { + print STDERR "ERROR: waiting for the supplier to start\n"; + $STS->Kill (); + $TS->Kill (); + $NS->Kill (); + $status = 1; + exit 1; +} +$STC->Arguments($client_args . " -d fifo"); +$client = $STC->SpawnWaitKill (20); +if ($client != 0) { + $STS->Kill (); + $TS->Kill (); + $NS->Kill (); + exit 1; +} +$server = $STS->WaitKill(5); +if ($server != 0) { + $TS->Kill (); + $NS->Kill (); + exit 1; +} + +print "**** Structured Supplier (fifo) -> Structured Consumer (priority) *****\n"; +unlink $ior; +$STS->Arguments($STS->Arguments() . " -d fifo"); +$STS->Spawn (); +if (PerlACE::waitforfile_timed ($ior, 20) == -1) { + print STDERR "ERROR: waiting for the supplier to start\n"; + $STS->Kill (); + $TS->Kill (); + $NS->Kill (); + $status = 1; + exit 1; +} +$STC->Arguments($client_args . " -d priority -o"); +$client = $STC->SpawnWaitKill (20); +if ($client != 0) { + $STS->Kill (); + $TS->Kill (); + $NS->Kill (); + exit 1; +} +$server = $STS->WaitKill(5); +if ($server != 0) { + $TS->Kill (); + $NS->Kill (); + exit 1; +} + +print "**** Structured Supplier (fifo) -> Sequence Consumer (priority) *****\n"; +unlink $ior; +$STS->Arguments($STS->Arguments() . " -d fifo"); +$STS->Spawn (); +if (PerlACE::waitforfile_timed ($ior, 20) == -1) { + print STDERR "ERROR: waiting for the supplier to start\n"; + $STS->Kill (); + $TS->Kill (); + $NS->Kill (); + $status = 1; + exit 1; +} +$SEC->Arguments($client_args . " -d priority -o"); +$client = $SEC->SpawnWaitKill (20); +if ($client != 0) { + $STS->Kill (); + $TS->Kill (); + $NS->Kill (); + exit 1; +} +$server = $STS->WaitKill(5); +if ($server != 0) { + $TS->Kill (); + $NS->Kill (); + exit 1; +} + +print "**** Sequence Supplier (fifo) -> Structured Consumer (priority) *****\n"; +unlink $ior; +$SES->Arguments($SES->Arguments() . " -d fifo"); +$SES->Spawn (); +if (PerlACE::waitforfile_timed ($ior, 20) == -1) { + $SES->Kill (); + $TS->Kill (); + $NS->Kill (); + $status = 1; + exit 1; +} +$STC->Arguments($client_args . " -d priority -o"); +$client = $STC->SpawnWaitKill (20); +if ($client != 0) { + $SES->Kill (); + $TS->Kill (); + $NS->Kill (); + exit 1; +} +$server = $SES->WaitKill(5); +if ($server != 0) { + $TS->Kill (); + $NS->Kill (); + exit 1; +} + +print "**** Sequence Supplier (fifo) -> Sequence Consumer (priority) *****\n"; +unlink $ior; +$SES->Arguments($SES->Arguments() . " -d fifo"); +$SES->Spawn (); +if (PerlACE::waitforfile_timed ($ior, 20) == -1) { + $SES->Kill (); + $TS->Kill (); + $NS->Kill (); + $status = 1; + exit 1; +} +$SEC->Arguments($client_args . " -d priority -o"); +$client = $SEC->SpawnWaitKill (20); +if ($client != 0) { + $SES->Kill (); + $TS->Kill (); + $NS->Kill (); + exit 1; +} +$server = $SES->WaitKill(5); +if ($server != 0) { + $TS->Kill (); + $NS->Kill (); + exit 1; +} + + +$TS->Kill (); +$NS->Kill (); + +unlink $ior; +unlink $notifyior; +unlink $naming_ior; + +exit $status; |