summaryrefslogtreecommitdiff
path: root/TAO
diff options
context:
space:
mode:
Diffstat (limited to 'TAO')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp44
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp464
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h128
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.cpp477
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.h157
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i14
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile.RTEvent3
-rw-r--r--TAO/orbsvcs/orbsvcs/RTEvent.bor1
-rw-r--r--TAO/orbsvcs/orbsvcs/RTEvent.dsp8
-rw-r--r--TAO/orbsvcs/orbsvcs/RTEvent.mpc3
-rw-r--r--TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp8
-rw-r--r--TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp2
-rw-r--r--TAO/orbsvcs/tests/Bug_1393_Regression/Makefile4
-rw-r--r--TAO/orbsvcs/tests/Bug_1395_Regression/Makefile5
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Gateway.cpp2
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Observer.h2
-rw-r--r--TAO/orbsvcs/tests/Notify/Basic/Makefile.bor2
19 files changed, 690 insertions, 645 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp
index a0696b23d7d..cbddb37f659 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp
@@ -46,28 +46,36 @@ TAO_EC_Event_Channel_Base (const TAO_EC_Event_Channel_Attributes& attr,
TAO_EC_Event_Channel_Base::~TAO_EC_Event_Channel_Base (void)
{
- this->factory_->destroy_dispatching (this->dispatching_);
- this->dispatching_ = 0;
- this->factory_->destroy_filter_builder (this->filter_builder_);
- this->filter_builder_ = 0;
- this->factory_->destroy_supplier_filter_builder (this->supplier_filter_builder_);
- this->supplier_filter_builder_ = 0;
- this->factory_->destroy_consumer_admin (this->consumer_admin_);
- this->consumer_admin_ = 0;
- this->factory_->destroy_supplier_admin (this->supplier_admin_);
- this->supplier_admin_ = 0;
- this->factory_->destroy_timeout_generator (this->timeout_generator_);
- this->timeout_generator_ = 0;
- this->factory_->destroy_observer_strategy (this->observer_strategy_);
- this->observer_strategy_ = 0;
+ // Destroy Strategies in the reverse order of creation, they
+ // refere to each other during destruction and thus need to be
+ // cleaned up properly.
+ this->factory_->destroy_supplier_control (this->supplier_control_);
+ this->supplier_control_ = 0;
+ this->factory_->destroy_consumer_control (this->consumer_control_);
+ this->consumer_control_ = 0;
this->factory_->destroy_scheduling_strategy (this->scheduling_strategy_);
this->scheduling_strategy_ = 0;
- this->factory_->destroy_consumer_control (this->consumer_control_);
- this->consumer_control_ = 0;
- this->factory_->destroy_supplier_control (this->supplier_control_);
- this->supplier_control_ = 0;
+ this->factory_->destroy_observer_strategy (this->observer_strategy_);
+ this->observer_strategy_ = 0;
+
+ this->factory_->destroy_timeout_generator (this->timeout_generator_);
+ this->timeout_generator_ = 0;
+
+ this->factory_->destroy_supplier_admin (this->supplier_admin_);
+ this->supplier_admin_ = 0;
+ this->factory_->destroy_consumer_admin (this->consumer_admin_);
+ this->consumer_admin_ = 0;
+
+ this->factory_->destroy_supplier_filter_builder (this->supplier_filter_builder_);
+ this->supplier_filter_builder_ = 0;
+
+ this->factory_->destroy_filter_builder (this->filter_builder_);
+ this->filter_builder_ = 0;
+
+ this->factory_->destroy_dispatching (this->dispatching_);
+ this->dispatching_ = 0;
this->factory (0, 0);
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
index ef141efbfb1..0a7b44ea438 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
@@ -1,35 +1,9 @@
// $Id$
#include "orbsvcs/Event/EC_Gateway.h"
-#include "orbsvcs/Event_Utilities.h"
-#include "orbsvcs/Time_Utilities.h"
ACE_RCSID(Event, EC_Gateway, "$Id$")
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-template class ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>;
-template class ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>;
-template class ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>;
-template class ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
-template class ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
-template class ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
-template class ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
-
-#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-
-#pragma instantiate ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>
-#pragma instantiate ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>
-
-#pragma instantiate ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>
-#pragma instantiate ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
-#pragma instantiate ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
-#pragma instantiate ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
-#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
-
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
-
TAO_EC_Gateway::TAO_EC_Gateway (void)
: handle_ (0)
{
@@ -51,441 +25,3 @@ TAO_EC_Gateway::observer_handle (void) const
return this->handle_;
}
-TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
- : busy_count_ (0),
- update_posted_ (0),
- rmt_info_ (0),
- lcl_info_ (0),
- consumer_ (this),
- consumer_is_active_ (0),
- supplier_ (this),
- supplier_is_active_ (0)
-{
-}
-
-TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void)
-{
-}
-
-void
-TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
- RtecEventChannelAdmin::EventChannel_ptr lcl_ec
- ACE_ENV_ARG_DECL)
-{
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->init_i (rmt_ec, lcl_ec ACE_ENV_ARG_PARAMETER);
-}
-
-void
-TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
- RtecEventChannelAdmin::EventChannel_ptr lcl_ec
- ACE_ENV_ARG_DECL_NOT_USED)
-{
- if (!CORBA::is_nil (this->rmt_ec_.in ()))
- return;
-
- this->rmt_ec_ =
- RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec);
- this->lcl_ec_ =
- RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
-}
-
-void
-TAO_EC_Gateway_IIOP::close (ACE_ENV_SINGLE_ARG_DECL)
-{
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
-}
-
-
-void
-TAO_EC_Gateway_IIOP::close_i (ACE_ENV_SINGLE_ARG_DECL)
-{
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
-
- if (this->consumer_proxy_map_.current_size () > 0)
- {
- for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
- j != this->consumer_proxy_map_.end ();
- ++j)
- {
- RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
- if (CORBA::is_nil (consumer))
- continue;
- ACE_TRY
- {
- consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
- }
- ACE_CATCHANY
- {
- }
- ACE_ENDTRY;
- CORBA::release (consumer);
- }
- // Remove all the elements on the map. Calling close() does not
- // work because the map is left in an inconsistent state.
- this->consumer_proxy_map_.open ();
- }
-
- if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
- {
- this->default_consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- this->default_consumer_proxy_ =
- RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
- }
-
- if (!CORBA::is_nil (this->supplier_proxy_.in ()))
- {
- this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- this->supplier_proxy_ =
- RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
- }
-}
-
-void
-TAO_EC_Gateway_IIOP::update_consumer (
- const RtecEventChannelAdmin::ConsumerQOS& c_qos
- ACE_ENV_ARG_DECL)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
- if (c_qos.dependencies.length () <= 1)
- return;
-
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
-
- if (this->busy_count_ != 0)
- {
- this->update_posted_ = 1;
- this->c_qos_ = c_qos;
- return;
- }
-
- this->update_consumer_i (c_qos ACE_ENV_ARG_PARAMETER);
-}
-
-void
-TAO_EC_Gateway_IIOP::update_consumer_i (
- const RtecEventChannelAdmin::ConsumerQOS& c_qos
- ACE_ENV_ARG_DECL)
-{
- this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- if (CORBA::is_nil (this->lcl_ec_.in ())
- || CORBA::is_nil (this->rmt_ec_.in ()))
- return;
-
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
-
- // = Connect as a supplier to the local EC
- RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
- this->lcl_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- // Change the RT_Info in the consumer QoS.
- // On the same loop we discover the subscriptions by event source,
- // and fill the consumer proxy map.
- RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
- sub.is_gateway = 1;
- for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
- {
- sub.dependencies[i].rt_info = this->rmt_info_;
-
- RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
- const RtecEventComm::EventHeader &h =
- sub.dependencies[i].event.header;
-
- RtecEventComm::EventSourceID sid = h.source;
-
- //ACE_DEBUG ((LM_DEBUG,
- // "ECG (%t) trying (%d,%d)\n",
- // sid, h.type));
-
- // Skip all subscriptions that do not require an specific source
- // id.
- if (sid == 0)
- continue;
-
- // Skip all the magic event types.
- if (0 < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
- continue;
-
- if (this->consumer_proxy_map_.find (sid, proxy) == -1)
- {
- //ACE_DEBUG ((LM_DEBUG,
- // "ECG (%t) binding source %d\n",
- // sid));
- proxy = supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
- this->consumer_proxy_map_.bind (sid, proxy);
- }
- }
- //ACE_DEBUG ((LM_DEBUG,
- // "ECG (%t) consumer map computed (%d entries)\n",
- // this->consumer_proxy_map_.current_size ()));
-
- if (this->consumer_proxy_map_.current_size () > 0)
- {
- this->supplier_is_active_ = 1;
-
- // Obtain a reference to our supplier personality...
- RtecEventComm::PushSupplier_var supplier_ref =
- this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- // For each subscription by source build the set of publications
- // (they may several, by type, for instance) and connect to the
- // consumer proxy.
- for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
- j != this->consumer_proxy_map_.end ();
- ++j)
- {
- RtecEventChannelAdmin::SupplierQOS pub;
- pub.publications.length (sub.dependencies.length () + 1);
- pub.is_gateway = 1;
-
- int c = 0;
-
- RtecEventComm::EventSourceID sid = (*j).ext_id_;
- for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
- {
- const RtecEventComm::EventHeader& h =
- sub.dependencies[k].event.header;
- if (h.source != sid
- || (0 < h.type
- && h.type < ACE_ES_EVENT_UNDEFINED))
- continue;
- pub.publications[c].event.header = h;
- pub.publications[c].dependency_info.dependency_type =
- RtecBase::TWO_WAY_CALL;
- pub.publications[c].dependency_info.number_of_calls = 1;
- pub.publications[c].dependency_info.rt_info = this->lcl_info_;
- c++;
- }
- //ACE_DEBUG ((LM_DEBUG,
- // "ECG (%t) supplier id %d has %d elements\n",
- // sid, c));
- if (c == 0)
- continue;
-
- pub.publications.length (c);
-
- // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
- // ACE_SupplierQOS_Factory::debug (pub);
- (*j).int_id_->connect_push_supplier (supplier_ref.in (),
- pub
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
- }
-
- // Also build the subscriptions that are *not* by source and connect
- // to the default consumer proxy.
- RtecEventChannelAdmin::SupplierQOS pub;
- pub.publications.length (sub.dependencies.length () - 1);
- pub.is_gateway = 1;
- int c = 0;
- for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
- {
- const RtecEventComm::EventHeader& h =
- sub.dependencies[k].event.header;
- RtecEventComm::EventSourceID sid = h.source;
- if (sid != 0
- || (0 <= h.type
- && h.type < ACE_ES_EVENT_UNDEFINED))
- continue;
- pub.publications[c].event.header = h;
- pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
- pub.publications[c].dependency_info.dependency_type =
- RtecBase::TWO_WAY_CALL;
- pub.publications[c].dependency_info.number_of_calls = 1;
- pub.publications[c].dependency_info.rt_info = this->lcl_info_;
- c++;
- }
-
- if (c > 0)
- {
- this->supplier_is_active_ = 1;
-
- // Obtain a reference to our supplier personality...
- RtecEventComm::PushSupplier_var supplier_ref =
- this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- // Obtain the consumer....
- this->default_consumer_proxy_ =
- supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- pub.publications.length (c);
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
- // ACE_SupplierQOS_Factory::debug (pub);
- this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
- pub
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
-
-
-
- RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
- this->rmt_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- this->supplier_proxy_ =
- consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- this->consumer_is_active_ = 1;
- RtecEventComm::PushConsumer_var consumer_ref =
- this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-
- // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
- // ACE_ConsumerQOS_Factory::debug (sub);
-
- this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
- sub
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
-}
-
-void
-TAO_EC_Gateway_IIOP::update_supplier (
- const RtecEventChannelAdmin::SupplierQOS&
- ACE_ENV_ARG_DECL_NOT_USED)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
- // Do nothing...
-}
-
-void
-TAO_EC_Gateway_IIOP::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
-{
- // ACE_DEBUG ((LM_DEBUG,
- // "ECG (%t): Supplier-consumer received "
- // "disconnect from channel.\n"));
-}
-
-void
-TAO_EC_Gateway_IIOP::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
-{
- // ACE_DEBUG ((LM_DEBUG,
- // "ECG (%t): Supplier received "
- // "disconnect from channel.\n"));
-}
-
-void
-TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events
- ACE_ENV_ARG_DECL)
-{
- // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n"));
-
- if (events.length () == 0)
- {
- // ACE_DEBUG ((LM_DEBUG, "no events\n"));
- return;
- }
-
- {
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->busy_count_++;
- }
-
- // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
-
- // @@ TODO, there is an extra data copy here, we should do the event
- // modification without it and only compact the necessary events.
- RtecEventComm::EventSet out (1);
- out.length (1);
- for (CORBA::ULong i = 0; i < events.length (); ++i)
- {
- if (events[i].header.ttl == 0)
- continue;
-
- RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
- RtecEventComm::EventSourceID sid = events[i].header.source;
- if (sid == 0
- || this->consumer_proxy_map_.find (sid, proxy) == -1)
- {
- // If the source is not in our map we have to use the
- // default consumer proxy.
- proxy = this->default_consumer_proxy_.in ();
- }
-
- if (CORBA::is_nil (proxy))
- continue;
-
- out[0] = events[i];
- out[0].header.ttl--;
-
- // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n"));
- proxy->push (out ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
-
- {
- ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->busy_count_--;
-
- if (this->busy_count_ == 0 && this->update_posted_ != 0)
- {
- this->update_posted_ = 0;
- this->update_consumer_i (this->c_qos_ ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- }
- }
-
-}
-
-int
-TAO_EC_Gateway_IIOP::shutdown (ACE_ENV_SINGLE_ARG_DECL)
-{
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
-
- this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
- if (this->supplier_is_active_)
- {
- PortableServer::POA_var poa =
- this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- this->supplier_is_active_ = 0;
- }
-
- if (this->consumer_is_active_)
- {
- PortableServer::POA_var poa =
- this->consumer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->consumer_ ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
- this->consumer_is_active_ = 0;
- }
-
- this->lcl_ec_ =
- RtecEventChannelAdmin::EventChannel::_nil ();
- this->rmt_ec_ =
- RtecEventChannelAdmin::EventChannel::_nil ();
-
- return 0;
-}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
index 007e0c47c43..aa29391757f 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
@@ -19,8 +19,6 @@
#include "orbsvcs/Event/event_export.h"
#include "orbsvcs/RtecEventChannelAdminS.h"
#include "orbsvcs/RtecEventCommS.h"
-#include "orbsvcs/Channel_Clients.h"
-#include "ace/Map_Manager.h"
/**
* @class TAO_EC_Gateway
@@ -59,131 +57,5 @@ private:
RtecEventChannelAdmin::Observer_Handle handle_;
};
-// ****************************************************************
-/**
- * @class TAO_EC_Gateway_IIOP
- *
- * @brief Event Channel Gateway using IIOP.
- *
- * This class mediates among two event channels, it connects as a
- * consumer of events with a remote event channel, and as a supplier
- * of events with the local EC.
- * As a consumer it gives a QoS designed to only accept the events
- * in which *local* consumers are interested.
- * Eventually the local EC should create this object and compute its
- * QoS in an automated manner; but this requires some way to filter
- * out the peers registered as consumers, otherwise we will get
- * loops in the QoS graph.
- * It uses exactly the same set of events in the publications list
- * when connected as a supplier.
- *
- * @note
- * An alternative implementation would be to register with the
- * remote EC as a supplier, and then filter on the remote EC, but
- * one of the objectives is to minimize network traffic.
- * On the other hand the events will be pushed to remote consumers,
- * event though they will be dropped upon receipt (due to the TTL
- * field); IMHO this is another suggestion that the EC needs to know
- * (somehow) which consumers are truly its peers in disguise.
- *
- * @todo: The class makes an extra copy of the events, we need to
- * investigate if closer collaboration with its collocated EC could
- * be used to remove that copy.
- */
-class TAO_RTEvent_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway
-{
-public:
- TAO_EC_Gateway_IIOP (void);
- ~TAO_EC_Gateway_IIOP (void);
-
- /// To do its job this class requires to know the local and remote
- /// ECs it will connect to,
- void init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
- RtecEventChannelAdmin::EventChannel_ptr lcl_ec
- ACE_ENV_ARG_DECL);
-
- /// The channel is disconnecting.
- void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
-
- /// The channel is disconnecting.
- void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
-
- /// This is the Consumer side behavior, it pushes the events to the
- /// local event channel.
- void push (const RtecEventComm::EventSet &events
- ACE_ENV_ARG_DECL_WITH_DEFAULTS);
-
- /// Disconnect and shutdown the gateway
- int shutdown (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
-
- // The following methods are documented in the base class.
- virtual void close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
- virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((CORBA::SystemException));
- virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub
- ACE_ENV_ARG_DECL_WITH_DEFAULTS)
- ACE_THROW_SPEC ((CORBA::SystemException));
-
-private:
- void close_i (ACE_ENV_SINGLE_ARG_DECL_NOT_USED);
-
- void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub
- ACE_ENV_ARG_DECL);
-
-protected:
- /// Do the real work in init()
- void init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
- RtecEventChannelAdmin::EventChannel_ptr lcl_ec
- ACE_ENV_ARG_DECL);
-
-protected:
- /// Lock to synchronize internal changes
- TAO_SYNCH_MUTEX lock_;
-
- /// How many threads are running push() we cannot make changes until
- /// that reaches 0
- CORBA::ULong busy_count_;
-
- /**
- * An update_consumer() message arrived *while* we were doing a
- * push() the modification is stored <pub_>, if multiple
- * update_consumer messages arrive only the last one is executed.
- */
- int update_posted_;
- RtecEventChannelAdmin::ConsumerQOS c_qos_;
-
- /// The remote and the local EC, so we can reconnect when the list changes.
- RtecEventChannelAdmin::EventChannel_var rmt_ec_;
- RtecEventChannelAdmin::EventChannel_var lcl_ec_;
-
- /// Our remote RT_Infos.
- RtecBase::handle_t rmt_info_;
- /// Our local RT_Infos.
- RtecBase::handle_t lcl_info_;
-
- /// Our consumer personality....
- /// If it is not 0 then we must deactivate the supplier
- ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_;
- int consumer_is_active_;
-
- /// Our supplier personality....
- /// If it is not 0 then we must deactivate the supplier
- ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_;
- int supplier_is_active_;
-
- // We use a different Consumer_Proxy
- typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map;
- typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator;
-
- /// We talk to the EC (as a supplier) using either an per-supplier
- /// proxy or a generic proxy for the type only subscriptions.
- Consumer_Map consumer_proxy_map_;
- RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_;
-
- /// We talk to the EC (as a consumer) using this proxy.
- RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
-};
-
#include "ace/post.h"
#endif /* ACE_EC_GATEWAY_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.cpp
new file mode 100644
index 00000000000..dc990e5544c
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.cpp
@@ -0,0 +1,477 @@
+// $Id$
+
+#include "orbsvcs/Event/EC_Gateway_IIOP.h"
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/Time_Utilities.h"
+
+ACE_RCSID(Event, EC_Gateway, "$Id$")
+
+TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
+ : busy_count_ (0),
+ update_posted_ (0),
+ rmt_info_ (0),
+ lcl_info_ (0),
+ consumer_ (this),
+ consumer_is_active_ (0),
+ supplier_ (this),
+ supplier_is_active_ (0)
+{
+}
+
+TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void)
+{
+}
+
+int
+TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
+ RtecEventChannelAdmin::EventChannel_ptr lcl_ec
+ ACE_ENV_ARG_DECL)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ return this->init_i (rmt_ec, lcl_ec ACE_ENV_ARG_PARAMETER);
+}
+
+int
+TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
+ RtecEventChannelAdmin::EventChannel_ptr lcl_ec
+ ACE_ENV_ARG_DECL_NOT_USED)
+{
+ if (CORBA::is_nil (this->rmt_ec_.in ()) && CORBA::is_nil (this->lcl_ec_.in ()))
+ {
+ this->rmt_ec_ =
+ RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec);
+ this->lcl_ec_ =
+ RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
+
+ return 0;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO_EC_Gateway_IIOP - init_i "
+ "Remote and local event channel reference "
+ "should be nil.\n"), -1);
+}
+
+void
+TAO_EC_Gateway_IIOP::close (ACE_ENV_SINGLE_ARG_DECL)
+{
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
+}
+
+
+void
+TAO_EC_Gateway_IIOP::close_i (ACE_ENV_SINGLE_ARG_DECL)
+{
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
+
+ if (this->consumer_proxy_map_.current_size () > 0)
+ {
+ for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
+ j != this->consumer_proxy_map_.end ();
+ ++j)
+ {
+ RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
+ if (CORBA::is_nil (consumer))
+ continue;
+ ACE_TRY
+ {
+ consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ }
+ ACE_ENDTRY;
+ CORBA::release (consumer);
+ }
+ // Remove all the elements on the map. Calling close() does not
+ // work because the map is left in an inconsistent state.
+ this->consumer_proxy_map_.open ();
+ }
+
+ if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
+ {
+ this->default_consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->default_consumer_proxy_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+ }
+
+ if (!CORBA::is_nil (this->supplier_proxy_.in ()))
+ {
+ this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->supplier_proxy_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+ }
+}
+
+void
+TAO_EC_Gateway_IIOP::update_consumer (
+ const RtecEventChannelAdmin::ConsumerQOS& c_qos
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (c_qos.dependencies.length () == 0)
+ return;
+
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (this->busy_count_ != 0)
+ {
+ this->update_posted_ = 1;
+ this->c_qos_ = c_qos;
+ return;
+ }
+
+ this->update_consumer_i (c_qos ACE_ENV_ARG_PARAMETER);
+}
+
+void
+TAO_EC_Gateway_IIOP::update_consumer_i (
+ const RtecEventChannelAdmin::ConsumerQOS& c_qos
+ ACE_ENV_ARG_DECL)
+{
+ this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (CORBA::is_nil (this->lcl_ec_.in ())
+ || CORBA::is_nil (this->rmt_ec_.in ()))
+ return;
+
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
+
+ // = Connect as a supplier to the local EC
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ this->lcl_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Change the RT_Info in the consumer QoS.
+ // On the same loop we discover the subscriptions by event source,
+ // and fill the consumer proxy map.
+ RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
+ sub.is_gateway = 1;
+ for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
+ {
+ sub.dependencies[i].rt_info = this->rmt_info_;
+
+ RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
+ const RtecEventComm::EventHeader &h =
+ sub.dependencies[i].event.header;
+
+ RtecEventComm::EventSourceID sid = h.source;
+
+ //ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) trying (%d,%d)\n",
+ // sid, h.type));
+
+ // Skip all subscriptions that do not require an specific source
+ // id.
+ if (sid == 0)
+ continue;
+
+ // Skip all the magic event types.
+ if (0 < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
+ continue;
+
+ if (this->consumer_proxy_map_.find (sid, proxy) == -1)
+ {
+ //ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) binding source %d\n",
+ // sid));
+ proxy = supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ this->consumer_proxy_map_.bind (sid, proxy);
+ }
+ }
+ //ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) consumer map computed (%d entries)\n",
+ // this->consumer_proxy_map_.current_size ()));
+
+ if (this->consumer_proxy_map_.current_size () > 0)
+ {
+ this->supplier_is_active_ = 1;
+
+ // Obtain a reference to our supplier personality...
+ RtecEventComm::PushSupplier_var supplier_ref =
+ this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // For each subscription by source build the set of publications
+ // (they may several, by type, for instance) and connect to the
+ // consumer proxy.
+ for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin ();
+ j != this->consumer_proxy_map_.end ();
+ ++j)
+ {
+ RtecEventChannelAdmin::SupplierQOS pub;
+ pub.publications.length (sub.dependencies.length () + 1);
+ pub.is_gateway = 1;
+
+ int c = 0;
+
+ RtecEventComm::EventSourceID sid = (*j).ext_id_;
+ for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
+ {
+ const RtecEventComm::EventHeader& h =
+ sub.dependencies[k].event.header;
+ if (h.source != sid
+ || (0 < h.type
+ && h.type < ACE_ES_EVENT_UNDEFINED))
+ continue;
+ pub.publications[c].event.header = h;
+ pub.publications[c].dependency_info.dependency_type =
+ RtecBase::TWO_WAY_CALL;
+ pub.publications[c].dependency_info.number_of_calls = 1;
+ pub.publications[c].dependency_info.rt_info = this->lcl_info_;
+ c++;
+ }
+ //ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) supplier id %d has %d elements\n",
+ // sid, c));
+ if (c == 0)
+ continue;
+
+ pub.publications.length (c);
+
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier "));
+ // ACE_SupplierQOS_Factory::debug (pub);
+ (*j).int_id_->connect_push_supplier (supplier_ref.in (),
+ pub
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+ }
+
+ // Also build the subscriptions that are *not* by source and connect
+ // to the default consumer proxy.
+ RtecEventChannelAdmin::SupplierQOS pub;
+ pub.publications.length (sub.dependencies.length () - 1);
+ pub.is_gateway = 1;
+ int c = 0;
+ for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
+ {
+ const RtecEventComm::EventHeader& h =
+ sub.dependencies[k].event.header;
+ RtecEventComm::EventSourceID sid = h.source;
+ if (sid != 0
+ || (0 <= h.type
+ && h.type < ACE_ES_EVENT_UNDEFINED))
+ continue;
+ pub.publications[c].event.header = h;
+ pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero ();
+ pub.publications[c].dependency_info.dependency_type =
+ RtecBase::TWO_WAY_CALL;
+ pub.publications[c].dependency_info.number_of_calls = 1;
+ pub.publications[c].dependency_info.rt_info = this->lcl_info_;
+ c++;
+ }
+
+ if (c > 0)
+ {
+ this->supplier_is_active_ = 1;
+
+ // Obtain a reference to our supplier personality...
+ RtecEventComm::PushSupplier_var supplier_ref =
+ this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Obtain the consumer....
+ this->default_consumer_proxy_ =
+ supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ pub.publications.length (c);
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
+ // ACE_SupplierQOS_Factory::debug (pub);
+ this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
+ pub
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+
+
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ this->rmt_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->supplier_proxy_ =
+ consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->consumer_is_active_ = 1;
+ RtecEventComm::PushConsumer_var consumer_ref =
+ this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer "));
+ // ACE_ConsumerQOS_Factory::debug (sub);
+
+ this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
+ sub
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+}
+
+void
+TAO_EC_Gateway_IIOP::update_supplier (
+ const RtecEventChannelAdmin::SupplierQOS&
+ ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ // Do nothing...
+}
+
+void
+TAO_EC_Gateway_IIOP::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ // ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t): Supplier-consumer received "
+ // "disconnect from channel.\n"));
+}
+
+void
+TAO_EC_Gateway_IIOP::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+{
+ // ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t): Supplier received "
+ // "disconnect from channel.\n"));
+}
+
+void
+TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events
+ ACE_ENV_ARG_DECL)
+{
+ // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n"));
+
+ if (events.length () == 0)
+ {
+ // ACE_DEBUG ((LM_DEBUG, "no events\n"));
+ return;
+ }
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->busy_count_++;
+ }
+
+ // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
+
+ // @@ TODO, there is an extra data copy here, we should do the event
+ // modification without it and only compact the necessary events.
+ RtecEventComm::EventSet out (1);
+ out.length (1);
+ for (CORBA::ULong i = 0; i < events.length (); ++i)
+ {
+ if (events[i].header.ttl == 0)
+ continue;
+
+ RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
+ RtecEventComm::EventSourceID sid = events[i].header.source;
+ if (sid == 0
+ || this->consumer_proxy_map_.find (sid, proxy) == -1)
+ {
+ // If the source is not in our map we have to use the
+ // default consumer proxy.
+ proxy = this->default_consumer_proxy_.in ();
+ }
+
+ if (CORBA::is_nil (proxy))
+ continue;
+
+ out[0] = events[i];
+ out[0].header.ttl--;
+
+ // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n"));
+ proxy->push (out ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->busy_count_--;
+
+ if (this->busy_count_ == 0 && this->update_posted_ != 0)
+ {
+ this->update_posted_ = 0;
+ this->update_consumer_i (this->c_qos_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ }
+ }
+
+}
+
+int
+TAO_EC_Gateway_IIOP::shutdown (ACE_ENV_SINGLE_ARG_DECL)
+{
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ if (this->supplier_is_active_)
+ {
+ PortableServer::POA_var poa =
+ this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ this->supplier_is_active_ = 0;
+ }
+
+ if (this->consumer_is_active_)
+ {
+ PortableServer::POA_var poa =
+ this->consumer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->consumer_ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+ this->consumer_is_active_ = 0;
+ }
+
+ this->lcl_ec_ =
+ RtecEventChannelAdmin::EventChannel::_nil ();
+ this->rmt_ec_ =
+ RtecEventChannelAdmin::EventChannel::_nil ();
+
+ return 0;
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>;
+template class ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>;
+template class ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>;
+template class ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
+template class ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
+template class ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
+template class ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>;
+
+#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>
+#pragma instantiate ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>
+
+#pragma instantiate ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>
+#pragma instantiate ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
+#pragma instantiate ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
+#pragma instantiate ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
+#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.h
new file mode 100644
index 00000000000..4877d0fb8f4
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.h
@@ -0,0 +1,157 @@
+/* -*- C++ -*- */
+/**
+ * @file EC_Gateway_IIOP.h
+ *
+ * $Id$
+ *
+ * @author Carlos O'Ryan (coryan@cs.wustl.edu)
+ *
+ * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and
+ * other members of the DOC group. More details can be found in:
+ *
+ * http://doc.ece.uci.edu/~coryan/EC/index.html
+ */
+
+#ifndef TAO_EC_GATEWAY_IIOP_H
+#define TAO_EC_GATEWAY_IIOP_H
+#include "ace/pre.h"
+
+#include "orbsvcs/Event/event_export.h"
+#include "orbsvcs/Event/EC_Gateway.h"
+#include "orbsvcs/RtecEventChannelAdminS.h"
+#include "orbsvcs/RtecEventCommS.h"
+#include "orbsvcs/Channel_Clients.h"
+#include "ace/Map_Manager.h"
+
+/**
+ * @class TAO_EC_Gateway_IIOP
+ *
+ * @brief Event Channel Gateway using IIOP.
+ *
+ * This class mediates among two event channels, it connects as a
+ * consumer of events with a remote event channel, and as a supplier
+ * of events with the local EC.
+ * As a consumer it gives a QoS designed to only accept the events
+ * in which *local* consumers are interested.
+ * Eventually the local EC should create this object and compute its
+ * QoS in an automated manner; but this requires some way to filter
+ * out the peers registered as consumers, otherwise we will get
+ * loops in the QoS graph.
+ * It uses exactly the same set of events in the publications list
+ * when connected as a supplier.
+ *
+ * @note
+ * An alternative implementation would be to register with the
+ * remote EC as a supplier, and then filter on the remote EC, but
+ * one of the objectives is to minimize network traffic.
+ * On the other hand the events will be pushed to remote consumers,
+ * event though they will be dropped upon receipt (due to the TTL
+ * field); IMHO this is another suggestion that the EC needs to know
+ * (somehow) which consumers are truly its peers in disguise.
+ *
+ * @todo: The class makes an extra copy of the events, we need to
+ * investigate if closer collaboration with its collocated EC could
+ * be used to remove that copy.
+ */
+class TAO_RTEvent_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway
+{
+public:
+ TAO_EC_Gateway_IIOP (void);
+ ~TAO_EC_Gateway_IIOP (void);
+
+ /**
+ * To do its job this class requires to know the local and remote ECs it will
+ * connect to.
+ * @return 0 in case of success, -1 in case of failure
+ */
+ int init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
+ RtecEventChannelAdmin::EventChannel_ptr lcl_ec
+ ACE_ENV_ARG_DECL);
+
+ /// The channel is disconnecting.
+ void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+
+ /// The channel is disconnecting.
+ void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+
+ /// This is the Consumer side behavior, it pushes the events to the
+ /// local event channel.
+ void push (const RtecEventComm::EventSet &events
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS);
+
+ /// Disconnect and shutdown the gateway
+ int shutdown (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+
+ // The following methods are documented in the base class.
+ virtual void close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+ virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+ virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+private:
+ void close_i (ACE_ENV_SINGLE_ARG_DECL_NOT_USED);
+
+ void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub
+ ACE_ENV_ARG_DECL);
+
+protected:
+ /// Do the real work in init()
+ int init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
+ RtecEventChannelAdmin::EventChannel_ptr lcl_ec
+ ACE_ENV_ARG_DECL);
+
+protected:
+ /// Lock to synchronize internal changes
+ TAO_SYNCH_MUTEX lock_;
+
+ /// How many threads are running push() we cannot make changes until
+ /// that reaches 0
+ CORBA::ULong busy_count_;
+
+ /**
+ * An update_consumer() message arrived *while* we were doing a
+ * push() the modification is stored <pub_>, if multiple
+ * update_consumer messages arrive only the last one is executed.
+ */
+ int update_posted_;
+ RtecEventChannelAdmin::ConsumerQOS c_qos_;
+
+ /// The remote and the local EC, so we can reconnect when the list changes.
+ RtecEventChannelAdmin::EventChannel_var rmt_ec_;
+ RtecEventChannelAdmin::EventChannel_var lcl_ec_;
+
+ /// Our remote RT_Infos.
+ RtecBase::handle_t rmt_info_;
+ /// Our local RT_Infos.
+ RtecBase::handle_t lcl_info_;
+
+ /// Our consumer personality....
+ ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_;
+
+ /// If it is not 0 then we must deactivate the consumer
+ int consumer_is_active_;
+
+ /// Our supplier personality....
+ ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_;
+
+ /// If it is not 0 then we must deactivate the supplier
+ int supplier_is_active_;
+
+ // We use a different Consumer_Proxy
+ typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map;
+ typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator;
+
+ /// We talk to the EC (as a supplier) using either an per-supplier
+ /// proxy or a generic proxy for the type only subscriptions.
+ Consumer_Map consumer_proxy_map_;
+ RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_;
+
+ /// We talk to the EC (as a consumer) using this proxy.
+ RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
+};
+
+#include "ace/post.h"
+#endif /* ACE_EC_GATEWAY_H */
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp
index 4a824e6f418..dccdf792bd7 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp
@@ -25,16 +25,13 @@ activate (T & obj_ref,
suggested_object_deactivator.set_values (poa, obj_id.in ());
-
// Get the object reference of the activated object.
CORBA::Object_var obj =
poa->id_to_reference (obj_id.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
obj_ref =
- my_narrow_until_carlos_gets_jeff_to_fix_the_idl_compiler (obj_ref.ptr (),
- obj.in ()
- ACE_ENV_ARG_PARAMETER);
+ T::_obj_type::_narrow (obj.in() ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
if (CORBA::is_nil (obj_ref.in ()))
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h
index df358c02550..0aeff3e86d7 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h
@@ -24,9 +24,9 @@
/**
* @brief Helper for activating objects.
- * Activates <servant> with <poa> and returns the object reference via
- * <obj_ref>. If <object_deactivator> != 0, it is populated with info
- * necessary to deactivate the <servant> from <poa>
+ * Activates @a servant with @a poa and returns the object reference via
+ * @a obj_ref. If @a object_deactivator != 0, it is populated with info
+ * necessary to deactivate the @a servant from @a poa.
*/
template <typename T>
void activate (T & obj_ref,
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i
index c206e93746e..bb025cae6d2 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i
@@ -1,19 +1,5 @@
// $Id$
-// Helper.
-template <typename T>
-ACE_INLINE typename T::_var_type
-my_narrow_until_carlos_gets_jeff_to_fix_the_idl_compiler (
- T *,
- CORBA::Object_ptr obj
- ACE_ENV_ARG_DECL)
-{
- return T::_narrow (obj ACE_ENV_ARG_PARAMETER);
-}
-
-
-//***************************************************************************
-
template <class T>
ACE_INLINE
TAO_EC_Auto_Command<T>::TAO_EC_Auto_Command (void)
diff --git a/TAO/orbsvcs/orbsvcs/Makefile.RTEvent b/TAO/orbsvcs/orbsvcs/Makefile.RTEvent
index e47a774c008..d966f52234e 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile.RTEvent
+++ b/TAO/orbsvcs/orbsvcs/Makefile.RTEvent
@@ -90,6 +90,7 @@ CPP_SRCS += \
CPP_SRCS += \
Event_Utilities \
Event/EC_Gateway \
+ Event/EC_Gateway_IIOP \
Event/ECG_UDP_Out_Endpoint \
Event/ECG_UDP_Sender \
Event/EC_UDP_Admin \
@@ -103,7 +104,7 @@ CPP_SRCS += \
Event/ECG_UDP_EH \
Event/ECG_Mcast_EH \
Event/ECG_UDP_Receiver \
- Event/ECG_CDR_Message_Sender
+ Event/ECG_CDR_Message_Sender
IDL_SRC = \
$(addsuffix S.cpp, $(IDL_FILES)) \
diff --git a/TAO/orbsvcs/orbsvcs/RTEvent.bor b/TAO/orbsvcs/orbsvcs/RTEvent.bor
index 422f1d611ff..d84a5bdf8bf 100644
--- a/TAO/orbsvcs/orbsvcs/RTEvent.bor
+++ b/TAO/orbsvcs/orbsvcs/RTEvent.bor
@@ -43,6 +43,7 @@ OBJFILES = \
$(OBJDIR)\EC_Filter_Builder.obj \
$(OBJDIR)\EC_Group_Scheduling.obj \
$(OBJDIR)\EC_Gateway.obj \
+ $(OBJDIR)\EC_Gateway_IIOP.obj \
$(OBJDIR)\ECG_UDP_Out_Endpoint.obj \
$(OBJDIR)\ECG_Mcast_EH.obj \
$(OBJDIR)\ECG_UDP_Sender.obj \
diff --git a/TAO/orbsvcs/orbsvcs/RTEvent.dsp b/TAO/orbsvcs/orbsvcs/RTEvent.dsp
index 396d65d7aa4..e2c320d97b2 100644
--- a/TAO/orbsvcs/orbsvcs/RTEvent.dsp
+++ b/TAO/orbsvcs/orbsvcs/RTEvent.dsp
@@ -244,6 +244,10 @@ SOURCE=.\Event\EC_Gateway.cpp
# End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Group_Scheduling.cpp
# End Source File
# Begin Source File
@@ -541,6 +545,10 @@ SOURCE=.\Event\EC_Gateway.h
# End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Group_Scheduling.h
# End Source File
# Begin Source File
diff --git a/TAO/orbsvcs/orbsvcs/RTEvent.mpc b/TAO/orbsvcs/orbsvcs/RTEvent.mpc
index 05a53bdb3ec..c1c8875b0a6 100644
--- a/TAO/orbsvcs/orbsvcs/RTEvent.mpc
+++ b/TAO/orbsvcs/orbsvcs/RTEvent.mpc
@@ -13,7 +13,7 @@ project(RTEvent) : orbsvcslib, core, svc_utils, messaging, portableserver {
RtecUDPAdmin.idl
}
- // This could be simplified if the cpp files for RTEvent were
+ // This could be simplified if the cpp files for RTEvent were
// in their own directory.
Source_Files(ORBSVCS_COMPONENTS) {
RTEvent {
@@ -61,6 +61,7 @@ project(RTEvent) : orbsvcslib, core, svc_utils, messaging, portableserver {
Event/EC_Filter.cpp
Event/EC_Filter_Builder.cpp
Event/EC_Gateway.cpp
+ Event/EC_Gateway_IIOP.cpp
Event/EC_Group_Scheduling.cpp
Event/EC_Lifetime_Utils.cpp
Event/EC_Lifetime_Utils_T.cpp
diff --git a/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp b/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp
index 390d8eaa3b5..b207e50615a 100644
--- a/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp
+++ b/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp
@@ -783,6 +783,10 @@ SOURCE=.\Event\EC_Filter_Builder.cpp
# End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Gateway.cpp
# End Source File
# Begin Source File
@@ -1083,6 +1087,10 @@ SOURCE=.\Event\EC_Gateway.h
# End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Group_Scheduling.h
# End Source File
# Begin Source File
diff --git a/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp b/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp
index 95cba52845f..149b02bea9e 100644
--- a/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp
+++ b/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp
@@ -16,7 +16,7 @@
#include "Loopback.h"
#include "orbsvcs/Event/EC_Event_Channel.h"
-#include "orbsvcs/Event/EC_Gateway.h"
+#include "orbsvcs/Event/EC_Gateway_IIOP.h"
ACE_RCSID (TAO_RTEC_Perf, Peer_Base, "$Id$")
diff --git a/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile b/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile
index 0a3b38d1722..a2c926e221d 100644
--- a/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile
+++ b/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile
@@ -8,8 +8,4 @@ include $(ACE_ROOT)/include/makeinclude/macros.GNU
TARGETS_NESTED := $(TARGETS_NESTED:.nested=)
$(TARGETS_NESTED):
-ifneq (Windows,$(findstring Windows,$(OS)))
@$(MAKE) -f Makefile.Bug_1393_Client -C . $(@);
-else
- -@cmd /c "$(MAKE) -f Makefile.Bug_1393_Client -C . $(@)"
-endif
diff --git a/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile b/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile
index 6abffaea28d..69b4f1a1da0 100644
--- a/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile
+++ b/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile
@@ -11,11 +11,6 @@ MFILES = \
Makefile.Bug_1395_Client
$(TARGETS_NESTED):
-ifneq (Windows,$(findstring Windows,$(OS)))
@for file in $(MFILES); do \
$(MAKE) -f `basename $$file` -C `dirname $$file` $(@); \
done
-else
- -@cmd /c "$(MAKE) -f Makefile.Bug_1395_Server -C . $(@)"
- -@cmd /c "$(MAKE) -f Makefile.Bug_1395_Client -C . $(@)"
-endif
diff --git a/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp b/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp
index 527d1c124e2..25823fbe628 100644
--- a/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp
+++ b/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp
@@ -6,7 +6,7 @@
#include "orbsvcs/Event_Utilities.h"
#include "orbsvcs/Event/EC_Event_Channel.h"
#include "orbsvcs/Event/EC_Default_Factory.h"
-#include "orbsvcs/Event/EC_Gateway.h"
+#include "orbsvcs/Event/EC_Gateway_IIOP.h"
ACE_RCSID(EC_Tests, Gateway, "$Id$")
diff --git a/TAO/orbsvcs/tests/Event/Basic/Observer.h b/TAO/orbsvcs/tests/Event/Basic/Observer.h
index 7c562cfab60..4ff6fd35604 100644
--- a/TAO/orbsvcs/tests/Event/Basic/Observer.h
+++ b/TAO/orbsvcs/tests/Event/Basic/Observer.h
@@ -18,7 +18,7 @@
#define EC_OBSERVER_H
#include "Driver.h"
-#include "orbsvcs/Event/EC_Gateway.h"
+#include "orbsvcs/Event/EC_Gateway_IIOP.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
diff --git a/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor b/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor
index fed954b1599..11ffedd9f3e 100644
--- a/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor
+++ b/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor
@@ -28,6 +28,7 @@ CFLAGS = \
$(TAO_NOTIFYTESTS_CFLAGS) \
$(TAO_NOTIFY_CFLAGS) \
$(TAO_ETCL_CFLAGS) \
+ $(TAO_DYNAMICANY_CFLAGS) \
$(TAO_MESSAGING_CFLAGS)
CPPDIR = .
@@ -43,6 +44,7 @@ LIBFILES = \
$(TAO_NOTIFYTESTS_LIB) \
$(TAO_NOTIFY_LIB) \
$(TAO_ETCL_LIB) \
+ $(TAO_DYNAMICANY_LIB) \
$(TAO_MESSAGING_LIB)
!include <$(ACE_ROOT)\include\makeinclude\recurse.bor>