summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-02-24 02:29:28 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-02-24 02:29:28 +0000
commit03268b6226fd25a47e482ade493b38cd80b81bfc (patch)
treebcb75d2e7954a9cd92e77602edf6fab11fd48f67 /TAO/orbsvcs
parentee01dc48c2192e81cee28c8c17b7c735b1a57bd8 (diff)
downloadATCD-03268b6226fd25a47e482ade493b38cd80b81bfc.tar.gz
ChangeLogTag:Tue Feb 23 20:10:52 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
Diffstat (limited to 'TAO/orbsvcs')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp26
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.cpp62
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.h74
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.i8
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Command.cpp38
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Command.h133
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Command.i23
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp38
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h69
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i18
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp159
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h188
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i31
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp16
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp16
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i2
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp63
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h14
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp9
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile1
-rw-r--r--TAO/orbsvcs/orbsvcs/RtecEventComm.idl8
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp20
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h10
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp33
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h7
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp38
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h13
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp24
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h9
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp90
-rw-r--r--TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h26
33 files changed, 1118 insertions, 156 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
index 9763c3fcb95..b0b80559e4b 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Basic_Factory.cpp
@@ -3,7 +3,7 @@
#include "EC_Basic_Factory.h"
#include "EC_Dispatching.h"
#include "EC_Basic_Filter_Builder.h"
-#include "EC_ConsumerAdmin.h"
+#include "EC_ConsumerAdmin_T.h"
#include "EC_SupplierAdmin.h"
#include "EC_ProxyConsumer.h"
#include "EC_ProxySupplier.h"
@@ -50,7 +50,7 @@ TAO_EC_Basic_Factory::destroy_filter_builder (TAO_EC_Filter_Builder *x)
TAO_EC_ConsumerAdmin*
TAO_EC_Basic_Factory::create_consumer_admin (TAO_EC_Event_Channel *ec)
{
- return new TAO_EC_ConsumerAdmin (ec);
+ return new TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH> (ec);
}
void
@@ -114,9 +114,11 @@ TAO_EC_Basic_Factory::destroy_timer_module (TAO_EC_Timer_Module *x)
}
TAO_EC_ObserverStrategy*
-TAO_EC_Basic_Factory::create_observer_strategy (TAO_EC_Event_Channel *)
+TAO_EC_Basic_Factory::create_observer_strategy (TAO_EC_Event_Channel *ec)
{
- return new TAO_EC_Null_ObserverStrategy;
+ ACE_Lock* lock;
+ ACE_NEW_RETURN (lock, ACE_Lock_Adapter<ACE_SYNCH_MUTEX>, 0);
+ return new TAO_EC_Basic_ObserverStrategy (ec, lock);
}
void
@@ -184,3 +186,19 @@ TAO_EC_Basic_Factory::destroy_supplier_admin_lock (ACE_Lock* x)
{
delete x;
}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH>;
+template class TAO_EC_ConsumerAdmin_T<ACE_MT_SYNCH>;
+template class ACE_Node<ACE_Command_Base*>;
+template class ACE_Unbounded_Queue<ACE_Command_Base*>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate TAO_EC_ConsumerAdmin_Delayed<ACE_MT_SYNCH>
+#pragma instantiate TAO_EC_ConsumerAdmin_T<ACE_MT_SYNCH>
+#pragma instantiate ACE_Node<ACE_Command_Base*>
+#pragma instantiate ACE_Unbounded_Queue<ACE_Command_Base*>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.cpp
new file mode 100644
index 00000000000..e85b8a15dab
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.cpp
@@ -0,0 +1,62 @@
+// $Id$
+
+#ifndef TAO_EC_BUSY_LOCK_CPP
+#define TAO_EC_BUSY_LOCK_CPP
+
+#include "EC_Busy_Lock.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "EC_Busy_Lock.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, EC_Busy_Lock, "$Id$")
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::remove (void)
+{
+ return 0;
+}
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::acquire (void)
+{
+ return this->adaptee_->busy ();
+}
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::tryacquire (void)
+{
+ return this->adaptee_->busy ();
+}
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::release (void)
+{
+ return this->adaptee_->idle ();
+}
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::acquire_read (void)
+{
+ return this->adaptee_->busy ();
+}
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::acquire_write (void)
+{
+ return this->adaptee_->busy ();
+}
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::tryacquire_read (void)
+{
+ return this->adaptee_->busy ();
+}
+
+template<class T>
+int TAO_EC_Busy_Lock_Adapter<T>::tryacquire_write (void)
+{
+ return this->adaptee_->busy ();
+}
+
+#endif /* TAO_EC_BUSY_LOCK_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.h b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.h
new file mode 100644
index 00000000000..09cda19b37d
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.h
@@ -0,0 +1,74 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel
+//
+// = FILENAME
+// EC_ConsumerAdmin
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// = DESCRIPTION
+// Implement the RtecEventChannelAdmin::ConsumerAdmin interface.
+// This class is an Abstract Factory for the
+// TAO_EC_ProxyPushSupplier.
+//
+// = CREDITS
+// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
+// and other members of the DOC group.
+// More details can be found in:
+// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
+// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
+//
+//
+// ============================================================================
+
+#ifndef TAO_EC_BUSY_LOCK_H
+#define TAO_EC_BUSY_LOCK_H
+
+#include "ace/OS.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class Adaptee>
+class TAO_EC_Busy_Lock_Adapter
+{
+public:
+ TAO_EC_Busy_Lock_Adapter (Adaptee* adaptee);
+ // Constructor
+
+ // = The ACE_Lock methods, please check $ACE_ROOT/ace/Synch.h for
+ // details.
+
+ int remove (void);
+ int acquire (void);
+ int tryacquire (void);
+ int release (void);
+ int acquire_read (void);
+ int acquire_write (void);
+ int tryacquire_read (void);
+ int tryacquire_write (void);
+
+private:
+ Adaptee* adaptee_;
+};
+
+#if defined (__ACE_INLINE__)
+#include "EC_Busy_Lock.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "EC_Busy_Lock.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("EC_Busy_Lock.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* TAO_EC_BUSY_LOCK_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.i b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.i
new file mode 100644
index 00000000000..dd4c3e7a956
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Busy_Lock.i
@@ -0,0 +1,8 @@
+// $Id$
+
+template<class T> ACE_INLINE
+TAO_EC_Busy_Lock_Adapter<T>::TAO_EC_Busy_Lock_Adapter (T* adaptee)
+ : adaptee_ (adaptee)
+{
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Command.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Command.cpp
new file mode 100644
index 00000000000..773244aab5c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Command.cpp
@@ -0,0 +1,38 @@
+// $Id$
+
+#ifndef TAO_EC_COMMAND_CPP
+#define TAO_EC_COMMAND_CPP
+
+#include "EC_Command.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "EC_Command.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, EC_Command, "$Id$")
+
+template<class Target, class Object> int
+TAO_EC_Connected_Command<Target,Object>::execute (void* arg)
+{
+ CORBA::Environment *env = &CORBA::default_environment ();
+ if (arg != 0)
+ env = ACE_dynamic_cast(CORBA::Environment*, arg);
+
+ this->target_->connected_i (this->object_, *env);
+ return 0;
+}
+
+// ****************************************************************
+
+template<class Target, class Object> int
+TAO_EC_Disconnected_Command<Target,Object>::execute (void* arg)
+{
+ CORBA::Environment *env = &CORBA::default_environment ();
+ if (arg != 0)
+ env = ACE_dynamic_cast(CORBA::Environment*, arg);
+
+ this->target_->disconnected_i (this->object_, *env);
+ return 0;
+}
+
+#endif /* TAO_EC_COMMAND_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Command.h b/TAO/orbsvcs/orbsvcs/Event/EC_Command.h
new file mode 100644
index 00000000000..5b4c1f97224
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Command.h
@@ -0,0 +1,133 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel
+//
+// = FILENAME
+// EC_Command
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// = DESCRIPTION
+// Implement the Command objects for the delayed operations in the
+// manipulation of EC_ProxyPushSupplier and EC_ProxyPushConsumer
+// sets.
+//
+// = CREDITS
+// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
+// and other members of the DOC group.
+// More details can be found in:
+// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
+// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
+//
+//
+// ============================================================================
+
+#ifndef TAO_EC_COMMAND_H
+#define TAO_EC_COMMAND_H
+
+#include "ace/Functor.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<class Target, class Object>
+class TAO_EC_Connected_Command : public ACE_Command_Base
+{
+ // = TITLE
+ // EC_Connected_Command
+ //
+ // = DESCRIPTION
+ // Implements a Command object that invokes the connected_i() method
+ // on the target, passing an argument of type Object.
+ //
+ // = MEMORY MANAGMENT
+ // It does not assume ownership of Object nor the Target
+ // arguments.
+ // Usually allocated from the heap or an allocator; but it is not
+ // self-managed.
+ //
+ // = LOCKING
+ // No provisions for locking, access must be serialized
+ // externally.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_Connected_Command (Target *target,
+ Object *object);
+ // constructor...
+
+ virtual int execute (void *arg);
+ // The callback method, if the argument is not nil it is interpreted
+ // as a CORBA::Environment.
+
+private:
+ Target *target_;
+ // The target
+
+ Object *object_;
+ // The argument
+};
+
+// ****************************************************************
+
+template<class Target, class Object>
+class TAO_EC_Disconnected_Command : public ACE_Command_Base
+{
+ // = TITLE
+ // EC_Disconnected_Command
+ //
+ // = DESCRIPTION
+ // Implements a Command object that invokes the disconnected_i()
+ // method on the target, passing an argument of type Object.
+ //
+ // = MEMORY MANAGMENT
+ // It does not assume ownership of Object nor the Target
+ // arguments.
+ // Usually allocated from the heap or an allocator; but it is not
+ // self-managed.
+ //
+ // = LOCKING
+ // No provisions for locking, access must be serialized
+ // externally.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_Disconnected_Command (Target *target,
+ Object *object);
+ // constructor...
+
+ virtual int execute (void *arg);
+ // The callback method, if the argument is not nil it is interpreted
+ // as a CORBA::Environment.
+
+private:
+ Target *target_;
+ // The target
+
+ Object *object_;
+ // The argument
+};
+
+// ****************************************************************
+
+#if defined (__ACE_INLINE__)
+#include "EC_Command.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "EC_Command.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("EC_Command.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* TAO_EC_COMMAND_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Command.i b/TAO/orbsvcs/orbsvcs/Event/EC_Command.i
new file mode 100644
index 00000000000..6a5716a0e4a
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Command.i
@@ -0,0 +1,23 @@
+// $Id$
+
+template<class Target, class Object>
+TAO_EC_Connected_Command<Target,Object>::
+ TAO_EC_Connected_Command (Target *target,
+ Object *object)
+ : target_ (target),
+ object_ (object)
+{
+}
+
+// ****************************************************************
+
+
+template<class Target, class Object>
+TAO_EC_Disconnected_Command<Target,Object>::
+ TAO_EC_Disconnected_Command (Target *target,
+ Object *object)
+ : target_ (target),
+ object_ (object)
+{
+}
+
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
index 0f038b70f36..815a6c9a293 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
@@ -12,7 +12,9 @@
ACE_RCSID(Event, EC_ConsumerAdmin, "$Id$")
TAO_EC_ConsumerAdmin::TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel *ec)
- : event_channel_ (ec)
+ : busy_lock_ (this),
+ event_channel_ (ec),
+ busy_hwm_ (1)
{
}
@@ -23,7 +25,7 @@ TAO_EC_ConsumerAdmin::~TAO_EC_ConsumerAdmin (void)
void
TAO_EC_ConsumerAdmin::set_default_POA (PortableServer::POA_ptr poa)
{
- this->default_POA_ =
+ this->default_POA_ =
PortableServer::POA::_duplicate (poa);
}
@@ -31,12 +33,17 @@ void
TAO_EC_ConsumerAdmin::connected (TAO_EC_ProxyPushConsumer *consumer,
CORBA::Environment &ACE_TRY_ENV)
{
+ ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock,
+ ace_mon, this->busy_lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
SupplierSetIterator end = this->end ();
for (SupplierSetIterator i = this->begin ();
i != end;
++i)
{
(*i)->connected (consumer, ACE_TRY_ENV);
+ ACE_CHECK;
}
}
@@ -44,35 +51,41 @@ void
TAO_EC_ConsumerAdmin::disconnected (TAO_EC_ProxyPushConsumer *consumer,
CORBA::Environment &ACE_TRY_ENV)
{
+ ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock,
+ ace_mon, this->busy_lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
SupplierSetIterator end = this->end ();
for (SupplierSetIterator i = this->begin ();
i != end;
++i)
{
(*i)->disconnected (consumer, ACE_TRY_ENV);
+ ACE_CHECK;
}
}
void
-TAO_EC_ConsumerAdmin::connected (TAO_EC_ProxyPushSupplier *supplier,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_ConsumerAdmin::connected_i (TAO_EC_ProxyPushSupplier *supplier,
+ CORBA::Environment &ACE_TRY_ENV)
{
if (this->all_suppliers_.insert (supplier) != 0)
ACE_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO));
}
void
-TAO_EC_ConsumerAdmin::disconnected (TAO_EC_ProxyPushSupplier *supplier,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_ConsumerAdmin::disconnected_i (TAO_EC_ProxyPushSupplier *supplier,
+ CORBA::Environment &ACE_TRY_ENV)
{
if (this->all_suppliers_.remove (supplier) != 0)
ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR ());
+ supplier->_decr_refcnt ();
}
RtecEventChannelAdmin::ProxyPushSupplier_ptr
TAO_EC_ConsumerAdmin::obtain_push_supplier (CORBA::Environment &ACE_TRY_ENV)
{
- TAO_EC_ProxyPushSupplier* supplier =
+ TAO_EC_ProxyPushSupplier* supplier =
this->event_channel_->create_proxy_push_supplier ();
PortableServer::POA_var poa =
@@ -88,16 +101,27 @@ TAO_EC_ConsumerAdmin::_default_POA (CORBA::Environment&)
return PortableServer::POA::_duplicate (this->default_POA_.in ());
}
+void
+TAO_EC_ConsumerAdmin::execute_delayed_operations (void)
+{
+}
+
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Node<TAO_EC_ProxyPushSupplier*>;
template class ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*>;
template class ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*>;
+template class TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin>;
+template class TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
+template class TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Node<TAO_EC_ProxyPushSupplier*>
#pragma instantiate ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*>
#pragma instantiate ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*>
+#pragma instantiate TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin>
+#pragma instantiate TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>
+#pragma instantiate TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
index fa8dccd946b..14ddaa8a074 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
@@ -13,9 +13,8 @@
// Carlos O'Ryan (coryan@cs.wustl.edu)
//
// = DESCRIPTION
-// Implement the RtecEventChannelAdmin::ConsumerAdmin interface.
-// This class is an Abstract Factory for the
-// TAO_EC_ProxyPushSupplier.
+// Define the interface for the RtecEventChannelAdmin::ConsumerAdmin
+// implementations.
//
// = CREDITS
// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
@@ -31,21 +30,25 @@
#define TAO_EC_CONSUMERADMIN_H
#include "ace/Containers.h"
-#include "orbsvcs/RtecEventChannelAdminS.h"
-#include "orbsvcs/Event/EC_Filter.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "orbsvcs/RtecEventChannelAdminS.h"
+#include "orbsvcs/Event/EC_Busy_Lock.h"
+#include "orbsvcs/Event/EC_Filter.h"
+
class TAO_EC_Event_Channel;
class TAO_EC_ProxyPushSupplier;
class TAO_EC_ProxyPushConsumer;
+template<class Target,class Object> class TAO_EC_Connected_Command;
+template<class Target,class Object> class TAO_EC_Disconnected_Command;
class TAO_EC_ConsumerAdmin : public POA_RtecEventChannelAdmin::ConsumerAdmin
{
// = TITLE
- // ProxyPushSupplier
+ // ConsumerAdmin
//
// = DESCRIPTION
// Implements the ConsumerAdmin interface, i.e. the factory for
@@ -70,6 +73,17 @@ public:
typedef ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier*> SupplierSet;
typedef ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier*> SupplierSetIterator;
+ virtual int busy (void) = 0;
+ virtual int idle (void) = 0;
+ // Before using the iterators the clients should invoke this
+ // methods, that ensures that no changes to the underlying data
+ // structure will occur.
+
+ void busy_hwm (CORBA::ULong hwm);
+ CORBA::ULong busy_hwm (void) const;
+ // This attribute is used to control the maximum number of threads
+ // that can be running on the
+
SupplierSetIterator begin (void);
SupplierSetIterator end (void);
// Iterators over the set of ProxyPushSuppliers
@@ -88,9 +102,9 @@ public:
// disconnected from it.
virtual void connected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&);
+ CORBA::Environment&) = 0;
virtual void disconnected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment&);
+ CORBA::Environment&) = 0;
// Used to inform the EC that a Supplier has connected or
// disconnected from it.
@@ -98,6 +112,42 @@ public:
virtual RtecEventChannelAdmin::ProxyPushSupplier_ptr
obtain_push_supplier (CORBA::Environment &);
+ typedef TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin> Busy_Lock;
+ Busy_Lock& busy_lock (void);
+ // This object is an adapter to the busy/idle protocol.
+
+protected:
+ virtual void connected_i (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &env);
+ // The implementation of connected(), without locking.
+ // It does not increase the reference count on the supplier
+
+ virtual void disconnected_i (TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &env);
+ // The implementation of disconnected(), without locking.
+ // It decreases the reference count on the supplier if the operation
+ // is successful.
+
+ typedef TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> Connected_Command;
+ typedef TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier> Disconnected_Command;
+
+ friend class TAO_EC_Connected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
+ friend class TAO_EC_Disconnected_Command<TAO_EC_ConsumerAdmin,TAO_EC_ProxyPushSupplier>;
+ // This two classes call the connected_i() and disconnected_i()
+ // methods, that's ok because they do while this class is holding
+ // its lock.
+
+ virtual void execute_delayed_operations (void);
+ // Dervied classes that implement delayed disconnects and connects
+ // must override this method.
+
+ SupplierSet all_suppliers_;
+ // The set of all the ProxyPushSupplier objects bound to this
+ // ConsumerAdmin.
+
+ TAO_EC_Busy_Lock_Adapter<TAO_EC_ConsumerAdmin> busy_lock_;
+ // The busy lock object
+
private:
TAO_EC_Event_Channel *event_channel_;
// The Event Channel we belong to
@@ -105,7 +155,8 @@ private:
PortableServer::POA_var default_POA_;
// Store the default POA.
- SupplierSet all_suppliers_;
+ CORBA::ULong busy_hwm_;
+ // How many threads can simultaneously iterate over the set.
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
index 061e81dba2b..da4752320ec 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
@@ -11,3 +11,21 @@ TAO_EC_ConsumerAdmin::end (void)
{
return this->all_suppliers_.end ();
}
+
+ACE_INLINE void
+TAO_EC_ConsumerAdmin::busy_hwm (CORBA::ULong hwm)
+{
+ this->busy_hwm_ = hwm;
+}
+
+ACE_INLINE CORBA::ULong
+TAO_EC_ConsumerAdmin::busy_hwm (void) const
+{
+ return this->busy_hwm_;
+}
+
+ACE_INLINE TAO_EC_ConsumerAdmin::Busy_Lock&
+TAO_EC_ConsumerAdmin::busy_lock (void)
+{
+ return this->busy_lock_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp
new file mode 100644
index 00000000000..b36f5bd1a05
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.cpp
@@ -0,0 +1,159 @@
+// $Id$
+
+#ifndef TAO_EC_CONSUMERADMIN_T_CPP
+#define TAO_EC_CONSUMERADMIN_T_CPP
+
+#include "EC_ConsumerAdmin_T.h"
+#include "EC_Command.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "EC_ConsumerAdmin_T.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, EC_ConsumerAdmin_T, "$Id$")
+
+template<ACE_SYNCH_DECL>int
+TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ return this->busy_i ();
+}
+
+template<ACE_SYNCH_DECL> int
+TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::idle (void)
+{
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ return this->idle_i ();
+}
+
+template<ACE_SYNCH_DECL> int
+TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy_i (void)
+{
+ if (this->busy_count_ >= this->busy_hwm ())
+ {
+ this->reached_hwm_ = 1;
+ while (this->reached_hwm_ != 0)
+ this->busy_cond_.wait ();
+ }
+ this->busy_count_++;
+ return 0;
+}
+
+template<ACE_SYNCH_DECL> int
+TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::idle_i (void)
+{
+ this->busy_count_--;
+ if (this->busy_count_ == 0)
+ {
+ this->reached_hwm_ = 0;
+ this->execute_delayed_operations ();
+ this->busy_cond_.broadcast ();
+ }
+ return 0;
+}
+
+// ****************************************************************
+
+template<ACE_SYNCH_DECL> void
+TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::connected (
+ TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
+ supplier->_incr_refcnt ();
+ this->connected_i (supplier, ACE_TRY_ENV);
+}
+
+template<ACE_SYNCH_DECL> void
+TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::disconnected (
+ TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
+ this->disconnected_i (supplier, ACE_TRY_ENV);
+}
+
+// ****************************************************************
+
+template<ACE_SYNCH_DECL> void
+TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::connected (
+ TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
+ if (this->busy_count () == 0)
+ {
+ // We can add the object immediately
+ this->connected_i (supplier, ACE_TRY_ENV);
+ }
+ else
+ {
+ supplier->_incr_refcnt ();
+ ACE_Command_Base* command;
+ ACE_NEW (command,
+ TAO_EC_ConsumerAdmin::Connected_Command (this,
+ supplier));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "EC (%P|%t) Delayed connection command = %x\n",
+ command));
+
+ this->command_queue_.enqueue_tail (command);
+ }
+}
+
+template<ACE_SYNCH_DECL> void
+TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::disconnected (
+ TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ ACE_GUARD_THROW (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
+ if (this->busy_count () == 0)
+ {
+ // We can remove the object immediately
+ this->disconnected_i (supplier, ACE_TRY_ENV);
+ }
+ else
+ {
+ ACE_Command_Base* command;
+ ACE_NEW (command,
+ TAO_EC_ConsumerAdmin::Disconnected_Command (this,
+ supplier));
+ ACE_DEBUG ((LM_DEBUG,
+ "EC (%P|%t) Delayed disconnection command = %x\n",
+ command));
+
+ this->command_queue_.enqueue_tail (command);
+ }
+}
+
+template<ACE_SYNCH_DECL> void
+TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::execute_delayed_operations (void)
+{
+ // LOCKING: the lock is taken by the idle() function
+ while (!this->command_queue_.is_empty ())
+ {
+ ACE_Command_Base* command;
+ this->command_queue_.dequeue_head (command);
+
+ command->execute ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "EC (%P|%t) Executed delayed command = %x\n",
+ command));
+
+ delete command;
+ }
+}
+
+#endif /* TAO_EC_CONSUMERADMIN_T_CPP */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h
new file mode 100644
index 00000000000..09474e7a723
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.h
@@ -0,0 +1,188 @@
+/* -*- C++ -*- */
+// $Id$
+//
+// ============================================================================
+//
+// = LIBRARY
+// ORBSVCS Real-time Event Channel
+//
+// = FILENAME
+// EC_ConsumerAdmin_T
+//
+// = AUTHOR
+// Carlos O'Ryan (coryan@cs.wustl.edu)
+//
+// = DESCRIPTION
+// Implement concrete versions of the EC_ConsumerAdmin class. This
+// concrete versions provide specific locking policies and
+// strategies to handle delayed vs. immediate connections and
+// disconnections.
+//
+// = CREDITS
+// Based on previous work by Tim Harrison (harrison@cs.wustl.edu)
+// and other members of the DOC group.
+// More details can be found in:
+// http://www.cs.wustl.edu/~schmidt/oopsla.ps.gz
+// http://www.cs.wustl.edu/~schmidt/JSAC-98.ps.gz
+//
+//
+// ============================================================================
+
+#ifndef TAO_EC_CONSUMERADMIN_T_H
+#define TAO_EC_CONSUMERADMIN_T_H
+
+#include "EC_ConsumerAdmin.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+template<ACE_SYNCH_DECL>
+class TAO_EC_ConsumerAdmin_T : public TAO_EC_ConsumerAdmin
+{
+ // = TITLE
+ // ConsumerAdmin_T
+ //
+ // = DESCRIPTION
+ // Implements the locking policies for the TAO_EC_ConsumerAdmin
+ // class, we use a parametric class to handle the co-variations
+ // between mutexes and condition variables.
+ //
+ // = MEMORY MANAGMENT
+ //
+ // = LOCKING
+ // The kind of locking is specified as the template argument.
+ // Clients still should follow the locking protocol: call busy()
+ // to use the iterator and idle() once finished, the
+ // Busy_Lock_Adapter can help there.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_ConsumerAdmin_T (TAO_EC_Event_Channel* event_channel);
+ // constructor...
+
+ // = The TAO_EC_ConsumerAdmin methods
+ virtual int busy (void);
+ virtual int idle (void);
+
+protected:
+ virtual int busy_i (void);
+ // Implements the busy() method, but without locking.
+
+ virtual int idle_i (void);
+ // Implements the busy() method, but without locking.
+
+protected:
+ int busy_count (void) const;
+ // Return the current value of busy_count [derived classes are not
+ // allowed to modify this]
+
+ ACE_SYNCH_MUTEX_T lock_;
+ // The lock
+
+ ACE_SYNCH_CONDITION_T busy_cond_;
+ // A condition variable to wait while the object is too busy.
+
+private:
+ int busy_count_;
+ // The number of threads iterating on us
+
+ int reached_hwm_;
+ // The set was too busy and reached its HWM, now everybody has to
+ // wait until we reach the LWM (0)
+};
+
+// ****************************************************************
+
+template<ACE_SYNCH_DECL>
+class TAO_EC_ConsumerAdmin_Immediate : public TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>
+{
+ // = TITLE
+ // ConsumerAdmin_Immediate
+ //
+ // = DESCRIPTION
+ // A concrete version of the EC_ConsumerAdmin class; using the
+ // locking strategy in EC_ConsumerAdmin_T and immediate execution
+ // for the connected() and disconnected() operations.
+ //
+ // = MEMORY MANAGMENT
+ //
+ // = LOCKING
+ // The kind of locking is specified as the template argument.
+ // Clients still should follow the locking protocol: call busy()
+ // to use the iterator and idle() once finished, the
+ // Busy_Lock_Adapter can help there.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_ConsumerAdmin_Immediate (TAO_EC_Event_Channel* event_channel);
+ // constructor...
+
+ // = The TAO_EC_ConsumerAdmin methods
+ virtual void connected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+ virtual void disconnected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+};
+
+// ****************************************************************
+
+template<ACE_SYNCH_DECL>
+class TAO_EC_ConsumerAdmin_Delayed : public TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>
+{
+ // = TITLE
+ // ConsumerAdmin_Delayed
+ //
+ // = DESCRIPTION
+ // A concrete version of the EC_ConsumerAdmin class; using the
+ // locking strategy in EC_ConsumerAdmin_T and storing the
+ // execution of connected() and disconnected() operations as
+ // command objects, that are executed once the set is idle.
+ //
+ // = MEMORY MANAGMENT
+ //
+ // = LOCKING
+ // The kind of locking is specified as the template argument.
+ // Clients still should follow the locking protocol: call busy()
+ // to use the iterator and idle() once finished, the
+ // Busy_Lock_Adapter can help there.
+ //
+ // = TODO
+ //
+public:
+ TAO_EC_ConsumerAdmin_Delayed (TAO_EC_Event_Channel* event_channel);
+ // constructor...
+
+ // = The TAO_EC_ConsumerAdmin methods
+ virtual void connected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+ virtual void disconnected (TAO_EC_ProxyPushSupplier*,
+ CORBA::Environment&);
+
+protected:
+ virtual void execute_delayed_operations (void);
+ // documented in TAO_EC_ConsumerAdmin
+
+private:
+ ACE_Unbounded_Queue<ACE_Command_Base*> command_queue_;
+ // The commands that carry the delayed operations are enqueued
+ // here.
+};
+
+// ****************************************************************
+
+#if defined (__ACE_INLINE__)
+#include "EC_ConsumerAdmin_T.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "EC_Busy_Lock.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("EC_Busy_Lock.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* TAO_EC_CONSUMERADMIN_T_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i
new file mode 100644
index 00000000000..4e78d7ad7e5
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin_T.i
@@ -0,0 +1,31 @@
+// $Id$
+
+template<ACE_SYNCH_DECL> ACE_INLINE
+TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::
+ TAO_EC_ConsumerAdmin_T (TAO_EC_Event_Channel *event_channel)
+ : TAO_EC_ConsumerAdmin (event_channel),
+ busy_cond_ (lock_),
+ busy_count_ (0),
+ reached_hwm_ (0)
+{
+}
+
+template<ACE_SYNCH_DECL> ACE_INLINE int
+TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE>::busy_count (void) const
+{
+ return this->busy_count_;
+}
+
+template<ACE_SYNCH_DECL> ACE_INLINE
+TAO_EC_ConsumerAdmin_Immediate<ACE_SYNCH_USE>::
+ TAO_EC_ConsumerAdmin_Immediate (TAO_EC_Event_Channel *event_channel)
+ : TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> (event_channel)
+{
+}
+
+template<ACE_SYNCH_DECL> ACE_INLINE
+TAO_EC_ConsumerAdmin_Delayed<ACE_SYNCH_USE>::
+ TAO_EC_ConsumerAdmin_Delayed (TAO_EC_Event_Channel *event_channel)
+ : TAO_EC_ConsumerAdmin_T<ACE_SYNCH_USE> (event_channel)
+{
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp
index c0c565b8b75..45944cb9bd7 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp
@@ -29,7 +29,7 @@ TAO_EC_Event_Channel::TAO_EC_Event_Channel (TAO_EC_Factory* factory)
this->factory_->create_supplier_admin (this);
this->timer_module_ =
this->factory_->create_timer_module (this);
- this->observer_strategy_ =
+ this->observer_strategy_ =
this->factory_->create_observer_strategy (this);
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp
index d8512c9c955..3f3ba58be52 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Null_Factory.cpp
@@ -3,7 +3,7 @@
#include "EC_Null_Factory.h"
#include "EC_Dispatching.h"
#include "EC_Filter_Builder.h"
-#include "EC_ConsumerAdmin.h"
+#include "EC_ConsumerAdmin_T.h"
#include "EC_SupplierAdmin.h"
#include "EC_ProxyConsumer.h"
#include "EC_ProxySupplier.h"
@@ -50,7 +50,7 @@ TAO_EC_Null_Factory::destroy_filter_builder (TAO_EC_Filter_Builder *x)
TAO_EC_ConsumerAdmin*
TAO_EC_Null_Factory::create_consumer_admin (TAO_EC_Event_Channel *ec)
{
- return new TAO_EC_ConsumerAdmin (ec);
+ return new TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH> (ec);
}
void
@@ -184,3 +184,15 @@ TAO_EC_Null_Factory::destroy_supplier_admin_lock (ACE_Lock* x)
{
delete x;
}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH>;
+template class TAO_EC_ConsumerAdmin_T<ACE_NULL_SYNCH>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate TAO_EC_ConsumerAdmin_Immediate<ACE_NULL_SYNCH>
+#pragma instantiate TAO_EC_ConsumerAdmin_T<ACE_NULL_SYNCH>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
index 95e76aff5ff..b59e68f8540 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
@@ -72,6 +72,10 @@ TAO_EC_Null_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*,
// ****************************************************************
+TAO_EC_Basic_ObserverStrategy::~TAO_EC_Basic_ObserverStrategy (void)
+{
+}
+
RtecEventChannelAdmin::Observer_Handle
TAO_EC_Basic_ObserverStrategy::append_observer (
RtecEventChannelAdmin::Observer_ptr obs,
@@ -156,9 +160,9 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
{
// @@ TODO locking in the consumer admin?
- TAO_EC_ConsumerAdmin::SupplierSetIterator end =
+ TAO_EC_ConsumerAdmin::SupplierSetIterator end =
this->event_channel_->consumer_admin ()->end ();
- for (TAO_EC_ConsumerAdmin::SupplierSetIterator i =
+ for (TAO_EC_ConsumerAdmin::SupplierSetIterator i =
this->event_channel_->consumer_admin ()->begin ();
i != end;
++i)
@@ -200,15 +204,15 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
{
// @@ TODO locking in the consumer admin?
- TAO_EC_SupplierAdmin::ConsumerSetIterator end =
+ TAO_EC_SupplierAdmin::ConsumerSetIterator end =
this->event_channel_->supplier_admin ()->end ();
- for (TAO_EC_SupplierAdmin::ConsumerSetIterator i =
+ for (TAO_EC_SupplierAdmin::ConsumerSetIterator i =
this->event_channel_->supplier_admin ()->begin ();
i != end;
++i)
{
TAO_EC_ProxyPushConsumer* consumer = *i;
- const RtecEventChannelAdmin::SupplierQOS& pub =
+ const RtecEventChannelAdmin::SupplierQOS& pub =
consumer->publications ();
if (pub.is_gateway)
continue;
@@ -217,7 +221,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
const RtecEventComm::Event& event =
pub.publications[j].event;
RtecEventComm::EventType type = event.header.type;
-
+
if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED)
continue;
headers.insert (event.header, 1);
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h
index a6cde6a29a8..56640e06b47 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.h
@@ -52,7 +52,7 @@ class TAO_EC_ObserverStrategy
// The Event Channel supports Observers for the set of
// subscriptions and publications.
// This is used to implement federations of event channels,
- // either through UDP (multicast and unicast) and/or regular CORBA
+ // either through UDP (multicast and unicast) and/or regular CORBA
// calls.
//
// This behavior of the EC is strategized to avoid overhead when
@@ -101,7 +101,7 @@ class TAO_EC_Null_ObserverStrategy : public TAO_EC_ObserverStrategy
// A null observer strategy.
//
// = DESCRIPTION
- // This class keeps no information and simply ignores the messages
+ // This class keeps no information and simply ignores the messages
// from the EC.
//
public:
@@ -142,7 +142,7 @@ class TAO_EC_Basic_ObserverStrategy : public TAO_EC_ObserverStrategy
// This class simply keeps the information about the current list
// of observers, whenever the list of consumers and/or suppliers
// changes in queries the EC, computes the global subscription
- // and/or publication list and sends the update message to all the
+ // and/or publication list and sends the update message to all the
// observers.
//
public:
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i
index 6bcf535a937..6a677aa122a 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.i
@@ -33,5 +33,3 @@ TAO_EC_Basic_ObserverStrategy::Observer_Entry::
observer (o)
{
}
-
-
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
index 2af0a1838b4..4b9a7367dc2 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
@@ -12,9 +12,12 @@
ACE_RCSID(Event, EC_ProxySupplier, "$Id$")
TAO_EC_ProxyPushSupplier::TAO_EC_ProxyPushSupplier (TAO_EC_Event_Channel* ec)
- : event_channel_ (ec)
+ : event_channel_ (ec),
+ refcount_ (1),
+ suspended_ (0),
+ child_ (0)
{
- this->lock_ =
+ this->lock_ =
this->event_channel_->create_supplier_lock ();
}
@@ -23,6 +26,28 @@ TAO_EC_ProxyPushSupplier::~TAO_EC_ProxyPushSupplier (void)
this->event_channel_->destroy_supplier_lock (this->lock_);
}
+CORBA::ULong
+TAO_EC_ProxyPushSupplier::_incr_refcnt (void)
+{
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
+ return this->refcount_++;
+}
+
+CORBA::ULong
+TAO_EC_ProxyPushSupplier::_decr_refcnt (void)
+{
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return this->refcount_;
+ }
+
+ // Notify the event channel
+ this->event_channel_->destroy_proxy_push_supplier (this);
+ return 0;
+}
+
void
TAO_EC_ProxyPushSupplier::set_default_POA (PortableServer::POA_ptr poa)
{
@@ -74,20 +99,22 @@ TAO_EC_ProxyPushSupplier::connect_push_consumer (
const RtecEventChannelAdmin::ConsumerQOS& qos,
CORBA::Environment &ACE_TRY_ENV)
{
- ACE_GUARD_THROW (
- ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ {
+ ACE_GUARD_THROW (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- if (this->is_connected_i ())
- ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
+ if (this->is_connected_i ())
+ ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ());
- this->consumer_ =
- RtecEventComm::PushConsumer::_duplicate (push_consumer);
- this->qos_ = qos;
+ this->consumer_ =
+ RtecEventComm::PushConsumer::_duplicate (push_consumer);
+ this->qos_ = qos;
- this->child_ =
- this->event_channel_->filter_builder ()->build (this->qos_);
- this->adopt_child (this->child_);
+ this->child_ =
+ this->event_channel_->filter_builder ()->build (this->qos_);
+ this->adopt_child (this->child_);
+ }
// Notify the event channel...
this->event_channel_->connected (this, ACE_TRY_ENV);
@@ -101,9 +128,9 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier (
ACE_GUARD_THROW (
ACE_Lock, ace_mon, *this->lock_,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
-
+
this->consumer_ =
- RtecEventComm::PushConsumer::_nil ();
+ RtecEventComm::PushConsumer::_nil ();
PortableServer::POA_var poa =
this->_default_POA_i ();
@@ -114,7 +141,7 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier (
poa->deactivate_object (id.in (), ACE_TRY_ENV);
ACE_CHECK;
}
- this->event_channel_->destroy_proxy_push_supplier (this);
+ this->_decr_refcnt ();
}
void
@@ -167,7 +194,7 @@ TAO_EC_ProxyPushSupplier::push (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // Do not take a lock, this is a call back from our child filter, so
+ // Do not take a lock, this is a call back from our child filter, so
// we are holding the lock already (in the filter() method).
this->event_channel_->dispatching ()->push (this,
this->consumer_,
@@ -182,7 +209,7 @@ TAO_EC_ProxyPushSupplier::push_nocopy (RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
CORBA::Environment& ACE_TRY_ENV)
{
- // Do not take a lock, this is a call back from our child filter, so
+ // Do not take a lock, this is a call back from our child filter, so
// we are holding the lock already (in the filter() method).
this->event_channel_->dispatching ()->push_nocopy (this,
this->consumer_,
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
index 34051dfcc7a..8ce01cc3585 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.h
@@ -51,7 +51,6 @@ class TAO_EC_ProxyPushSupplier : public POA_RtecEventChannelAdmin::ProxyPushSupp
// used to communicate with a particular consumer.
//
// = MEMORY MANAGMENT
- // This object will be reference counted.
// It does not assume ownership of the TAO_EC_Dispatching object.
// It makes a copy of the ConsumerQOS and the consumer object
// reference.
@@ -116,6 +115,14 @@ public:
virtual void suspend_connection (CORBA::Environment &);
virtual void resume_connection (CORBA::Environment &);
+ virtual CORBA::ULong _incr_refcnt (void);
+ virtual CORBA::ULong _decr_refcnt (void);
+ // Increment and decrement the reference count.
+ // @@ TODO We use the canonical tao form, but in the future we may
+ // want to add methods that follow the upcoming CORBA2.3
+ // specification, which will include reference counting for
+ // servants.
+
// = The TAO_EC_Filter methods, only push() is implemented...
virtual int filter (const RtecEventComm::EventSet& event,
TAO_EC_QOS_Info& qos_info,
@@ -147,6 +154,9 @@ private:
ACE_Lock* lock_;
// The locking strategy.
+ CORBA::ULong refcount_;
+ // The reference count.
+
RtecEventComm::PushConsumer_var consumer_;
// The consumer....
@@ -159,8 +169,8 @@ private:
PortableServer::POA_var default_POA_;
// Store the default POA.
-private:
TAO_EC_Filter* child_;
+ // The filter object
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp
index 06d94ed726c..c7e7e9905a7 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.cpp
@@ -50,8 +50,13 @@ void
TAO_EC_Null_SupplierFiltering::push (const RtecEventComm::EventSet& event,
CORBA::Environment &ACE_TRY_ENV)
{
- TAO_EC_ConsumerAdmin* consumer_admin =
+ TAO_EC_ConsumerAdmin* consumer_admin =
this->event_channel_->consumer_admin ();
+
+ ACE_GUARD_THROW (TAO_EC_ConsumerAdmin::Busy_Lock,
+ ace_mon, consumer_admin->busy_lock (),
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+
TAO_EC_ConsumerAdmin::SupplierSetIterator end =
consumer_admin->end ();
@@ -65,5 +70,3 @@ TAO_EC_Null_SupplierFiltering::push (const RtecEventComm::EventSet& event,
ACE_CHECK;
}
}
-
-
diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile
index 39d8bea66ec..9bb0777e3f8 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile
+++ b/TAO/orbsvcs/orbsvcs/Makefile
@@ -165,6 +165,7 @@ ifneq (,$(findstring Event2,$(TAO_ORBSVCS)))
Event/EC_Type_Filter \
Event/EC_Basic_Filter_Builder \
Event/EC_Basic_Factory \
+ Event/EC_ObserverStrategy \
endif # Event2
diff --git a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
index 6e6e4282171..b31df29deb6 100644
--- a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
+++ b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl
@@ -84,8 +84,8 @@ module RtecEventComm {
//
// = DESCRIPTION
// Events are represented by this structure, it is simply a
- // header,data pair.
- //
+ // header,data pair.
+ //
EventHeader header;
EventData data;
@@ -95,7 +95,7 @@ module RtecEventComm {
interface PushConsumer {
oneway void push (in EventSet data);
- void disconnect_push_consumer();
+ void disconnect_push_consumer();
};
interface PushSupplier {
@@ -103,5 +103,3 @@ module RtecEventComm {
};
};
-
-
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
index dfac0802a25..946d4cde39a 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp
@@ -26,7 +26,8 @@ Test_Consumer::Test_Consumer (ECT_Driver *driver,
void
Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
const char* name,
- int event_a, int event_b,
+ int type_start,
+ int type_count,
RtecEventChannelAdmin::EventChannel_ptr ec,
CORBA::Environment& TAO_IN_ENV)
{
@@ -53,8 +54,10 @@ Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
ACE_ConsumerQOS_Factory qos;
qos.start_disjunction_group ();
qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
- qos.insert_type (event_a, rt_info);
- qos.insert_type (event_b, rt_info);
+ for (int i = 0; i != type_count; ++i)
+ {
+ qos.insert_type (type_start + i, rt_info);
+ }
// = Connect as a consumer.
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
@@ -87,7 +90,7 @@ Test_Consumer::disconnect (CORBA::Environment &TAO_IN_ENV)
RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
// Deactivate the servant
- PortableServer::POA_var poa =
+ PortableServer::POA_var poa =
this->_default_POA (TAO_IN_ENV);
TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
PortableServer::ObjectId_var id =
@@ -104,7 +107,7 @@ Test_Consumer::dump_results (const char* name)
this->timer_.elapsed_time (tv);
double f = 1.0 / (tv.sec () + tv.usec () / 1000000.0);
double eps = this->recv_count_ * f;
-
+
ACE_DEBUG ((LM_DEBUG,
"ECT_Consumer (%s):\n"
" Total time: %d.%08.8d (secs.usecs)\n"
@@ -148,11 +151,6 @@ Test_Consumer::push (const RtecEventComm::EventSet& events,
{
const RtecEventComm::Event& e = events[i];
- if (e.data.payload.mb () == 0)
- {
- // ACE_DEBUG ((LM_DEBUG, "No data in event[%d]\n", i));
- // continue;
- }
if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
{
this->shutdown_count_++;
@@ -169,7 +167,7 @@ Test_Consumer::push (const RtecEventComm::EventSet& events,
ACE_hrtime_t creation;
ORBSVCS_Time::TimeT_to_hrtime (creation,
e.header.creation_time);
-
+
ACE_hrtime_t ec_recv;
ORBSVCS_Time::TimeT_to_hrtime (ec_recv,
e.header.ec_recv_time);
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h
index c59cee0a4dd..8b33409dd5b 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.h
@@ -26,11 +26,15 @@
class Test_Consumer : public POA_RtecEventComm::PushConsumer
{
- //
// = TITLE
// Receive the events.
//
// = DESCRIPTION
+ // This class is a consumer of events. It subscribes for a
+ // continous ranges of event types, this permits studying the
+ // effect of the number of subscriptions for each particular kind
+ // of event on the EC.
+ //
public:
Test_Consumer (ECT_Driver* driver,
void* cookie,
@@ -38,8 +42,8 @@ public:
void connect (RtecScheduler::Scheduler_ptr scheduler,
const char* name,
- int event_a,
- int event_b,
+ int type_start,
+ int type_count,
RtecEventChannelAdmin::EventChannel_ptr ec,
CORBA::Environment& _env);
// This method connects the consumer to the EC.
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp
index b8e68020b14..a930d680bf2 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.cpp
@@ -25,8 +25,8 @@ main (int argc, char *argv [])
ECT_Consumer_Driver::ECT_Consumer_Driver (void)
: n_consumers_ (1),
n_suppliers_ (1),
- event_a_ (ACE_ES_EVENT_UNDEFINED),
- event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
+ type_start_ (ACE_ES_EVENT_UNDEFINED),
+ type_count_ (1),
pid_file_name_ (0),
active_count_ (0)
{
@@ -67,14 +67,14 @@ ECT_Consumer_Driver::run (int argc, char* argv[])
"Execution parameters:\n"
" consumers = <%d>\n"
" suppliers = <%d>\n"
- " supplier Event A = <%d>\n"
- " supplier Event B = <%d>\n"
+ " type_start = <%d>\n"
+ " type count = <%d>\n"
" pid file name = <%s>\n",
this->n_consumers_,
this->n_suppliers_,
- this->event_a_,
- this->event_b_,
+ this->type_start_,
+ this->type_start_,
this->pid_file_name_?this->pid_file_name_:"nil") );
@@ -131,7 +131,7 @@ ECT_Consumer_Driver::run (int argc, char* argv[])
TAO_CHECK_ENV;
if (CORBA::is_nil (sched_obj.in ()))
return 1;
- RtecScheduler::Scheduler_var scheduler =
+ RtecScheduler::Scheduler_var scheduler =
RtecScheduler::Scheduler::_narrow (sched_obj.in (),
TAO_TRY_ENV);
TAO_CHECK_ENV;
@@ -223,8 +223,8 @@ ECT_Consumer_Driver::connect_consumers
this->consumers_[i]->connect (scheduler,
buf,
- this->event_a_,
- this->event_b_,
+ this->type_start_,
+ this->type_count_,
channel,
TAO_IN_ENV);
TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
@@ -277,9 +277,9 @@ ECT_Consumer_Driver::parse_args (int argc, char *argv [])
char* aux;
char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux);
- this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ this->type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
- this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ this->type_count_ = ACE_OS::atoi (arg);
}
break;
@@ -294,7 +294,7 @@ ECT_Consumer_Driver::parse_args (int argc, char *argv [])
"[ORB options] "
"-c <n_consumers> "
"-s <n_suppliers> "
- "-h <event_a,event_b> "
+ "-h <type_start,type_count> "
"-p <pid file name> "
"\n",
argv[0]));
@@ -319,5 +319,14 @@ ECT_Consumer_Driver::parse_args (int argc, char *argv [])
"suppliers out of range\n", argv[0]), -1);
}
+ if (this->type_count_ <= 0)
+ {
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "%s: number of event types "
+ "suppliers out of range, reset to default (1)\n",
+ argv[0]), -1);
+ this->type_count_ = 1;
+ }
+
return 0;
}
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h
index 36827c86218..4eaf51054c6 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer_Driver.h
@@ -74,9 +74,10 @@ private:
// supplier sends a shutdown message after it finishes, the consumer
// finishes when all the suppliers do.
- int event_a_;
- int event_b_;
- // We send two types of events, with different contents.
+ int type_start_;
+ int type_count_;
+ // We receive the events whose type is in the range
+ // [type_start,type_start+type_count)
const char* pid_file_name_;
// The name of a file where the process stores its pid
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
index 09c2097252f..ea94d0c59e2 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp
@@ -18,7 +18,9 @@ Test_Supplier::Test_Supplier (ECT_Driver *driver)
burst_count_ (0),
burst_size_ (0),
event_size_ (0),
- burst_pause_ (0)
+ burst_pause_ (0),
+ type_start_ (ACE_ES_EVENT_UNDEFINED),
+ type_count_ (1)
{
}
@@ -29,8 +31,8 @@ Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler,
int burst_size,
int event_size,
int burst_pause,
- int event_a,
- int event_b,
+ int type_start,
+ int type_count,
RtecEventChannelAdmin::EventChannel_ptr ec,
CORBA::Environment &TAO_IN_ENV)
{
@@ -38,9 +40,9 @@ Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler,
this->burst_size_ = burst_size;
this->event_size_ = event_size;
this->burst_pause_ = burst_pause;
- this->event_a_ = event_a;
- this->event_b_ = event_b;
-
+ this->type_start_ = type_start;
+ this->type_count_ = type_count;
+
RtecScheduler::handle_t rt_info =
scheduler->create (name, TAO_IN_ENV);
TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
@@ -71,12 +73,12 @@ Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler,
this->supplier_id_));
ACE_SupplierQOS_Factory qos;
- qos.insert (this->supplier_id_,
- event_a,
- rt_info, 1);
- qos.insert (this->supplier_id_,
- event_b,
- rt_info, 1);
+ for (int i = 0; i != type_count; ++i)
+ {
+ qos.insert (this->supplier_id_,
+ type_start + i,
+ rt_info, 1);
+ }
qos.insert (this->supplier_id_,
ACE_ES_EVENT_SHUTDOWN,
rt_info, 1);
@@ -112,7 +114,7 @@ Test_Supplier::disconnect (CORBA::Environment &TAO_IN_ENV)
RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
// Deactivate the servant
- PortableServer::POA_var poa =
+ PortableServer::POA_var poa =
this->supplier_._default_POA (TAO_IN_ENV);
TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
PortableServer::ObjectId_var id =
@@ -166,10 +168,8 @@ Test_Supplier::svc ()
{
for (int j = 0; j < this->burst_size_; ++j)
{
- if (j % 2 == 0)
- event[0].header.type = this->event_a_;
- else
- event[0].header.type = this->event_b_;
+ event[0].header.type =
+ this->type_start_ + j % this->type_count_;
ACE_hrtime_t now = ACE_OS::gethrtime ();
ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
@@ -187,7 +187,7 @@ Test_Supplier::svc ()
this->consumer_proxy ()->push(event, TAO_TRY_ENV);
TAO_CHECK_ENV;
this->timer_.stop ();
-
+
}
TAO_CATCH (CORBA::SystemException, sys_ex)
{
@@ -226,7 +226,7 @@ Test_Supplier::dump_results (const char* name)
int event_count = this->burst_count_ * this->burst_size_ + 1;
double f = 1.0 / (tv.sec () + tv.usec () / 1000000.0);
double eps = event_count * f;
-
+
ACE_DEBUG ((LM_DEBUG,
"ECT_Supplier (%s):\n"
" Total time: %d.%08.8d (secs.usecs)\n"
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h
index a694b13fb2e..233e270f3d7 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.h
@@ -4,9 +4,8 @@
// ============================================================================
//
// = DESCRIPTION
-// This test to measure how many events per minute can the EC
-// process, it also serves as an example how how to encode complex
-// data types in a octet sequence.
+// This is a helper class for the throughput tests of the Event
+// Channel.
//
// ============================================================================
@@ -43,8 +42,8 @@ public:
int burst_size,
int event_size,
int burst_pause,
- int event_a,
- int event_b,
+ int type_start,
+ int type_count,
RtecEventChannelAdmin::EventChannel_ptr ec,
CORBA::Environment& _env);
// This method connects the supplier to the EC.
@@ -89,8 +88,8 @@ private:
int burst_size_;
int event_size_;
int burst_pause_;
- int event_a_;
- int event_b_;
+ int type_start_;
+ int type_count_;
// The test data.
};
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp
index 7444b067e0b..8423b26bc26 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.cpp
@@ -28,8 +28,8 @@ ECT_Supplier_Driver::ECT_Supplier_Driver (void)
burst_size_ (100),
event_size_ (128),
burst_pause_ (100),
- event_a_ (ACE_ES_EVENT_UNDEFINED),
- event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
+ type_start_ (ACE_ES_EVENT_UNDEFINED),
+ type_count_ (1),
pid_file_name_ (0)
{
}
@@ -78,8 +78,8 @@ ECT_Supplier_Driver::run (int argc, char* argv[])
" burst size = <%d>\n"
" event size = <%d>\n"
" burst size = <%d>\n"
- " supplier Event A = <%d>\n"
- " supplier Event B = <%d>\n"
+ " type start = <%d>\n"
+ " type count = <%d>\n"
" pid file name = <%s>\n",
this->n_suppliers_,
@@ -87,8 +87,8 @@ ECT_Supplier_Driver::run (int argc, char* argv[])
this->burst_size_,
this->event_size_,
this->burst_pause_,
- this->event_a_,
- this->event_b_,
+ this->type_start_,
+ this->type_count_,
this->pid_file_name_?this->pid_file_name_:"nil") );
@@ -145,7 +145,7 @@ ECT_Supplier_Driver::run (int argc, char* argv[])
TAO_CHECK_ENV;
if (CORBA::is_nil (sched_obj.in ()))
return 1;
- RtecScheduler::Scheduler_var scheduler =
+ RtecScheduler::Scheduler_var scheduler =
RtecScheduler::Scheduler::_narrow (sched_obj.in (),
TAO_TRY_ENV);
TAO_CHECK_ENV;
@@ -168,7 +168,7 @@ ECT_Supplier_Driver::run (int argc, char* argv[])
poa_manager->activate (TAO_TRY_ENV);
TAO_CHECK_ENV;
- this->connect_suppliers (scheduler.in (),
+ this->connect_suppliers (scheduler.in (),
channel.in (),
TAO_TRY_ENV);
TAO_CHECK_ENV;
@@ -227,8 +227,8 @@ ECT_Supplier_Driver::connect_suppliers
this->burst_size_,
this->event_size_,
this->burst_pause_,
- this->event_a_,
- this->event_b_,
+ this->type_start_,
+ this->type_count_,
channel,
TAO_IN_ENV);
TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV);
@@ -301,9 +301,9 @@ ECT_Supplier_Driver::parse_args (int argc, char *argv [])
char* aux;
char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux);
- this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ this->type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
- this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ this->type_count_ = ACE_OS::atoi (arg);
}
break;
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h
index 7db63600626..a4294e0e929 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier_Driver.h
@@ -4,9 +4,8 @@
// ============================================================================
//
// = DESCRIPTION
-// This test to measure how many events per minute can the EC
-// process, it also serves as an example how how to encode complex
-// data types in a octet sequence.
+// This is a helper class for the throughput tests of the Event
+// Channel.
//
// ============================================================================
@@ -82,8 +81,8 @@ private:
int burst_pause_;
// The time between each event burst, in microseconds.
- int event_a_;
- int event_b_;
+ int type_start_;
+ int type_count_;
// We send two types of events, with different contents.
const char* pid_file_name_;
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp
index 9fac6231604..ed0bbf768c2 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.cpp
@@ -15,6 +15,7 @@
#include "orbsvcs/Event/Module_Factory.h"
#include "orbsvcs/Event/EC_Event_Channel.h"
#include "orbsvcs/Event/EC_Basic_Factory.h"
+#include "orbsvcs/Event/EC_ConsumerAdmin.h"
#include "ECT_Throughput.h"
ACE_RCSID(EC_Throughput, ECT_Throughput, "$Id$")
@@ -35,12 +36,17 @@ ECT_Throughput::ECT_Throughput (void)
burst_size_ (100),
event_size_ (128),
burst_pause_ (100),
- event_a_ (ACE_ES_EVENT_UNDEFINED),
- event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
+ consumer_type_start_ (ACE_ES_EVENT_UNDEFINED),
+ consumer_type_count_ (1),
+ consumer_type_shift_ (0),
+ supplier_type_start_ (ACE_ES_EVENT_UNDEFINED),
+ supplier_type_count_ (1),
+ supplier_type_shift_ (0),
pid_file_name_ (0),
active_count_ (0),
reactive_ec_ (0),
- new_ec_ (0)
+ new_ec_ (0),
+ ec_concurrency_hwm_ (1)
{
}
@@ -83,11 +89,16 @@ ECT_Throughput::run (int argc, char* argv[])
" burst size = <%d>\n"
" event size = <%d>\n"
" burst size = <%d>\n"
- " supplier Event A = <%d>\n"
- " supplier Event B = <%d>\n"
+ " consumer type start = <%d>\n"
+ " consumer type count = <%d>\n"
+ " consumer type shift = <%d>\n"
+ " supplier type start = <%d>\n"
+ " supplier type count = <%d>\n"
+ " supplier type shift = <%d>\n"
" pid file name = <%s>\n"
" remote EC = <%d>\n"
- " new EC = <%d>\n",
+ " new EC = <%d>\n"
+ " concurrency HWM = <%d>\n",
this->n_consumers_,
this->n_suppliers_,
@@ -95,12 +106,17 @@ ECT_Throughput::run (int argc, char* argv[])
this->burst_size_,
this->event_size_,
this->burst_pause_,
- this->event_a_,
- this->event_b_,
+ this->consumer_type_start_,
+ this->consumer_type_count_,
+ this->consumer_type_shift_,
+ this->supplier_type_start_,
+ this->supplier_type_count_,
+ this->supplier_type_shift_,
this->pid_file_name_?this->pid_file_name_:"nil",
this->reactive_ec_,
- this->new_ec_
+ this->new_ec_,
+ this->ec_concurrency_hwm_
) );
if (this->pid_file_name_ != 0)
@@ -175,7 +191,7 @@ ECT_Throughput::run (int argc, char* argv[])
str.in ()));
// Register the servant with the Naming Context....
- naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV);
+ naming_context->rebind (schedule_name, scheduler.in (), TAO_TRY_ENV);
TAO_CHECK_ENV;
ACE_Scheduler_Factory::use_config (naming_context.in ());
@@ -219,9 +235,11 @@ ECT_Throughput::run (int argc, char* argv[])
new TAO_EC_Event_Channel (ec_factory.get ());
ec->activate (TAO_TRY_ENV);
TAO_CHECK_ENV;
+ ec->consumer_admin ()->busy_hwm (this->ec_concurrency_hwm_);
ec_impl =
auto_ptr<POA_RtecEventChannelAdmin::EventChannel> (ec);
+
#else
ACE_ERROR_RETURN ((LM_ERROR,
"The new event channel is not supported "
@@ -321,7 +339,7 @@ ECT_Throughput::run (int argc, char* argv[])
void
ECT_Throughput::shutdown_consumer (void*,
- CORBA::Environment &)
+ CORBA::Environment &)
{
// int ID =
// (ACE_reinterpret_cast(Test_Consumer**,consumer_cookie)
@@ -355,10 +373,13 @@ ECT_Throughput::connect_consumers
this->consumers_ + i,
this->n_suppliers_));
+ int start = this->consumer_type_start_
+ + i * this->consumer_type_shift_;
+
this->consumers_[i]->connect (scheduler,
buf,
- this->event_a_,
- this->event_b_,
+ start,
+ this->consumer_type_count_,
channel,
TAO_IN_ENV);
if (TAO_IN_ENV.exception () != 0) return;
@@ -378,14 +399,15 @@ ECT_Throughput::connect_suppliers
ACE_NEW (this->suppliers_[i], Test_Supplier (this));
+ int start = this->supplier_type_start_ + i*this->supplier_type_shift_;
this->suppliers_[i]->connect (scheduler,
buf,
this->burst_count_,
this->burst_size_,
this->event_size_,
this->burst_pause_,
- this->event_a_,
- this->event_b_,
+ start,
+ this->supplier_type_count_,
channel,
TAO_IN_ENV);
if (TAO_IN_ENV.exception () != 0) return;
@@ -444,7 +466,7 @@ ECT_Throughput::dump_results (void)
int
ECT_Throughput::parse_args (int argc, char *argv [])
{
- ACE_Get_Opt get_opt (argc, argv, "dc:s:u:n:t:b:h:p:m:r");
+ ACE_Get_Opt get_opt (argc, argv, "rdc:s:u:n:t:b:h:l:p:m:w:");
int opt;
while ((opt = get_opt ()) != EOF)
@@ -512,9 +534,24 @@ ECT_Throughput::parse_args (int argc, char *argv [])
char* aux;
char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux);
- this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ this->consumer_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ arg = ACE_OS::strtok_r (0, ",", &aux);
+ this->consumer_type_count_ = ACE_OS::atoi (arg);
+ arg = ACE_OS::strtok_r (0, ",", &aux);
+ this->consumer_type_shift_ = ACE_OS::atoi (arg);
+ }
+ break;
+
+ case 'l':
+ {
+ char* aux;
+ char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux);
+
+ this->supplier_type_start_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ arg = ACE_OS::strtok_r (0, ",", &aux);
+ this->supplier_type_count_ = ACE_OS::atoi (arg);
arg = ACE_OS::strtok_r (0, ",", &aux);
- this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
+ this->supplier_type_shift_ = ACE_OS::atoi (arg);
}
break;
@@ -522,6 +559,10 @@ ECT_Throughput::parse_args (int argc, char *argv [])
this->pid_file_name_ = get_opt.optarg;
break;
+ case 'w':
+ this->ec_concurrency_hwm_ = ACE_OS::atoi (get_opt.optarg);
+ break;
+
case '?':
default:
ACE_DEBUG ((LM_DEBUG,
@@ -533,8 +574,10 @@ ECT_Throughput::parse_args (int argc, char *argv [])
"-n <burst size> "
"-b <event payload size> "
"-t <burst pause (usecs)> "
- "-h <eventa,eventb> "
+ "-h <consumer_start,consumer_count,consumer_shift> "
+ "-l <supplier_start,supplier_count,supplier_shift> "
"-p <pid file name> "
+ "-w <concurrency HWM> "
"-r "
"\n",
argv[0]));
@@ -603,6 +646,15 @@ ECT_Throughput::parse_args (int argc, char *argv [])
argv[0], 1), -1);
}
+ if (this->ec_concurrency_hwm_ <= 0)
+ {
+ ACE_ERROR_RETURN ((LM_DEBUG,
+ "%s: invalid concurrency HWM, "
+ "reset to default (%d)\n",
+ argv[0], 1), -1);
+ this->ec_concurrency_hwm_ = 1;
+ }
+
return 0;
}
diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h
index a37826e2543..47d1ff7dacd 100644
--- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h
+++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Throughput.h
@@ -4,6 +4,9 @@
// ============================================================================
//
// = DESCRIPTION
+// This is a helper class for the throughput tests of the Event
+// Channel.
+// Used for the collocated test.
//
// ============================================================================
@@ -93,9 +96,23 @@ private:
int burst_pause_;
// The time between each event burst, in microseconds.
- int event_a_;
- int event_b_;
- // We send two types of events, with different contents.
+ int consumer_type_start_;
+ int consumer_type_count_;
+ int consumer_type_shift_;
+ // The consumers subscribe to different sets of events, as follows:
+ // Consumer0: [start , start + count)
+ // Consumer1: [start + 1*shift, start + 1*shift + count)
+ // Consumer2: [start + 2*shift, start + 2*shift + count)
+ // And so on.
+
+ int supplier_type_start_;
+ int supplier_type_count_;
+ int supplier_type_shift_;
+ // The suppliers generate different sets of events, as follows:
+ // Supplier0: [start , start + count)
+ // Supplier1: [start + 1*shift, start + 1*shift + count)
+ // Supplier2: [start + 2*shift, start + 2*shift + count)
+ // And so on.
const char* pid_file_name_;
// The name of a file where the process stores its pid
@@ -112,6 +129,9 @@ private:
int new_ec_;
// If not zero then we use the new EC implementation
+
+ int ec_concurrency_hwm_;
+ // Set the HWM for the concurrency in the EC.
};
#endif /* ECT_TRHOUGHPUT_H */