diff options
Diffstat (limited to 'TAO')
34 files changed, 1198 insertions, 156 deletions
diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c index 8e7562797fd..212df08d1b3 100644 --- a/TAO/ChangeLog-99c +++ b/TAO/ChangeLog-99c @@ -1,3 +1,83 @@ +Tue Feb 23 20:10:52 1999 Carlos O'Ryan <coryan@cs.wustl.edu> + + * orbsvcs/orbsvcs/Makefile: + Added EC_ObserverStrategy to the set of files for Event2 + + * orbsvcs/Event/EC_ConsumerAdmin.h: + * orbsvcs/Event/EC_ConsumerAdmin.i: + * orbsvcs/Event/EC_ConsumerAdmin.cpp: + * orbsvcs/Event/EC_ConsumerAdmin_T.h: + * orbsvcs/Event/EC_ConsumerAdmin_T.i: + * orbsvcs/Event/EC_ConsumerAdmin_T.cpp: + * orbsvcs/Event/EC_SupplierFiltering.cpp: + Implemented delayed and/or immediate removal and additions from + the ConsumerAdmin internal set (of ProxyPushSupplier). This is + important to applications that use reactive dispatching and can + generate disconnections and/or connections to the EC as part of + the consumer upcalls. + This strategies require condition variables and thus the use of + templates to parametrize the locking strategy. + I also added a simple policy to control the level of concurrency + in the ConsumerAdmin set: the user can specify how many threads + can be concurrently running on the ConsumerAdmin. If this limit + is exceeded then the new threads must wait until all the other + threads leave. At that point the delayed operations are + executed. + This strategy ensures that operations are not delayed forever; + setting the HWM to 1 (the default) ensures that operations are + executed before the next upcall; setting the HWM to a very high + value results in maximum concurrency; this could be a reasonable + options if periodically there are periods without any activity. + + * orbsvcs/Event/EC_ObserverStrategy.h: + * orbsvcs/Event/EC_ObserverStrategy.i: + * orbsvcs/Event/EC_ObserverStrategy.cpp: + Added missing destructor. + Minor cosmetic changes. + + * orbsvcs/Event/EC_ProxySupplier.h: + * orbsvcs/Event/EC_ProxySupplier.cpp: + Added reference counting to this objects. + + * orbsvcs/Event/EC_Busy_Lock.h: + * orbsvcs/Event/EC_Busy_Lock.i: + * orbsvcs/Event/EC_Busy_Lock.cpp: + A simple adapter that conforms to the ACE_Lock interface. It + invokes the busy() and idle() method on another object to + acquire and release. + + * orbsvcs/Event/EC_Command.h: + * orbsvcs/Event/EC_Command.i: + * orbsvcs/Event/EC_Command.cpp: + Implemented two simple Command objects that can invoke the + connected_i() and disconnected_i() methods on another class, + this are useful to implement the delayed connection and + disconnection from the EC. + + * orbsvcs/Event/EC_Null_Factory.cpp: + * orbsvcs/Event/EC_Basic_Factory.cpp: + Use the new EC_ConsumerAdmin types with the right locking + strategies. + + * tests/EC_Throughput/ECT_Consumer.cpp: + * tests/EC_Throughput/ECT_Consumer.h: + * tests/EC_Throughput/ECT_Consumer_Driver.cpp: + * tests/EC_Throughput/ECT_Consumer_Driver.h: + * tests/EC_Throughput/ECT_Supplier.cpp: + * tests/EC_Throughput/ECT_Supplier.h: + * tests/EC_Throughput/ECT_Supplier_Driver.cpp: + * tests/EC_Throughput/ECT_Supplier_Driver.h: + * tests/EC_Throughput/ECT_Throughput.cpp: + * tests/EC_Throughput/ECT_Throughput.h: + The test is more flexible wrt event types subscribed by each + consumer and published by the suppliers, this could be useful to + measure how the test scales with the number of consumers and/or + suppliers. + + * orbsvcs/RtecEventComm.idl: + * orbsvcs/Event/EC_Event_Channel.cpp: + Minor cosmetic changes. + Tue Feb 23 19:44:20 1999 Yamuna Krishnamurthy <yamuna@cs.wustl.edu> * TAO_IDL/be/be_visitor_root/root.cpp: diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp index 9763c3fcb95..b0b80559e4b 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp @@ -3,7 +3,7 @@ #include "EC_Basic_Factory.h" #include "EC_Dispatching.h" #include "EC_Basic_Filter_Builder.h" -#include "EC_ConsumerAdmin.h" +#include "EC_ConsumerAdmin_T.h" #include "EC_SupplierAdmin.h" #include "EC_ProxyConsumer.h" #include "EC_ProxySupplier.h" @@ -50,7 +50,7 @@ TAO_EC_Basic_Factory::destroy_filter_builder (TAO_EC_Filter_Builder *x) TAO_EC_ConsumerAdmin* TAO_EC_Basic_Factory::create_consumer_admin (TAO_EC_Event_Channel *ec) { - return new TAO_EC_ConsumerAdmin (ec); + return new TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH> (ec); } void @@ -114,9 +114,11 @@ TAO_EC_Basic_Factory::destroy_timer_module (TAO_EC_Timer_Module *x) } TAO_EC_ObserverStrategy* -TAO_EC_Basic_Factory::create_observer_strategy (TAO_EC_Event_Channel *) +TAO_EC_Basic_Factory::create_observer_strategy (TAO_EC_Event_Channel *ec) { - return new TAO_EC_Null_ObserverStrategy; + ACE_Lock* lock; + ACE_NEW_RETURN (lock, ACE_Lock_Adapter<ACE_SYNCH_MUTEX>, 0); + return new TAO_EC_Basic_ObserverStrategy (ec, lock); } void @@ -184,3 +186,19 @@ TAO_EC_Basic_Factory::destroy_supplier_admin_lock (ACE_Lock* x) { delete x; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH>; +template class TAO_EC_ConsumerAdmin_T<ACE_MT_SYNCH>; +template class ACE_Node<ACE_Command_Base*>; +template class ACE_Unbounded_Queue<ACE_Command_Base*>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH> +#pragma instantiate TAO_EC_ConsumerAdmin_T<ACE_MT_SYNCH> +#pragma instantiate ACE_Node<ACE_Command_Base*> +#pragma instantiate ACE_Unbounded_Queue<ACE_Command_Base*> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.cpp new file mode 100644 index 00000000000..e85b8a15dab --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.cpp @@ -0,0 +1,62 @@ +// $Id$ + +#ifndef TAO_EC_BUSY_LOCK_CPP +#define TAO_EC_BUSY_LOCK_CPP + +#include "EC_Busy_Lock.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_Busy_Lock.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_Busy_Lock, "$Id$") + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::remove (void) +{ + return 0; +} + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::acquire (void) +{ + return this->adaptee_->busy (); +} + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::tryacquire (void) +{ + return this->adaptee_->busy (); +} + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::release (void) +{ + return this->adaptee_->idle (); +} + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::acquire_read (void) +{ + return this->adaptee_->busy (); +} + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::acquire_write (void) +{ + return this->adaptee_->busy (); +} + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::tryacquire_read (void) +{ + return this->adaptee_->busy (); +} + +template<class T> +int TAO_EC_Busy_Lock_Adapter<T>::tryacquire_write (void) +{ + return this->adaptee_->busy (); +} + +#endif /* TAO_EC_BUSY_LOCK_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.h b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.h new file mode 100644 index 00000000000..09cda19b37d --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.h @@ -0,0 +1,74 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_ConsumerAdmin +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = DESCRIPTION +// Implement the RtecEventChannelAdmin::ConsumerAdmin interface. +// This class is an Abstract Factory for the +// TAO_EC_ProxyPushSupplier. +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_BUSY_LOCK_H +#define TAO_EC_BUSY_LOCK_H + +#include "ace/OS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<class Adaptee> +class TAO_EC_Busy_Lock_Adapter +{ +public: + TAO_EC_Busy_Lock_Adapter (Adaptee* adaptee); + // Constructor + + // = The ACE_Lock methods, please check $ACE_ROOT/ace/Synch.h for + // details. + + int remove (void); + int acquire (void); + int tryacquire (void); + int release (void); + int acquire_read (void); + int acquire_write (void); + int tryacquire_read (void); + int tryacquire_write (void); + +private: + Adaptee* adaptee_; +}; + +#if defined (__ACE_INLINE__) +#include "EC_Busy_Lock.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "EC_Busy_Lock.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("EC_Busy_Lock.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* TAO_EC_BUSY_LOCK_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.i b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.i new file mode 100644 index 00000000000..dd4c3e7a956 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.i @@ -0,0 +1,8 @@ +// $Id$ + +template<class T> ACE_INLINE +TAO_EC_Busy_Lock_Adapter<T>::TAO_EC_Busy_Lock_Adapter (T* adaptee) + : adaptee_ (adaptee) +{ +} + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Command.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Command.cpp new file mode 100644 index 00000000000..773244aab5c --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Command.cpp @@ -0,0 +1,38 @@ +// $Id$ + +#ifndef TAO_EC_COMMAND_CPP +#define TAO_EC_COMMAND_CPP + +#include "EC_Command.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_Command.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_Command, "$Id$") + +template<class Target, class Object> int +TAO_EC_Connected_Command<Target,Object>::execute (void* arg) +{ + CORBA::Environment *env = &CORBA::default_environment (); + if (arg != 0) + env = ACE_dynamic_cast(CORBA::Environment*, arg); + + this->target_->connected_i (this->object_, *env); + return 0; +} + +// **************************************************************** + +template<class Target, class Object> int +TAO_EC_Disconnected_Command<Target,Object>::execute (void* arg) +{ + CORBA::Environment *env = &CORBA::default_environment (); + if (arg != 0) + env = ACE_dynamic_cast(CORBA::Environment*, arg); + + this->target_->disconnected_i (this->object_, *env); + return 0; +} + +#endif /* TAO_EC_COMMAND_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Command.h b/TAO/orbsvcs/orbsvcs/Event/EC_Command.h new file mode 100644 index 00000000000..5b4c1f97224 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Command.h @@ -0,0 +1,133 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_Command +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = DESCRIPTION +// Implement the Command objects for the delayed operations in the +// manipulation of EC_ProxyPushSupplier and EC_ProxyPushConsumer +// sets. +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_COMMAND_H +#define TAO_EC_COMMAND_H + +#include "ace/Functor.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<class Target, class Object> +class TAO_EC_Connected_Command : public ACE_Command_Base +{ + // = TITLE + // EC_Connected_Command + // + // = DESCRIPTION + // Implements a Command object that invokes the connected_i() method + // on the target, passing an argument of type Object. + // + // = MEMORY MANAGMENT + // It does not assume ownership of Object nor the Target + // arguments. + // Usually allocated from the heap or an allocator; but it is not + // self-managed. + // + // = LOCKING + // No provisions for locking, access must be serialized + // externally. + // + // = TODO + // +public: + TAO_EC_Connected_Command (Target *target, + Object *object); + // constructor... + + virtual int execute (void *arg); + // The callback method, if the argument is not nil it is interpreted + // as a CORBA::Environment. + +private: + Target *target_; + // The target + + Object *object_; + // The argument +}; + +// **************************************************************** + +template<class Target, class Object> +class TAO_EC_Disconnected_Command : public ACE_Command_Base +{ + // = TITLE + // EC_Disconnected_Command + // + // = DESCRIPTION + // Implements a Command object that invokes the disconnected_i() + // method on the target, passing an argument of type Object. + // + // = MEMORY MANAGMENT + // It does not assume ownership of Object nor the Target + // arguments. + // Usually allocated from the heap or an allocator; but it is not + // self-managed. + // + // = LOCKING + // No provisions for locking, access must be serialized + // externally. + // + // = TODO + // +public: + TAO_EC_Disconnected_Command (Target *target, + Object *object); + // constructor... + + virtual int execute (void *arg); + // The callback method, if the argument is not nil it is interpreted + // as a CORBA::Environment. + +private: + Target *target_; + // The target + + Object *object_; + // The argument +}; + +// **************************************************************** + +#if defined (__ACE_INLINE__) +#include "EC_Command.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "EC_Command.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("EC_Command.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* TAO_EC_COMMAND_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Command.i b/TAO/orbsvcs/orbsvcs/Event/EC_Command.i new file mode 100644 index 00000000000..6a5716a0e4a --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Command.i @@ -0,0 +1,23 @@ +// $Id$ + +template<class Target, class Object> +TAO_EC_Connected_Command<Target,Object>:: + TAO_EC_Connected_Command (Target *target, + Object *object) + : target_ (target), + object_ (object) +{ +} + +// **************************************************************** + + +template<class Target, class Object> +TAO_EC_Disconnected_Command<Target,Object>:: + TAO_EC_Disconnected_Command (Target *target, + Object *object) + : target_ (target), + object_ (object) +{ +} + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp index 0f038b70f36..815a6c9a293 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp @@ -12,7 +12,9 @@ ACE_RCSID(Event, EC_ConsumerAdmin, "$Id$") TAO_EC_ConsumerAdmin::TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel *ec) - : event_channel_ (ec) + : busy_lock_ (this), + event_channel_ (ec), + busy_hwm_ (1) { } @@ -23,7 +25,7 @@ TAO_EC_ConsumerAdmin::~TAO_EC_ConsumerAdmin (void) void TAO_EC_ConsumerAdmin::set_default_POA (PortableServer::POA_ptr poa) { - this->default_POA_ = + this->default_POA_ = PortableServer::POA::_duplicate (poa); } @@ -31,12 +33,17 @@ void TAO_EC_ConsumerAdmin::connected (TAO_EC_ProxyPushConsumer *consumer, CORBA::Environment &ACE_TRY_ENV) { + ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock, + ace_mon, this->busy_lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + SupplierSetIterator end = this->end (); for (SupplierSetIterator i = this->begin (); i != end; ++i) { (*i)->connected (consumer, ACE_TRY_ENV); + ACE_CHECK; } } @@ -44,35 +51,41 @@ void TAO_EC_ConsumerAdmin::disconnected (TAO_EC_ProxyPushConsumer *consumer, CORBA::Environment &ACE_TRY_ENV) { + ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock, + ace_mon, this->busy_lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + SupplierSetIterator end = this->end (); for (SupplierSetIterator i = this->begin (); i != end; ++i) { (*i)->disconnected (consumer, ACE_TRY_ENV); + ACE_CHECK; } } void -TAO_EC_ConsumerAdmin::connected (TAO_EC_ProxyPushSupplier *supplier, - CORBA::Environment &ACE_TRY_ENV) +TAO_EC_ConsumerAdmin::connected_i (TAO_EC_ProxyPushSupplier *supplier, + CORBA::Environment &ACE_TRY_ENV) { if (this->all_suppliers_.insert (supplier) != 0) ACE_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO)); } void -TAO_EC_ConsumerAdmin::disconnected (TAO_EC_ProxyPushSupplier *supplier, - CORBA::Environment &ACE_TRY_ENV) +TAO_EC_ConsumerAdmin::disconnected_i (TAO_EC_ProxyPushSupplier *supplier, + CORBA::Environment &ACE_TRY_ENV) { if (this->all_suppliers_.remove (supplier) != 0) ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR ()); + supplier->_decr_refcnt (); } RtecEventChannelAdmin::ProxyPushSupplier_ptr TAO_EC_ConsumerAdmin::obtain_push_supplier (CORBA::Environment &ACE_TRY_ENV) { - TAO_EC_ProxyPushSupplier* supplier = + TAO_EC_ProxyPushSupplier* supplier = this->event_channel_->create_proxy_push_supplier (); PortableServer::POA_var poa = @@ -88,16 +101,27 @@ TAO_EC_ConsumerAdmin::_default_POA (CORBA::Environment&) return PortableServer::POA::_duplicate (this->default_POA_.in ()); } +void +TAO_EC_ConsumerAdmin::execute_delayed_operations (void) +{ +} + #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Node<TAO_EC_ProxyPushSupplier*>; template class ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*>; template class ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*>; +template class TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin>; +template class TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>; +template class TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>; #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Node<TAO_EC_ProxyPushSupplier*> #pragma instantiate ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*> #pragma instantiate ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*> +#pragma instantiate TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin> +#pragma instantiate TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> +#pragma instantiate TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h index fa8dccd946b..14ddaa8a074 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h @@ -13,9 +13,8 @@ // Carlos O'Ryan (coryan@cs.wustl.edu) // // = DESCRIPTION -// Implement the RtecEventChannelAdmin::ConsumerAdmin interface. -// This class is an Abstract Factory for the -// TAO_EC_ProxyPushSupplier. +// Define the interface for the RtecEventChannelAdmin::ConsumerAdmin +// implementations. // // = CREDITS // Based on previous work by Tim Harrison (harrison@cs.wustl.edu) @@ -31,21 +30,25 @@ #define TAO_EC_CONSUMERADMIN_H #include "ace/Containers.h" -#include "orbsvcs/RtecEventChannelAdminS.h" -#include "orbsvcs/Event/EC_Filter.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "orbsvcs/RtecEventChannelAdminS.h" +#include "orbsvcs/Event/EC_Busy_Lock.h" +#include "orbsvcs/Event/EC_Filter.h" + class TAO_EC_Event_Channel; class TAO_EC_ProxyPushSupplier; class TAO_EC_ProxyPushConsumer; +template<class Target,class Object> class TAO_EC_Connected_Command; +template<class Target,class Object> class TAO_EC_Disconnected_Command; class TAO_EC_ConsumerAdmin : public POA_RtecEventChannelAdmin::ConsumerAdmin { // = TITLE - // ProxyPushSupplier + // ConsumerAdmin // // = DESCRIPTION // Implements the ConsumerAdmin interface, i.e. the factory for @@ -70,6 +73,17 @@ public: typedef ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*> SupplierSet; typedef ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*> SupplierSetIterator; + virtual int busy (void) = 0; + virtual int idle (void) = 0; + // Before using the iterators the clients should invoke this + // methods, that ensures that no changes to the underlying data + // structure will occur. + + void busy_hwm (CORBA::ULong hwm); + CORBA::ULong busy_hwm (void) const; + // This attribute is used to control the maximum number of threads + // that can be running on the + SupplierSetIterator begin (void); SupplierSetIterator end (void); // Iterators over the set of ProxyPushSuppliers @@ -88,9 +102,9 @@ public: // disconnected from it. virtual void connected (TAO_EC_ProxyPushSupplier*, - CORBA::Environment&); + CORBA::Environment&) = 0; virtual void disconnected (TAO_EC_ProxyPushSupplier*, - CORBA::Environment&); + CORBA::Environment&) = 0; // Used to inform the EC that a Supplier has connected or // disconnected from it. @@ -98,6 +112,42 @@ public: virtual RtecEventChannelAdmin::ProxyPushSupplier_ptr obtain_push_supplier (CORBA::Environment &); + typedef TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin> Busy_Lock; + Busy_Lock& busy_lock (void); + // This object is an adapter to the busy/idle protocol. + +protected: + virtual void connected_i (TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment &env); + // The implementation of connected(), without locking. + // It does not increase the reference count on the supplier + + virtual void disconnected_i (TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment &env); + // The implementation of disconnected(), without locking. + // It decreases the reference count on the supplier if the operation + // is successful. + + typedef TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> Connected_Command; + typedef TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> Disconnected_Command; + + friend class TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>; + friend class TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>; + // This two classes call the connected_i() and disconnected_i() + // methods, that's ok because they do while this class is holding + // its lock. + + virtual void execute_delayed_operations (void); + // Dervied classes that implement delayed disconnects and connects + // must override this method. + + SupplierSet all_suppliers_; + // The set of all the ProxyPushSupplier objects bound to this + // ConsumerAdmin. + + TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin> busy_lock_; + // The busy lock object + private: TAO_EC_Event_Channel *event_channel_; // The Event Channel we belong to @@ -105,7 +155,8 @@ private: PortableServer::POA_var default_POA_; // Store the default POA. - SupplierSet all_suppliers_; + CORBA::ULong busy_hwm_; + // How many threads can simultaneously iterate over the set. }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i index 061e81dba2b..da4752320ec 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i @@ -11,3 +11,21 @@ TAO_EC_ConsumerAdmin::end (void) { return this->all_suppliers_.end (); } + +ACE_INLINE void +TAO_EC_ConsumerAdmin::busy_hwm (CORBA::ULong hwm) +{ + this->busy_hwm_ = hwm; +} + +ACE_INLINE CORBA::ULong +TAO_EC_ConsumerAdmin::busy_hwm (void) const +{ + return this->busy_hwm_; +} + +ACE_INLINE TAO_EC_ConsumerAdmin::Busy_Lock& +TAO_EC_ConsumerAdmin::busy_lock (void) +{ + return this->busy_lock_; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp new file mode 100644 index 00000000000..b36f5bd1a05 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp @@ -0,0 +1,159 @@ +// $Id$ + +#ifndef TAO_EC_CONSUMERADMIN_T_CPP +#define TAO_EC_CONSUMERADMIN_T_CPP + +#include "EC_ConsumerAdmin_T.h" +#include "EC_Command.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_ConsumerAdmin_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_ConsumerAdmin_T, "$Id$") + +template<ACE_SYNCH_DECL>int +TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->busy_i (); +} + +template<ACE_SYNCH_DECL> int +TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::idle (void) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->idle_i (); +} + +template<ACE_SYNCH_DECL> int +TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy_i (void) +{ + if (this->busy_count_ >= this->busy_hwm ()) + { + this->reached_hwm_ = 1; + while (this->reached_hwm_ != 0) + this->busy_cond_.wait (); + } + this->busy_count_++; + return 0; +} + +template<ACE_SYNCH_DECL> int +TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::idle_i (void) +{ + this->busy_count_--; + if (this->busy_count_ == 0) + { + this->reached_hwm_ = 0; + this->execute_delayed_operations (); + this->busy_cond_.broadcast (); + } + return 0; +} + +// **************************************************************** + +template<ACE_SYNCH_DECL> void +TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::connected ( + TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment& ACE_TRY_ENV) +{ + ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + + supplier->_incr_refcnt (); + this->connected_i (supplier, ACE_TRY_ENV); +} + +template<ACE_SYNCH_DECL> void +TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::disconnected ( + TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment& ACE_TRY_ENV) +{ + ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + + this->disconnected_i (supplier, ACE_TRY_ENV); +} + +// **************************************************************** + +template<ACE_SYNCH_DECL> void +TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::connected ( + TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment& ACE_TRY_ENV) +{ + ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + + if (this->busy_count () == 0) + { + // We can add the object immediately + this->connected_i (supplier, ACE_TRY_ENV); + } + else + { + supplier->_incr_refcnt (); + ACE_Command_Base* command; + ACE_NEW (command, + TAO_EC_ConsumerAdmin::Connected_Command (this, + supplier)); + + ACE_DEBUG ((LM_DEBUG, + "EC (%P|%t) Delayed connection command = %x\n", + command)); + + this->command_queue_.enqueue_tail (command); + } +} + +template<ACE_SYNCH_DECL> void +TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::disconnected ( + TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment& ACE_TRY_ENV) +{ + ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + + if (this->busy_count () == 0) + { + // We can remove the object immediately + this->disconnected_i (supplier, ACE_TRY_ENV); + } + else + { + ACE_Command_Base* command; + ACE_NEW (command, + TAO_EC_ConsumerAdmin::Disconnected_Command (this, + supplier)); + ACE_DEBUG ((LM_DEBUG, + "EC (%P|%t) Delayed disconnection command = %x\n", + command)); + + this->command_queue_.enqueue_tail (command); + } +} + +template<ACE_SYNCH_DECL> void +TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::execute_delayed_operations (void) +{ + // LOCKING: the lock is taken by the idle() function + while (!this->command_queue_.is_empty ()) + { + ACE_Command_Base* command; + this->command_queue_.dequeue_head (command); + + command->execute (); + + ACE_DEBUG ((LM_DEBUG, + "EC (%P|%t) Executed delayed command = %x\n", + command)); + + delete command; + } +} + +#endif /* TAO_EC_CONSUMERADMIN_T_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h new file mode 100644 index 00000000000..09474e7a723 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h @@ -0,0 +1,188 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel +// +// = FILENAME +// EC_ConsumerAdmin_T +// +// = AUTHOR +// Carlos O'Ryan (coryan@cs.wustl.edu) +// +// = DESCRIPTION +// Implement concrete versions of the EC_ConsumerAdmin class. This +// concrete versions provide specific locking policies and +// strategies to handle delayed vs. immediate connections and +// disconnections. +// +// = CREDITS +// Based on previous work by Tim Harrison (harrison@cs.wustl.edu) +// and other members of the DOC group. +// More details can be found in: +// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz +// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz +// +// +// ============================================================================ + +#ifndef TAO_EC_CONSUMERADMIN_T_H +#define TAO_EC_CONSUMERADMIN_T_H + +#include "EC_ConsumerAdmin.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +template<ACE_SYNCH_DECL> +class TAO_EC_ConsumerAdmin_T : public TAO_EC_ConsumerAdmin +{ + // = TITLE + // ConsumerAdmin_T + // + // = DESCRIPTION + // Implements the locking policies for the TAO_EC_ConsumerAdmin + // class, we use a parametric class to handle the co-variations + // between mutexes and condition variables. + // + // = MEMORY MANAGMENT + // + // = LOCKING + // The kind of locking is specified as the template argument. + // Clients still should follow the locking protocol: call busy() + // to use the iterator and idle() once finished, the + // Busy_Lock_Adapter can help there. + // + // = TODO + // +public: + TAO_EC_ConsumerAdmin_T (TAO_EC_Event_Channel* event_channel); + // constructor... + + // = The TAO_EC_ConsumerAdmin methods + virtual int busy (void); + virtual int idle (void); + +protected: + virtual int busy_i (void); + // Implements the busy() method, but without locking. + + virtual int idle_i (void); + // Implements the busy() method, but without locking. + +protected: + int busy_count (void) const; + // Return the current value of busy_count [derived classes are not + // allowed to modify this] + + ACE_SYNCH_MUTEX_T lock_; + // The lock + + ACE_SYNCH_CONDITION_T busy_cond_; + // A condition variable to wait while the object is too busy. + +private: + int busy_count_; + // The number of threads iterating on us + + int reached_hwm_; + // The set was too busy and reached its HWM, now everybody has to + // wait until we reach the LWM (0) +}; + +// **************************************************************** + +template<ACE_SYNCH_DECL> +class TAO_EC_ConsumerAdmin_Immediate : public TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> +{ + // = TITLE + // ConsumerAdmin_Immediate + // + // = DESCRIPTION + // A concrete version of the EC_ConsumerAdmin class; using the + // locking strategy in EC_ConsumerAdmin_T and immediate execution + // for the connected() and disconnected() operations. + // + // = MEMORY MANAGMENT + // + // = LOCKING + // The kind of locking is specified as the template argument. + // Clients still should follow the locking protocol: call busy() + // to use the iterator and idle() once finished, the + // Busy_Lock_Adapter can help there. + // + // = TODO + // +public: + TAO_EC_ConsumerAdmin_Immediate (TAO_EC_Event_Channel* event_channel); + // constructor... + + // = The TAO_EC_ConsumerAdmin methods + virtual void connected (TAO_EC_ProxyPushSupplier*, + CORBA::Environment&); + virtual void disconnected (TAO_EC_ProxyPushSupplier*, + CORBA::Environment&); +}; + +// **************************************************************** + +template<ACE_SYNCH_DECL> +class TAO_EC_ConsumerAdmin_Delayed : public TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> +{ + // = TITLE + // ConsumerAdmin_Delayed + // + // = DESCRIPTION + // A concrete version of the EC_ConsumerAdmin class; using the + // locking strategy in EC_ConsumerAdmin_T and storing the + // execution of connected() and disconnected() operations as + // command objects, that are executed once the set is idle. + // + // = MEMORY MANAGMENT + // + // = LOCKING + // The kind of locking is specified as the template argument. + // Clients still should follow the locking protocol: call busy() + // to use the iterator and idle() once finished, the + // Busy_Lock_Adapter can help there. + // + // = TODO + // +public: + TAO_EC_ConsumerAdmin_Delayed (TAO_EC_Event_Channel* event_channel); + // constructor... + + // = The TAO_EC_ConsumerAdmin methods + virtual void connected (TAO_EC_ProxyPushSupplier*, + CORBA::Environment&); + virtual void disconnected (TAO_EC_ProxyPushSupplier*, + CORBA::Environment&); + +protected: + virtual void execute_delayed_operations (void); + // documented in TAO_EC_ConsumerAdmin + +private: + ACE_Unbounded_Queue<ACE_Command_Base*> command_queue_; + // The commands that carry the delayed operations are enqueued + // here. +}; + +// **************************************************************** + +#if defined (__ACE_INLINE__) +#include "EC_ConsumerAdmin_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "EC_Busy_Lock.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("EC_Busy_Lock.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* TAO_EC_CONSUMERADMIN_T_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i new file mode 100644 index 00000000000..4e78d7ad7e5 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i @@ -0,0 +1,31 @@ +// $Id$ + +template<ACE_SYNCH_DECL> ACE_INLINE +TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>:: + TAO_EC_ConsumerAdmin_T (TAO_EC_Event_Channel *event_channel) + : TAO_EC_ConsumerAdmin (event_channel), + busy_cond_ (lock_), + busy_count_ (0), + reached_hwm_ (0) +{ +} + +template<ACE_SYNCH_DECL> ACE_INLINE int +TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy_count (void) const +{ + return this->busy_count_; +} + +template<ACE_SYNCH_DECL> ACE_INLINE +TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>:: + TAO_EC_ConsumerAdmin_Immediate (TAO_EC_Event_Channel *event_channel) + : TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> (event_channel) +{ +} + +template<ACE_SYNCH_DECL> ACE_INLINE +TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>:: + TAO_EC_ConsumerAdmin_Delayed (TAO_EC_Event_Channel *event_channel) + : TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> (event_channel) +{ +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp index c0c565b8b75..45944cb9bd7 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp @@ -29,7 +29,7 @@ TAO_EC_Event_Channel::TAO_EC_Event_Channel (TAO_EC_Factory* factory) this->factory_->create_supplier_admin (this); this->timer_module_ = this->factory_->create_timer_module (this); - this->observer_strategy_ = + this->observer_strategy_ = this->factory_->create_observer_strategy (this); } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp index d8512c9c955..3f3ba58be52 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp @@ -3,7 +3,7 @@ #include "EC_Null_Factory.h" #include "EC_Dispatching.h" #include "EC_Filter_Builder.h" -#include "EC_ConsumerAdmin.h" +#include "EC_ConsumerAdmin_T.h" #include "EC_SupplierAdmin.h" #include "EC_ProxyConsumer.h" #include "EC_ProxySupplier.h" @@ -50,7 +50,7 @@ TAO_EC_Null_Factory::destroy_filter_builder (TAO_EC_Filter_Builder *x) TAO_EC_ConsumerAdmin* TAO_EC_Null_Factory::create_consumer_admin (TAO_EC_Event_Channel *ec) { - return new TAO_EC_ConsumerAdmin (ec); + return new TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH> (ec); } void @@ -184,3 +184,15 @@ TAO_EC_Null_Factory::destroy_supplier_admin_lock (ACE_Lock* x) { delete x; } + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH>; +template class TAO_EC_ConsumerAdmin_T<ACE_NULL_SYNCH>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH> +#pragma instantiate TAO_EC_ConsumerAdmin_T<ACE_NULL_SYNCH> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp index 95e76aff5ff..b59e68f8540 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp @@ -72,6 +72,10 @@ TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*, // **************************************************************** +TAO_EC_Basic_ObserverStrategy::~TAO_EC_Basic_ObserverStrategy (void) +{ +} + RtecEventChannelAdmin::Observer_Handle TAO_EC_Basic_ObserverStrategy::append_observer ( RtecEventChannelAdmin::Observer_ptr obs, @@ -156,9 +160,9 @@ TAO_EC_Basic_ObserverStrategy::fill_qos ( { // @@ TODO locking in the consumer admin? - TAO_EC_ConsumerAdmin::SupplierSetIterator end = + TAO_EC_ConsumerAdmin::SupplierSetIterator end = this->event_channel_->consumer_admin ()->end (); - for (TAO_EC_ConsumerAdmin::SupplierSetIterator i = + for (TAO_EC_ConsumerAdmin::SupplierSetIterator i = this->event_channel_->consumer_admin ()->begin (); i != end; ++i) @@ -200,15 +204,15 @@ TAO_EC_Basic_ObserverStrategy::fill_qos ( { // @@ TODO locking in the consumer admin? - TAO_EC_SupplierAdmin::ConsumerSetIterator end = + TAO_EC_SupplierAdmin::ConsumerSetIterator end = this->event_channel_->supplier_admin ()->end (); - for (TAO_EC_SupplierAdmin::ConsumerSetIterator i = + for (TAO_EC_SupplierAdmin::ConsumerSetIterator i = this->event_channel_->supplier_admin ()->begin (); i != end; ++i) { TAO_EC_ProxyPushConsumer* consumer = *i; - const RtecEventChannelAdmin::SupplierQOS& pub = + const RtecEventChannelAdmin::SupplierQOS& pub = consumer->publications (); if (pub.is_gateway) continue; @@ -217,7 +221,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos ( const RtecEventComm::Event& event = pub.publications[j].event; RtecEventComm::EventType type = event.header.type; - + if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED) continue; headers.insert (event.header, 1); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h index a6cde6a29a8..56640e06b47 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h @@ -52,7 +52,7 @@ class TAO_EC_ObserverStrategy // The Event Channel supports Observers for the set of // subscriptions and publications. // This is used to implement federations of event channels, - // either through UDP (multicast and unicast) and/or regular CORBA + // either through UDP (multicast and unicast) and/or regular CORBA // calls. // // This behavior of the EC is strategized to avoid overhead when @@ -101,7 +101,7 @@ class TAO_EC_Null_ObserverStrategy : public TAO_EC_ObserverStrategy // A null observer strategy. // // = DESCRIPTION - // This class keeps no information and simply ignores the messages + // This class keeps no information and simply ignores the messages // from the EC. // public: @@ -142,7 +142,7 @@ class TAO_EC_Basic_ObserverStrategy : public TAO_EC_ObserverStrategy // This class simply keeps the information about the current list // of observers, whenever the list of consumers and/or suppliers // changes in queries the EC, computes the global subscription - // and/or publication list and sends the update message to all the + // and/or publication list and sends the update message to all the // observers. // public: diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i index 6bcf535a937..6a677aa122a 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i @@ -33,5 +33,3 @@ TAO_EC_Basic_ObserverStrategy::Observer_Entry:: observer (o) { } - - diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp index 2af0a1838b4..4b9a7367dc2 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp @@ -12,9 +12,12 @@ ACE_RCSID(Event, EC_ProxySupplier, "$Id$") TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel* ec) - : event_channel_ (ec) + : event_channel_ (ec), + refcount_ (1), + suspended_ (0), + child_ (0) { - this->lock_ = + this->lock_ = this->event_channel_->create_supplier_lock (); } @@ -23,6 +26,28 @@ TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void) this->event_channel_->destroy_supplier_lock (this->lock_); } +CORBA::ULong +TAO_EC_ProxyPushSupplier::_incr_refcnt (void) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0); + return this->refcount_++; +} + +CORBA::ULong +TAO_EC_ProxyPushSupplier::_decr_refcnt (void) +{ + { + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0); + this->refcount_--; + if (this->refcount_ != 0) + return this->refcount_; + } + + // Notify the event channel + this->event_channel_->destroy_proxy_push_supplier (this); + return 0; +} + void TAO_EC_ProxyPushSupplier::set_default_POA (PortableServer::POA_ptr poa) { @@ -74,20 +99,22 @@ TAO_EC_ProxyPushSupplier::connect_push_consumer ( const RtecEventChannelAdmin::ConsumerQOS& qos, CORBA::Environment &ACE_TRY_ENV) { - ACE_GUARD_THROW ( - ACE_Lock, ace_mon, *this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + { + ACE_GUARD_THROW ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - if (this->is_connected_i ()) - ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ()); + if (this->is_connected_i ()) + ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ()); - this->consumer_ = - RtecEventComm::PushConsumer::_duplicate (push_consumer); - this->qos_ = qos; + this->consumer_ = + RtecEventComm::PushConsumer::_duplicate (push_consumer); + this->qos_ = qos; - this->child_ = - this->event_channel_->filter_builder ()->build (this->qos_); - this->adopt_child (this->child_); + this->child_ = + this->event_channel_->filter_builder ()->build (this->qos_); + this->adopt_child (this->child_); + } // Notify the event channel... this->event_channel_->connected (this, ACE_TRY_ENV); @@ -101,9 +128,9 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier ( ACE_GUARD_THROW ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - + this->consumer_ = - RtecEventComm::PushConsumer::_nil (); + RtecEventComm::PushConsumer::_nil (); PortableServer::POA_var poa = this->_default_POA_i (); @@ -114,7 +141,7 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier ( poa->deactivate_object (id.in (), ACE_TRY_ENV); ACE_CHECK; } - this->event_channel_->destroy_proxy_push_supplier (this); + this->_decr_refcnt (); } void @@ -167,7 +194,7 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - // Do not take a lock, this is a call back from our child filter, so + // Do not take a lock, this is a call back from our child filter, so // we are holding the lock already (in the filter() method). this->event_channel_->dispatching ()->push (this, this->consumer_, @@ -182,7 +209,7 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, CORBA::Environment& ACE_TRY_ENV) { - // Do not take a lock, this is a call back from our child filter, so + // Do not take a lock, this is a call back from our child filter, so // we are holding the lock already (in the filter() method). this->event_channel_->dispatching ()->push_nocopy (this, this->consumer_, diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h index 34051dfcc7a..8ce01cc3585 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h @@ -51,7 +51,6 @@ class TAO_EC_ProxyPushSupplier : public POA_RtecEventChannelAdmin::ProxyPushSupp // used to communicate with a particular consumer. // // = MEMORY MANAGMENT - // This object will be reference counted. // It does not assume ownership of the TAO_EC_Dispatching object. // It makes a copy of the ConsumerQOS and the consumer object // reference. @@ -116,6 +115,14 @@ public: virtual void suspend_connection (CORBA::Environment &); virtual void resume_connection (CORBA::Environment &); + virtual CORBA::ULong _incr_refcnt (void); + virtual CORBA::ULong _decr_refcnt (void); + // Increment and decrement the reference count. + // @@ TODO We use the canonical tao form, but in the future we may + // want to add methods that follow the upcoming CORBA2.3 + // specification, which will include reference counting for + // servants. + // = The TAO_EC_Filter methods, only push() is implemented... virtual int filter (const RtecEventComm::EventSet& event, TAO_EC_QOS_Info& qos_info, @@ -147,6 +154,9 @@ private: ACE_Lock* lock_; // The locking strategy. + CORBA::ULong refcount_; + // The reference count. + RtecEventComm::PushConsumer_var consumer_; // The consumer.... @@ -159,8 +169,8 @@ private: PortableServer::POA_var default_POA_; // Store the default POA. -private: TAO_EC_Filter* child_; + // The filter object }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp index 06d94ed726c..c7e7e9905a7 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp @@ -50,8 +50,13 @@ void TAO_EC_Null_SupplierFiltering::push (const RtecEventComm::EventSet& event, CORBA::Environment &ACE_TRY_ENV) { - TAO_EC_ConsumerAdmin* consumer_admin = + TAO_EC_ConsumerAdmin* consumer_admin = this->event_channel_->consumer_admin (); + + ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock, + ace_mon, consumer_admin->busy_lock (), + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + TAO_EC_ConsumerAdmin::SupplierSetIterator end = consumer_admin->end (); @@ -65,5 +70,3 @@ TAO_EC_Null_SupplierFiltering::push (const RtecEventComm::EventSet& event, ACE_CHECK; } } - - diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile index 39d8bea66ec..9bb0777e3f8 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile +++ b/TAO/orbsvcs/orbsvcs/Makefile @@ -165,6 +165,7 @@ ifneq (,$(findstring Event2,$(TAO_ORBSVCS))) Event/EC_Type_Filter \ Event/EC_Basic_Filter_Builder \ Event/EC_Basic_Factory \ + Event/EC_ObserverStrategy \ endif # Event2 diff --git a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl index 6e6e4282171..b31df29deb6 100644 --- a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl +++ b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl @@ -84,8 +84,8 @@ module RtecEventComm { // // = DESCRIPTION // Events are represented by this structure, it is simply a - // header,data pair. - // + // header,data pair. + // EventHeader header; EventData data; @@ -95,7 +95,7 @@ module RtecEventComm { interface PushConsumer { oneway void push (in EventSet data); - void disconnect_push_consumer(); + void disconnect_push_consumer(); }; interface PushSupplier { @@ -103,5 +103,3 @@ module RtecEventComm { }; }; - - diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp index dfac0802a25..946d4cde39a 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp @@ -26,7 +26,8 @@ Test_Consumer::Test_Consumer (ECT_Driver *driver, void Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler, const char* name, - int event_a, int event_b, + int type_start, + int type_count, RtecEventChannelAdmin::EventChannel_ptr ec, CORBA::Environment& TAO_IN_ENV) { @@ -53,8 +54,10 @@ Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler, ACE_ConsumerQOS_Factory qos; qos.start_disjunction_group (); qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info); - qos.insert_type (event_a, rt_info); - qos.insert_type (event_b, rt_info); + for (int i = 0; i != type_count; ++i) + { + qos.insert_type (type_start + i, rt_info); + } // = Connect as a consumer. RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = @@ -87,7 +90,7 @@ Test_Consumer::disconnect (CORBA::Environment &TAO_IN_ENV) RtecEventChannelAdmin::ProxyPushSupplier::_nil (); // Deactivate the servant - PortableServer::POA_var poa = + PortableServer::POA_var poa = this->_default_POA (TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); PortableServer::ObjectId_var id = @@ -104,7 +107,7 @@ Test_Consumer::dump_results (const char* name) this->timer_.elapsed_time (tv); double f = 1.0 / (tv.sec () + tv.usec () / 1000000.0); double eps = this->recv_count_ * f; - + ACE_DEBUG ((LM_DEBUG, "ECT_Consumer (%s):\n" " Total time: %d.%08.8d (secs.usecs)\n" @@ -148,11 +151,6 @@ Test_Consumer::push (const RtecEventComm::EventSet& events, { const RtecEventComm::Event& e = events[i]; - if (e.data.payload.mb () == 0) - { - // ACE_DEBUG ((LM_DEBUG, "No data in event[%d]\n", i)); - // continue; - } if (e.header.type == ACE_ES_EVENT_SHUTDOWN) { this->shutdown_count_++; @@ -169,7 +167,7 @@ Test_Consumer::push (const RtecEventComm::EventSet& events, ACE_hrtime_t creation; ORBSVCS_Time::TimeT_to_hrtime (creation, e.header.creation_time); - + ACE_hrtime_t ec_recv; ORBSVCS_Time::TimeT_to_hrtime (ec_recv, e.header.ec_recv_time); diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h index c59cee0a4dd..8b33409dd5b 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h @@ -26,11 +26,15 @@ class Test_Consumer : public POA_RtecEventComm::PushConsumer { - // // = TITLE // Receive the events. // // = DESCRIPTION + // This class is a consumer of events. It subscribes for a + // continous ranges of event types, this permits studying the + // effect of the number of subscriptions for each particular kind + // of event on the EC. + // public: Test_Consumer (ECT_Driver* driver, void* cookie, @@ -38,8 +42,8 @@ public: void connect (RtecScheduler::Scheduler_ptr scheduler, const char* name, - int event_a, - int event_b, + int type_start, + int type_count, RtecEventChannelAdmin::EventChannel_ptr ec, CORBA::Environment& _env); // This method connects the consumer to the EC. diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp index b8e68020b14..a930d680bf2 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp @@ -25,8 +25,8 @@ main (int argc, char *argv []) ECT_Consumer_Driver::ECT_Consumer_Driver (void) : n_consumers_ (1), n_suppliers_ (1), - event_a_ (ACE_ES_EVENT_UNDEFINED), - event_b_ (ACE_ES_EVENT_UNDEFINED + 1), + type_start_ (ACE_ES_EVENT_UNDEFINED), + type_count_ (1), pid_file_name_ (0), active_count_ (0) { @@ -67,14 +67,14 @@ ECT_Consumer_Driver::run (int argc, char* argv[]) "Execution parameters:\n" " consumers = <%d>\n" " suppliers = <%d>\n" - " supplier Event A = <%d>\n" - " supplier Event B = <%d>\n" + " type_start = <%d>\n" + " type count = <%d>\n" " pid file name = <%s>\n", this->n_consumers_, this->n_suppliers_, - this->event_a_, - this->event_b_, + this->type_start_, + this->type_start_, this->pid_file_name_?this->pid_file_name_:"nil") ); @@ -131,7 +131,7 @@ ECT_Consumer_Driver::run (int argc, char* argv[]) TAO_CHECK_ENV; if (CORBA::is_nil (sched_obj.in ())) return 1; - RtecScheduler::Scheduler_var scheduler = + RtecScheduler::Scheduler_var scheduler = RtecScheduler::Scheduler::_narrow (sched_obj.in (), TAO_TRY_ENV); TAO_CHECK_ENV; @@ -223,8 +223,8 @@ ECT_Consumer_Driver::connect_consumers this->consumers_[i]->connect (scheduler, buf, - this->event_a_, - this->event_b_, + this->type_start_, + this->type_count_, channel, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); @@ -277,9 +277,9 @@ ECT_Consumer_Driver::parse_args (int argc, char *argv []) char* aux; char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux); - this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + this->type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); arg = ACE_OS::strtok_r (0, ",", &aux); - this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + this->type_count_ = ACE_OS::atoi (arg); } break; @@ -294,7 +294,7 @@ ECT_Consumer_Driver::parse_args (int argc, char *argv []) "[ORB options] " "-c <n_consumers> " "-s <n_suppliers> " - "-h <event_a,event_b> " + "-h <type_start,type_count> " "-p <pid file name> " "\n", argv[0])); @@ -319,5 +319,14 @@ ECT_Consumer_Driver::parse_args (int argc, char *argv []) "suppliers out of range\n", argv[0]), -1); } + if (this->type_count_ <= 0) + { + ACE_ERROR_RETURN ((LM_DEBUG, + "%s: number of event types " + "suppliers out of range, reset to default (1)\n", + argv[0]), -1); + this->type_count_ = 1; + } + return 0; } diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h index 36827c86218..4eaf51054c6 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h @@ -74,9 +74,10 @@ private: // supplier sends a shutdown message after it finishes, the consumer // finishes when all the suppliers do. - int event_a_; - int event_b_; - // We send two types of events, with different contents. + int type_start_; + int type_count_; + // We receive the events whose type is in the range + // [type_start,type_start+type_count) const char* pid_file_name_; // The name of a file where the process stores its pid diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp index 09c2097252f..ea94d0c59e2 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp @@ -18,7 +18,9 @@ Test_Supplier::Test_Supplier (ECT_Driver *driver) burst_count_ (0), burst_size_ (0), event_size_ (0), - burst_pause_ (0) + burst_pause_ (0), + type_start_ (ACE_ES_EVENT_UNDEFINED), + type_count_ (1) { } @@ -29,8 +31,8 @@ Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler, int burst_size, int event_size, int burst_pause, - int event_a, - int event_b, + int type_start, + int type_count, RtecEventChannelAdmin::EventChannel_ptr ec, CORBA::Environment &TAO_IN_ENV) { @@ -38,9 +40,9 @@ Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler, this->burst_size_ = burst_size; this->event_size_ = event_size; this->burst_pause_ = burst_pause; - this->event_a_ = event_a; - this->event_b_ = event_b; - + this->type_start_ = type_start; + this->type_count_ = type_count; + RtecScheduler::handle_t rt_info = scheduler->create (name, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); @@ -71,12 +73,12 @@ Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler, this->supplier_id_)); ACE_SupplierQOS_Factory qos; - qos.insert (this->supplier_id_, - event_a, - rt_info, 1); - qos.insert (this->supplier_id_, - event_b, - rt_info, 1); + for (int i = 0; i != type_count; ++i) + { + qos.insert (this->supplier_id_, + type_start + i, + rt_info, 1); + } qos.insert (this->supplier_id_, ACE_ES_EVENT_SHUTDOWN, rt_info, 1); @@ -112,7 +114,7 @@ Test_Supplier::disconnect (CORBA::Environment &TAO_IN_ENV) RtecEventChannelAdmin::ProxyPushConsumer::_nil (); // Deactivate the servant - PortableServer::POA_var poa = + PortableServer::POA_var poa = this->supplier_._default_POA (TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); PortableServer::ObjectId_var id = @@ -166,10 +168,8 @@ Test_Supplier::svc () { for (int j = 0; j < this->burst_size_; ++j) { - if (j % 2 == 0) - event[0].header.type = this->event_a_; - else - event[0].header.type = this->event_b_; + event[0].header.type = + this->type_start_ + j % this->type_count_; ACE_hrtime_t now = ACE_OS::gethrtime (); ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, @@ -187,7 +187,7 @@ Test_Supplier::svc () this->consumer_proxy ()->push(event, TAO_TRY_ENV); TAO_CHECK_ENV; this->timer_.stop (); - + } TAO_CATCH (CORBA::SystemException, sys_ex) { @@ -226,7 +226,7 @@ Test_Supplier::dump_results (const char* name) int event_count = this->burst_count_ * this->burst_size_ + 1; double f = 1.0 / (tv.sec () + tv.usec () / 1000000.0); double eps = event_count * f; - + ACE_DEBUG ((LM_DEBUG, "ECT_Supplier (%s):\n" " Total time: %d.%08.8d (secs.usecs)\n" diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h index a694b13fb2e..233e270f3d7 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h @@ -4,9 +4,8 @@ // ============================================================================ // // = DESCRIPTION -// This test to measure how many events per minute can the EC -// process, it also serves as an example how how to encode complex -// data types in a octet sequence. +// This is a helper class for the throughput tests of the Event +// Channel. // // ============================================================================ @@ -43,8 +42,8 @@ public: int burst_size, int event_size, int burst_pause, - int event_a, - int event_b, + int type_start, + int type_count, RtecEventChannelAdmin::EventChannel_ptr ec, CORBA::Environment& _env); // This method connects the supplier to the EC. @@ -89,8 +88,8 @@ private: int burst_size_; int event_size_; int burst_pause_; - int event_a_; - int event_b_; + int type_start_; + int type_count_; // The test data. }; diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp index 7444b067e0b..8423b26bc26 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp @@ -28,8 +28,8 @@ ECT_Supplier_Driver::ECT_Supplier_Driver (void) burst_size_ (100), event_size_ (128), burst_pause_ (100), - event_a_ (ACE_ES_EVENT_UNDEFINED), - event_b_ (ACE_ES_EVENT_UNDEFINED + 1), + type_start_ (ACE_ES_EVENT_UNDEFINED), + type_count_ (1), pid_file_name_ (0) { } @@ -78,8 +78,8 @@ ECT_Supplier_Driver::run (int argc, char* argv[]) " burst size = <%d>\n" " event size = <%d>\n" " burst size = <%d>\n" - " supplier Event A = <%d>\n" - " supplier Event B = <%d>\n" + " type start = <%d>\n" + " type count = <%d>\n" " pid file name = <%s>\n", this->n_suppliers_, @@ -87,8 +87,8 @@ ECT_Supplier_Driver::run (int argc, char* argv[]) this->burst_size_, this->event_size_, this->burst_pause_, - this->event_a_, - this->event_b_, + this->type_start_, + this->type_count_, this->pid_file_name_?this->pid_file_name_:"nil") ); @@ -145,7 +145,7 @@ ECT_Supplier_Driver::run (int argc, char* argv[]) TAO_CHECK_ENV; if (CORBA::is_nil (sched_obj.in ())) return 1; - RtecScheduler::Scheduler_var scheduler = + RtecScheduler::Scheduler_var scheduler = RtecScheduler::Scheduler::_narrow (sched_obj.in (), TAO_TRY_ENV); TAO_CHECK_ENV; @@ -168,7 +168,7 @@ ECT_Supplier_Driver::run (int argc, char* argv[]) poa_manager->activate (TAO_TRY_ENV); TAO_CHECK_ENV; - this->connect_suppliers (scheduler.in (), + this->connect_suppliers (scheduler.in (), channel.in (), TAO_TRY_ENV); TAO_CHECK_ENV; @@ -227,8 +227,8 @@ ECT_Supplier_Driver::connect_suppliers this->burst_size_, this->event_size_, this->burst_pause_, - this->event_a_, - this->event_b_, + this->type_start_, + this->type_count_, channel, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); @@ -301,9 +301,9 @@ ECT_Supplier_Driver::parse_args (int argc, char *argv []) char* aux; char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux); - this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + this->type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); arg = ACE_OS::strtok_r (0, ",", &aux); - this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + this->type_count_ = ACE_OS::atoi (arg); } break; diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h index 7db63600626..a4294e0e929 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h @@ -4,9 +4,8 @@ // ============================================================================ // // = DESCRIPTION -// This test to measure how many events per minute can the EC -// process, it also serves as an example how how to encode complex -// data types in a octet sequence. +// This is a helper class for the throughput tests of the Event +// Channel. // // ============================================================================ @@ -82,8 +81,8 @@ private: int burst_pause_; // The time between each event burst, in microseconds. - int event_a_; - int event_b_; + int type_start_; + int type_count_; // We send two types of events, with different contents. const char* pid_file_name_; diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp index 9fac6231604..ed0bbf768c2 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp @@ -15,6 +15,7 @@ #include "orbsvcs/Event/Module_Factory.h" #include "orbsvcs/Event/EC_Event_Channel.h" #include "orbsvcs/Event/EC_Basic_Factory.h" +#include "orbsvcs/Event/EC_ConsumerAdmin.h" #include "ECT_Throughput.h" ACE_RCSID(EC_Throughput, ECT_Throughput, "$Id$") @@ -35,12 +36,17 @@ ECT_Throughput::ECT_Throughput (void) burst_size_ (100), event_size_ (128), burst_pause_ (100), - event_a_ (ACE_ES_EVENT_UNDEFINED), - event_b_ (ACE_ES_EVENT_UNDEFINED + 1), + consumer_type_start_ (ACE_ES_EVENT_UNDEFINED), + consumer_type_count_ (1), + consumer_type_shift_ (0), + supplier_type_start_ (ACE_ES_EVENT_UNDEFINED), + supplier_type_count_ (1), + supplier_type_shift_ (0), pid_file_name_ (0), active_count_ (0), reactive_ec_ (0), - new_ec_ (0) + new_ec_ (0), + ec_concurrency_hwm_ (1) { } @@ -83,11 +89,16 @@ ECT_Throughput::run (int argc, char* argv[]) " burst size = <%d>\n" " event size = <%d>\n" " burst size = <%d>\n" - " supplier Event A = <%d>\n" - " supplier Event B = <%d>\n" + " consumer type start = <%d>\n" + " consumer type count = <%d>\n" + " consumer type shift = <%d>\n" + " supplier type start = <%d>\n" + " supplier type count = <%d>\n" + " supplier type shift = <%d>\n" " pid file name = <%s>\n" " remote EC = <%d>\n" - " new EC = <%d>\n", + " new EC = <%d>\n" + " concurrency HWM = <%d>\n", this->n_consumers_, this->n_suppliers_, @@ -95,12 +106,17 @@ ECT_Throughput::run (int argc, char* argv[]) this->burst_size_, this->event_size_, this->burst_pause_, - this->event_a_, - this->event_b_, + this->consumer_type_start_, + this->consumer_type_count_, + this->consumer_type_shift_, + this->supplier_type_start_, + this->supplier_type_count_, + this->supplier_type_shift_, this->pid_file_name_?this->pid_file_name_:"nil", this->reactive_ec_, - this->new_ec_ + this->new_ec_, + this->ec_concurrency_hwm_ ) ); if (this->pid_file_name_ != 0) @@ -175,7 +191,7 @@ ECT_Throughput::run (int argc, char* argv[]) str.in ())); // Register the servant with the Naming Context.... - naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV); + naming_context->rebind (schedule_name, scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_Scheduler_Factory::use_config (naming_context.in ()); @@ -219,9 +235,11 @@ ECT_Throughput::run (int argc, char* argv[]) new TAO_EC_Event_Channel (ec_factory.get ()); ec->activate (TAO_TRY_ENV); TAO_CHECK_ENV; + ec->consumer_admin ()->busy_hwm (this->ec_concurrency_hwm_); ec_impl = auto_ptr<POA_RtecEventChannelAdmin::EventChannel> (ec); + #else ACE_ERROR_RETURN ((LM_ERROR, "The new event channel is not supported " @@ -321,7 +339,7 @@ ECT_Throughput::run (int argc, char* argv[]) void ECT_Throughput::shutdown_consumer (void*, - CORBA::Environment &) + CORBA::Environment &) { // int ID = // (ACE_reinterpret_cast(Test_Consumer**,consumer_cookie) @@ -355,10 +373,13 @@ ECT_Throughput::connect_consumers this->consumers_ + i, this->n_suppliers_)); + int start = this->consumer_type_start_ + + i * this->consumer_type_shift_; + this->consumers_[i]->connect (scheduler, buf, - this->event_a_, - this->event_b_, + start, + this->consumer_type_count_, channel, TAO_IN_ENV); if (TAO_IN_ENV.exception () != 0) return; @@ -378,14 +399,15 @@ ECT_Throughput::connect_suppliers ACE_NEW (this->suppliers_[i], Test_Supplier (this)); + int start = this->supplier_type_start_ + i*this->supplier_type_shift_; this->suppliers_[i]->connect (scheduler, buf, this->burst_count_, this->burst_size_, this->event_size_, this->burst_pause_, - this->event_a_, - this->event_b_, + start, + this->supplier_type_count_, channel, TAO_IN_ENV); if (TAO_IN_ENV.exception () != 0) return; @@ -444,7 +466,7 @@ ECT_Throughput::dump_results (void) int ECT_Throughput::parse_args (int argc, char *argv []) { - ACE_Get_Opt get_opt (argc, argv, "dc:s:u:n:t:b:h:p:m:r"); + ACE_Get_Opt get_opt (argc, argv, "rdc:s:u:n:t:b:h:l:p:m:w:"); int opt; while ((opt = get_opt ()) != EOF) @@ -512,9 +534,24 @@ ECT_Throughput::parse_args (int argc, char *argv []) char* aux; char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux); - this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + this->consumer_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->consumer_type_count_ = ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->consumer_type_shift_ = ACE_OS::atoi (arg); + } + break; + + case 'l': + { + char* aux; + char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux); + + this->supplier_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + arg = ACE_OS::strtok_r (0, ",", &aux); + this->supplier_type_count_ = ACE_OS::atoi (arg); arg = ACE_OS::strtok_r (0, ",", &aux); - this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); + this->supplier_type_shift_ = ACE_OS::atoi (arg); } break; @@ -522,6 +559,10 @@ ECT_Throughput::parse_args (int argc, char *argv []) this->pid_file_name_ = get_opt.optarg; break; + case 'w': + this->ec_concurrency_hwm_ = ACE_OS::atoi (get_opt.optarg); + break; + case '?': default: ACE_DEBUG ((LM_DEBUG, @@ -533,8 +574,10 @@ ECT_Throughput::parse_args (int argc, char *argv []) "-n <burst size> " "-b <event payload size> " "-t <burst pause (usecs)> " - "-h <eventa,eventb> " + "-h <consumer_start,consumer_count,consumer_shift> " + "-l <supplier_start,supplier_count,supplier_shift> " "-p <pid file name> " + "-w <concurrency HWM> " "-r " "\n", argv[0])); @@ -603,6 +646,15 @@ ECT_Throughput::parse_args (int argc, char *argv []) argv[0], 1), -1); } + if (this->ec_concurrency_hwm_ <= 0) + { + ACE_ERROR_RETURN ((LM_DEBUG, + "%s: invalid concurrency HWM, " + "reset to default (%d)\n", + argv[0], 1), -1); + this->ec_concurrency_hwm_ = 1; + } + return 0; } diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h index a37826e2543..47d1ff7dacd 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h @@ -4,6 +4,9 @@ // ============================================================================ // // = DESCRIPTION +// This is a helper class for the throughput tests of the Event +// Channel. +// Used for the collocated test. // // ============================================================================ @@ -93,9 +96,23 @@ private: int burst_pause_; // The time between each event burst, in microseconds. - int event_a_; - int event_b_; - // We send two types of events, with different contents. + int consumer_type_start_; + int consumer_type_count_; + int consumer_type_shift_; + // The consumers subscribe to different sets of events, as follows: + // Consumer0: [start , start + count) + // Consumer1: [start + 1*shift, start + 1*shift + count) + // Consumer2: [start + 2*shift, start + 2*shift + count) + // And so on. + + int supplier_type_start_; + int supplier_type_count_; + int supplier_type_shift_; + // The suppliers generate different sets of events, as follows: + // Supplier0: [start , start + count) + // Supplier1: [start + 1*shift, start + 1*shift + count) + // Supplier2: [start + 2*shift, start + 2*shift + count) + // And so on. const char* pid_file_name_; // The name of a file where the process stores its pid @@ -112,6 +129,9 @@ private: int new_ec_; // If not zero then we use the new EC implementation + + int ec_concurrency_hwm_; + // Set the HWM for the concurrency in the EC. }; #endif /* ECT_TRHOUGHPUT_H */ |