diff options
Diffstat (limited to 'TAO')
19 files changed, 690 insertions, 645 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp index a0696b23d7d..cbddb37f659 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel_Base.cpp @@ -46,28 +46,36 @@ TAO_EC_Event_Channel_Base (const TAO_EC_Event_Channel_Attributes& attr, TAO_EC_Event_Channel_Base::~TAO_EC_Event_Channel_Base (void) { - this->factory_->destroy_dispatching (this->dispatching_); - this->dispatching_ = 0; - this->factory_->destroy_filter_builder (this->filter_builder_); - this->filter_builder_ = 0; - this->factory_->destroy_supplier_filter_builder (this->supplier_filter_builder_); - this->supplier_filter_builder_ = 0; - this->factory_->destroy_consumer_admin (this->consumer_admin_); - this->consumer_admin_ = 0; - this->factory_->destroy_supplier_admin (this->supplier_admin_); - this->supplier_admin_ = 0; - this->factory_->destroy_timeout_generator (this->timeout_generator_); - this->timeout_generator_ = 0; - this->factory_->destroy_observer_strategy (this->observer_strategy_); - this->observer_strategy_ = 0; + // Destroy Strategies in the reverse order of creation, they + // refere to each other during destruction and thus need to be + // cleaned up properly. + this->factory_->destroy_supplier_control (this->supplier_control_); + this->supplier_control_ = 0; + this->factory_->destroy_consumer_control (this->consumer_control_); + this->consumer_control_ = 0; this->factory_->destroy_scheduling_strategy (this->scheduling_strategy_); this->scheduling_strategy_ = 0; - this->factory_->destroy_consumer_control (this->consumer_control_); - this->consumer_control_ = 0; - this->factory_->destroy_supplier_control (this->supplier_control_); - this->supplier_control_ = 0; + this->factory_->destroy_observer_strategy (this->observer_strategy_); + this->observer_strategy_ = 0; + + this->factory_->destroy_timeout_generator (this->timeout_generator_); + this->timeout_generator_ = 0; + + this->factory_->destroy_supplier_admin (this->supplier_admin_); + this->supplier_admin_ = 0; + this->factory_->destroy_consumer_admin (this->consumer_admin_); + this->consumer_admin_ = 0; + + this->factory_->destroy_supplier_filter_builder (this->supplier_filter_builder_); + this->supplier_filter_builder_ = 0; + + this->factory_->destroy_filter_builder (this->filter_builder_); + this->filter_builder_ = 0; + + this->factory_->destroy_dispatching (this->dispatching_); + this->dispatching_ = 0; this->factory (0, 0); } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp index ef141efbfb1..0a7b44ea438 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp @@ -1,35 +1,9 @@ // $Id$ #include "orbsvcs/Event/EC_Gateway.h" -#include "orbsvcs/Event_Utilities.h" -#include "orbsvcs/Time_Utilities.h" ACE_RCSID(Event, EC_Gateway, "$Id$") - -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - -template class ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>; -template class ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>; -template class ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>; -template class ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; -template class ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; -template class ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; -template class ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; - -#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) - -#pragma instantiate ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> -#pragma instantiate ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> - -#pragma instantiate ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr> -#pragma instantiate ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> -#pragma instantiate ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> -#pragma instantiate ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> -#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> - -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - TAO_EC_Gateway::TAO_EC_Gateway (void) : handle_ (0) { @@ -51,441 +25,3 @@ TAO_EC_Gateway::observer_handle (void) const return this->handle_; } -TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void) - : busy_count_ (0), - update_posted_ (0), - rmt_info_ (0), - lcl_info_ (0), - consumer_ (this), - consumer_is_active_ (0), - supplier_ (this), - supplier_is_active_ (0) -{ -} - -TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void) -{ -} - -void -TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, - RtecEventChannelAdmin::EventChannel_ptr lcl_ec - ACE_ENV_ARG_DECL) -{ - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); - - this->init_i (rmt_ec, lcl_ec ACE_ENV_ARG_PARAMETER); -} - -void -TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, - RtecEventChannelAdmin::EventChannel_ptr lcl_ec - ACE_ENV_ARG_DECL_NOT_USED) -{ - if (!CORBA::is_nil (this->rmt_ec_.in ())) - return; - - this->rmt_ec_ = - RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec); - this->lcl_ec_ = - RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); -} - -void -TAO_EC_Gateway_IIOP::close (ACE_ENV_SINGLE_ARG_DECL) -{ - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); - - this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER); -} - - -void -TAO_EC_Gateway_IIOP::close_i (ACE_ENV_SINGLE_ARG_DECL) -{ - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); - - if (this->consumer_proxy_map_.current_size () > 0) - { - for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin (); - j != this->consumer_proxy_map_.end (); - ++j) - { - RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_; - if (CORBA::is_nil (consumer)) - continue; - ACE_TRY - { - consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - } - ACE_ENDTRY; - CORBA::release (consumer); - } - // Remove all the elements on the map. Calling close() does not - // work because the map is left in an inconsistent state. - this->consumer_proxy_map_.open (); - } - - if (!CORBA::is_nil (this->default_consumer_proxy_.in ())) - { - this->default_consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - this->default_consumer_proxy_ = - RtecEventChannelAdmin::ProxyPushConsumer::_nil (); - } - - if (!CORBA::is_nil (this->supplier_proxy_.in ())) - { - this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - this->supplier_proxy_ = - RtecEventChannelAdmin::ProxyPushSupplier::_nil (); - } -} - -void -TAO_EC_Gateway_IIOP::update_consumer ( - const RtecEventChannelAdmin::ConsumerQOS& c_qos - ACE_ENV_ARG_DECL) - ACE_THROW_SPEC ((CORBA::SystemException)) -{ - if (c_qos.dependencies.length () <= 1) - return; - - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); - - if (this->busy_count_ != 0) - { - this->update_posted_ = 1; - this->c_qos_ = c_qos; - return; - } - - this->update_consumer_i (c_qos ACE_ENV_ARG_PARAMETER); -} - -void -TAO_EC_Gateway_IIOP::update_consumer_i ( - const RtecEventChannelAdmin::ConsumerQOS& c_qos - ACE_ENV_ARG_DECL) -{ - this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - if (CORBA::is_nil (this->lcl_ec_.in ()) - || CORBA::is_nil (this->rmt_ec_.in ())) - return; - - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n")); - - // = Connect as a supplier to the local EC - RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = - this->lcl_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - // Change the RT_Info in the consumer QoS. - // On the same loop we discover the subscriptions by event source, - // and fill the consumer proxy map. - RtecEventChannelAdmin::ConsumerQOS sub = c_qos; - sub.is_gateway = 1; - for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i) - { - sub.dependencies[i].rt_info = this->rmt_info_; - - RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0; - const RtecEventComm::EventHeader &h = - sub.dependencies[i].event.header; - - RtecEventComm::EventSourceID sid = h.source; - - //ACE_DEBUG ((LM_DEBUG, - // "ECG (%t) trying (%d,%d)\n", - // sid, h.type)); - - // Skip all subscriptions that do not require an specific source - // id. - if (sid == 0) - continue; - - // Skip all the magic event types. - if (0 < h.type && h.type < ACE_ES_EVENT_UNDEFINED) - continue; - - if (this->consumer_proxy_map_.find (sid, proxy) == -1) - { - //ACE_DEBUG ((LM_DEBUG, - // "ECG (%t) binding source %d\n", - // sid)); - proxy = supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - this->consumer_proxy_map_.bind (sid, proxy); - } - } - //ACE_DEBUG ((LM_DEBUG, - // "ECG (%t) consumer map computed (%d entries)\n", - // this->consumer_proxy_map_.current_size ())); - - if (this->consumer_proxy_map_.current_size () > 0) - { - this->supplier_is_active_ = 1; - - // Obtain a reference to our supplier personality... - RtecEventComm::PushSupplier_var supplier_ref = - this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - // For each subscription by source build the set of publications - // (they may several, by type, for instance) and connect to the - // consumer proxy. - for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin (); - j != this->consumer_proxy_map_.end (); - ++j) - { - RtecEventChannelAdmin::SupplierQOS pub; - pub.publications.length (sub.dependencies.length () + 1); - pub.is_gateway = 1; - - int c = 0; - - RtecEventComm::EventSourceID sid = (*j).ext_id_; - for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k) - { - const RtecEventComm::EventHeader& h = - sub.dependencies[k].event.header; - if (h.source != sid - || (0 < h.type - && h.type < ACE_ES_EVENT_UNDEFINED)) - continue; - pub.publications[c].event.header = h; - pub.publications[c].dependency_info.dependency_type = - RtecBase::TWO_WAY_CALL; - pub.publications[c].dependency_info.number_of_calls = 1; - pub.publications[c].dependency_info.rt_info = this->lcl_info_; - c++; - } - //ACE_DEBUG ((LM_DEBUG, - // "ECG (%t) supplier id %d has %d elements\n", - // sid, c)); - if (c == 0) - continue; - - pub.publications.length (c); - - // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier ")); - // ACE_SupplierQOS_Factory::debug (pub); - (*j).int_id_->connect_push_supplier (supplier_ref.in (), - pub - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } - } - - // Also build the subscriptions that are *not* by source and connect - // to the default consumer proxy. - RtecEventChannelAdmin::SupplierQOS pub; - pub.publications.length (sub.dependencies.length () - 1); - pub.is_gateway = 1; - int c = 0; - for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k) - { - const RtecEventComm::EventHeader& h = - sub.dependencies[k].event.header; - RtecEventComm::EventSourceID sid = h.source; - if (sid != 0 - || (0 <= h.type - && h.type < ACE_ES_EVENT_UNDEFINED)) - continue; - pub.publications[c].event.header = h; - pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero (); - pub.publications[c].dependency_info.dependency_type = - RtecBase::TWO_WAY_CALL; - pub.publications[c].dependency_info.number_of_calls = 1; - pub.publications[c].dependency_info.rt_info = this->lcl_info_; - c++; - } - - if (c > 0) - { - this->supplier_is_active_ = 1; - - // Obtain a reference to our supplier personality... - RtecEventComm::PushSupplier_var supplier_ref = - this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - // Obtain the consumer.... - this->default_consumer_proxy_ = - supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - pub.publications.length (c); - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); - // ACE_SupplierQOS_Factory::debug (pub); - this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (), - pub - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } - - - - RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = - this->rmt_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - this->supplier_proxy_ = - consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - this->consumer_is_active_ = 1; - RtecEventComm::PushConsumer_var consumer_ref = - this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; - - // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer ")); - // ACE_ConsumerQOS_Factory::debug (sub); - - this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), - sub - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - -} - -void -TAO_EC_Gateway_IIOP::update_supplier ( - const RtecEventChannelAdmin::SupplierQOS& - ACE_ENV_ARG_DECL_NOT_USED) - ACE_THROW_SPEC ((CORBA::SystemException)) -{ - // Do nothing... -} - -void -TAO_EC_Gateway_IIOP::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) -{ - // ACE_DEBUG ((LM_DEBUG, - // "ECG (%t): Supplier-consumer received " - // "disconnect from channel.\n")); -} - -void -TAO_EC_Gateway_IIOP::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) -{ - // ACE_DEBUG ((LM_DEBUG, - // "ECG (%t): Supplier received " - // "disconnect from channel.\n")); -} - -void -TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events - ACE_ENV_ARG_DECL) -{ - // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n")); - - if (events.length () == 0) - { - // ACE_DEBUG ((LM_DEBUG, "no events\n")); - return; - } - - { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); - - this->busy_count_++; - } - - // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ())); - - // @@ TODO, there is an extra data copy here, we should do the event - // modification without it and only compact the necessary events. - RtecEventComm::EventSet out (1); - out.length (1); - for (CORBA::ULong i = 0; i < events.length (); ++i) - { - if (events[i].header.ttl == 0) - continue; - - RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0; - RtecEventComm::EventSourceID sid = events[i].header.source; - if (sid == 0 - || this->consumer_proxy_map_.find (sid, proxy) == -1) - { - // If the source is not in our map we have to use the - // default consumer proxy. - proxy = this->default_consumer_proxy_.in (); - } - - if (CORBA::is_nil (proxy)) - continue; - - out[0] = events[i]; - out[0].header.ttl--; - - // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n")); - proxy->push (out ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } - - { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); - - this->busy_count_--; - - if (this->busy_count_ == 0 && this->update_posted_ != 0) - { - this->update_posted_ = 0; - this->update_consumer_i (this->c_qos_ ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - } - } - -} - -int -TAO_EC_Gateway_IIOP::shutdown (ACE_ENV_SINGLE_ARG_DECL) -{ - ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1); - - this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - - if (this->supplier_is_active_) - { - PortableServer::POA_var poa = - this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - this->supplier_is_active_ = 0; - } - - if (this->consumer_is_active_) - { - PortableServer::POA_var poa = - this->consumer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->consumer_ ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); - this->consumer_is_active_ = 0; - } - - this->lcl_ec_ = - RtecEventChannelAdmin::EventChannel::_nil (); - this->rmt_ec_ = - RtecEventChannelAdmin::EventChannel::_nil (); - - return 0; -} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h index 007e0c47c43..aa29391757f 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h @@ -19,8 +19,6 @@ #include "orbsvcs/Event/event_export.h" #include "orbsvcs/RtecEventChannelAdminS.h" #include "orbsvcs/RtecEventCommS.h" -#include "orbsvcs/Channel_Clients.h" -#include "ace/Map_Manager.h" /** * @class TAO_EC_Gateway @@ -59,131 +57,5 @@ private: RtecEventChannelAdmin::Observer_Handle handle_; }; -// **************************************************************** -/** - * @class TAO_EC_Gateway_IIOP - * - * @brief Event Channel Gateway using IIOP. - * - * This class mediates among two event channels, it connects as a - * consumer of events with a remote event channel, and as a supplier - * of events with the local EC. - * As a consumer it gives a QoS designed to only accept the events - * in which *local* consumers are interested. - * Eventually the local EC should create this object and compute its - * QoS in an automated manner; but this requires some way to filter - * out the peers registered as consumers, otherwise we will get - * loops in the QoS graph. - * It uses exactly the same set of events in the publications list - * when connected as a supplier. - * - * @note - * An alternative implementation would be to register with the - * remote EC as a supplier, and then filter on the remote EC, but - * one of the objectives is to minimize network traffic. - * On the other hand the events will be pushed to remote consumers, - * event though they will be dropped upon receipt (due to the TTL - * field); IMHO this is another suggestion that the EC needs to know - * (somehow) which consumers are truly its peers in disguise. - * - * @todo: The class makes an extra copy of the events, we need to - * investigate if closer collaboration with its collocated EC could - * be used to remove that copy. - */ -class TAO_RTEvent_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway -{ -public: - TAO_EC_Gateway_IIOP (void); - ~TAO_EC_Gateway_IIOP (void); - - /// To do its job this class requires to know the local and remote - /// ECs it will connect to, - void init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, - RtecEventChannelAdmin::EventChannel_ptr lcl_ec - ACE_ENV_ARG_DECL); - - /// The channel is disconnecting. - void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); - - /// The channel is disconnecting. - void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); - - /// This is the Consumer side behavior, it pushes the events to the - /// local event channel. - void push (const RtecEventComm::EventSet &events - ACE_ENV_ARG_DECL_WITH_DEFAULTS); - - /// Disconnect and shutdown the gateway - int shutdown (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); - - // The following methods are documented in the base class. - virtual void close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); - virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((CORBA::SystemException)); - virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub - ACE_ENV_ARG_DECL_WITH_DEFAULTS) - ACE_THROW_SPEC ((CORBA::SystemException)); - -private: - void close_i (ACE_ENV_SINGLE_ARG_DECL_NOT_USED); - - void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub - ACE_ENV_ARG_DECL); - -protected: - /// Do the real work in init() - void init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, - RtecEventChannelAdmin::EventChannel_ptr lcl_ec - ACE_ENV_ARG_DECL); - -protected: - /// Lock to synchronize internal changes - TAO_SYNCH_MUTEX lock_; - - /// How many threads are running push() we cannot make changes until - /// that reaches 0 - CORBA::ULong busy_count_; - - /** - * An update_consumer() message arrived *while* we were doing a - * push() the modification is stored <pub_>, if multiple - * update_consumer messages arrive only the last one is executed. - */ - int update_posted_; - RtecEventChannelAdmin::ConsumerQOS c_qos_; - - /// The remote and the local EC, so we can reconnect when the list changes. - RtecEventChannelAdmin::EventChannel_var rmt_ec_; - RtecEventChannelAdmin::EventChannel_var lcl_ec_; - - /// Our remote RT_Infos. - RtecBase::handle_t rmt_info_; - /// Our local RT_Infos. - RtecBase::handle_t lcl_info_; - - /// Our consumer personality.... - /// If it is not 0 then we must deactivate the supplier - ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_; - int consumer_is_active_; - - /// Our supplier personality.... - /// If it is not 0 then we must deactivate the supplier - ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_; - int supplier_is_active_; - - // We use a different Consumer_Proxy - typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map; - typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator; - - /// We talk to the EC (as a supplier) using either an per-supplier - /// proxy or a generic proxy for the type only subscriptions. - Consumer_Map consumer_proxy_map_; - RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_; - - /// We talk to the EC (as a consumer) using this proxy. - RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; -}; - #include "ace/post.h" #endif /* ACE_EC_GATEWAY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.cpp new file mode 100644 index 00000000000..dc990e5544c --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.cpp @@ -0,0 +1,477 @@ +// $Id$ + +#include "orbsvcs/Event/EC_Gateway_IIOP.h" +#include "orbsvcs/Event_Utilities.h" +#include "orbsvcs/Time_Utilities.h" + +ACE_RCSID(Event, EC_Gateway, "$Id$") + +TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void) + : busy_count_ (0), + update_posted_ (0), + rmt_info_ (0), + lcl_info_ (0), + consumer_ (this), + consumer_is_active_ (0), + supplier_ (this), + supplier_is_active_ (0) +{ +} + +TAO_EC_Gateway_IIOP::~TAO_EC_Gateway_IIOP (void) +{ +} + +int +TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, + RtecEventChannelAdmin::EventChannel_ptr lcl_ec + ACE_ENV_ARG_DECL) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + return this->init_i (rmt_ec, lcl_ec ACE_ENV_ARG_PARAMETER); +} + +int +TAO_EC_Gateway_IIOP::init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, + RtecEventChannelAdmin::EventChannel_ptr lcl_ec + ACE_ENV_ARG_DECL_NOT_USED) +{ + if (CORBA::is_nil (this->rmt_ec_.in ()) && CORBA::is_nil (this->lcl_ec_.in ())) + { + this->rmt_ec_ = + RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec); + this->lcl_ec_ = + RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); + + return 0; + } + else + ACE_ERROR_RETURN ((LM_ERROR, + "TAO_EC_Gateway_IIOP - init_i " + "Remote and local event channel reference " + "should be nil.\n"), -1); +} + +void +TAO_EC_Gateway_IIOP::close (ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); + + this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER); +} + + +void +TAO_EC_Gateway_IIOP::close_i (ACE_ENV_SINGLE_ARG_DECL) +{ + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); + + if (this->consumer_proxy_map_.current_size () > 0) + { + for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin (); + j != this->consumer_proxy_map_.end (); + ++j) + { + RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_; + if (CORBA::is_nil (consumer)) + continue; + ACE_TRY + { + consumer->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + } + ACE_ENDTRY; + CORBA::release (consumer); + } + // Remove all the elements on the map. Calling close() does not + // work because the map is left in an inconsistent state. + this->consumer_proxy_map_.open (); + } + + if (!CORBA::is_nil (this->default_consumer_proxy_.in ())) + { + this->default_consumer_proxy_->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->default_consumer_proxy_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); + } + + if (!CORBA::is_nil (this->supplier_proxy_.in ())) + { + this->supplier_proxy_->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->supplier_proxy_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); + } +} + +void +TAO_EC_Gateway_IIOP::update_consumer ( + const RtecEventChannelAdmin::ConsumerQOS& c_qos + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (c_qos.dependencies.length () == 0) + return; + + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); + + if (this->busy_count_ != 0) + { + this->update_posted_ = 1; + this->c_qos_ = c_qos; + return; + } + + this->update_consumer_i (c_qos ACE_ENV_ARG_PARAMETER); +} + +void +TAO_EC_Gateway_IIOP::update_consumer_i ( + const RtecEventChannelAdmin::ConsumerQOS& c_qos + ACE_ENV_ARG_DECL) +{ + this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + if (CORBA::is_nil (this->lcl_ec_.in ()) + || CORBA::is_nil (this->rmt_ec_.in ())) + return; + + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n")); + + // = Connect as a supplier to the local EC + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + this->lcl_ec_->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Change the RT_Info in the consumer QoS. + // On the same loop we discover the subscriptions by event source, + // and fill the consumer proxy map. + RtecEventChannelAdmin::ConsumerQOS sub = c_qos; + sub.is_gateway = 1; + for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i) + { + sub.dependencies[i].rt_info = this->rmt_info_; + + RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0; + const RtecEventComm::EventHeader &h = + sub.dependencies[i].event.header; + + RtecEventComm::EventSourceID sid = h.source; + + //ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) trying (%d,%d)\n", + // sid, h.type)); + + // Skip all subscriptions that do not require an specific source + // id. + if (sid == 0) + continue; + + // Skip all the magic event types. + if (0 < h.type && h.type < ACE_ES_EVENT_UNDEFINED) + continue; + + if (this->consumer_proxy_map_.find (sid, proxy) == -1) + { + //ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) binding source %d\n", + // sid)); + proxy = supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + this->consumer_proxy_map_.bind (sid, proxy); + } + } + //ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) consumer map computed (%d entries)\n", + // this->consumer_proxy_map_.current_size ())); + + if (this->consumer_proxy_map_.current_size () > 0) + { + this->supplier_is_active_ = 1; + + // Obtain a reference to our supplier personality... + RtecEventComm::PushSupplier_var supplier_ref = + this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // For each subscription by source build the set of publications + // (they may several, by type, for instance) and connect to the + // consumer proxy. + for (Consumer_Map_Iterator j = this->consumer_proxy_map_.begin (); + j != this->consumer_proxy_map_.end (); + ++j) + { + RtecEventChannelAdmin::SupplierQOS pub; + pub.publications.length (sub.dependencies.length () + 1); + pub.is_gateway = 1; + + int c = 0; + + RtecEventComm::EventSourceID sid = (*j).ext_id_; + for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k) + { + const RtecEventComm::EventHeader& h = + sub.dependencies[k].event.header; + if (h.source != sid + || (0 < h.type + && h.type < ACE_ES_EVENT_UNDEFINED)) + continue; + pub.publications[c].event.header = h; + pub.publications[c].dependency_info.dependency_type = + RtecBase::TWO_WAY_CALL; + pub.publications[c].dependency_info.number_of_calls = 1; + pub.publications[c].dependency_info.rt_info = this->lcl_info_; + c++; + } + //ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) supplier id %d has %d elements\n", + // sid, c)); + if (c == 0) + continue; + + pub.publications.length (c); + + // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Supplier ")); + // ACE_SupplierQOS_Factory::debug (pub); + (*j).int_id_->connect_push_supplier (supplier_ref.in (), + pub + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + } + + // Also build the subscriptions that are *not* by source and connect + // to the default consumer proxy. + RtecEventChannelAdmin::SupplierQOS pub; + pub.publications.length (sub.dependencies.length () - 1); + pub.is_gateway = 1; + int c = 0; + for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k) + { + const RtecEventComm::EventHeader& h = + sub.dependencies[k].event.header; + RtecEventComm::EventSourceID sid = h.source; + if (sid != 0 + || (0 <= h.type + && h.type < ACE_ES_EVENT_UNDEFINED)) + continue; + pub.publications[c].event.header = h; + pub.publications[c].event.header.creation_time = ORBSVCS_Time::zero (); + pub.publications[c].dependency_info.dependency_type = + RtecBase::TWO_WAY_CALL; + pub.publications[c].dependency_info.number_of_calls = 1; + pub.publications[c].dependency_info.rt_info = this->lcl_info_; + c++; + } + + if (c > 0) + { + this->supplier_is_active_ = 1; + + // Obtain a reference to our supplier personality... + RtecEventComm::PushSupplier_var supplier_ref = + this->supplier_._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Obtain the consumer.... + this->default_consumer_proxy_ = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + pub.publications.length (c); + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); + // ACE_SupplierQOS_Factory::debug (pub); + this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (), + pub + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + this->rmt_ec_->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->consumer_is_active_ = 1; + RtecEventComm::PushConsumer_var consumer_ref = + this->consumer_._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // ACE_DEBUG ((LM_DEBUG, "ECG (%P|%t) Gateway/Consumer ")); + // ACE_ConsumerQOS_Factory::debug (sub); + + this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), + sub + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + +} + +void +TAO_EC_Gateway_IIOP::update_supplier ( + const RtecEventChannelAdmin::SupplierQOS& + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + // Do nothing... +} + +void +TAO_EC_Gateway_IIOP::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + // ACE_DEBUG ((LM_DEBUG, + // "ECG (%t): Supplier-consumer received " + // "disconnect from channel.\n")); +} + +void +TAO_EC_Gateway_IIOP::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) +{ + // ACE_DEBUG ((LM_DEBUG, + // "ECG (%t): Supplier received " + // "disconnect from channel.\n")); +} + +void +TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events + ACE_ENV_ARG_DECL) +{ + // ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push (%P|%t) - \n")); + + if (events.length () == 0) + { + // ACE_DEBUG ((LM_DEBUG, "no events\n")); + return; + } + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); + + this->busy_count_++; + } + + // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ())); + + // @@ TODO, there is an extra data copy here, we should do the event + // modification without it and only compact the necessary events. + RtecEventComm::EventSet out (1); + out.length (1); + for (CORBA::ULong i = 0; i < events.length (); ++i) + { + if (events[i].header.ttl == 0) + continue; + + RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0; + RtecEventComm::EventSourceID sid = events[i].header.source; + if (sid == 0 + || this->consumer_proxy_map_.find (sid, proxy) == -1) + { + // If the source is not in our map we have to use the + // default consumer proxy. + proxy = this->default_consumer_proxy_.in (); + } + + if (CORBA::is_nil (proxy)) + continue; + + out[0] = events[i]; + out[0].header.ttl--; + + // ACE_DEBUG ((LM_DEBUG, "ECG: event sent to proxy\n")); + proxy->push (out ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + { + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); + + this->busy_count_--; + + if (this->busy_count_ == 0 && this->update_posted_ != 0) + { + this->update_posted_ = 0; + this->update_consumer_i (this->c_qos_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + } + +} + +int +TAO_EC_Gateway_IIOP::shutdown (ACE_ENV_SINGLE_ARG_DECL) +{ + ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + this->close_i (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + if (this->supplier_is_active_) + { + PortableServer::POA_var poa = + this->supplier_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->supplier_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + this->supplier_is_active_ = 0; + } + + if (this->consumer_is_active_) + { + PortableServer::POA_var poa = + this->consumer_._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->consumer_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + this->consumer_is_active_ = 0; + } + + this->lcl_ec_ = + RtecEventChannelAdmin::EventChannel::_nil (); + this->rmt_ec_ = + RtecEventChannelAdmin::EventChannel::_nil (); + + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP>; +template class ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP>; +template class ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr>; +template class ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; +template class ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; +template class ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; +template class ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> +#pragma instantiate ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> + +#pragma instantiate ACE_Map_Entry<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr> +#pragma instantiate ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> +#pragma instantiate ACE_Map_Iterator_Base<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> +#pragma instantiate ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> +#pragma instantiate ACE_Map_Reverse_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.h new file mode 100644 index 00000000000..4877d0fb8f4 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_IIOP.h @@ -0,0 +1,157 @@ +/* -*- C++ -*- */ +/** + * @file EC_Gateway_IIOP.h + * + * $Id$ + * + * @author Carlos O'Ryan (coryan@cs.wustl.edu) + * + * Based on previous work by Tim Harrison (harrison@cs.wustl.edu) and + * other members of the DOC group. More details can be found in: + * + * http://doc.ece.uci.edu/~coryan/EC/index.html + */ + +#ifndef TAO_EC_GATEWAY_IIOP_H +#define TAO_EC_GATEWAY_IIOP_H +#include "ace/pre.h" + +#include "orbsvcs/Event/event_export.h" +#include "orbsvcs/Event/EC_Gateway.h" +#include "orbsvcs/RtecEventChannelAdminS.h" +#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/Channel_Clients.h" +#include "ace/Map_Manager.h" + +/** + * @class TAO_EC_Gateway_IIOP + * + * @brief Event Channel Gateway using IIOP. + * + * This class mediates among two event channels, it connects as a + * consumer of events with a remote event channel, and as a supplier + * of events with the local EC. + * As a consumer it gives a QoS designed to only accept the events + * in which *local* consumers are interested. + * Eventually the local EC should create this object and compute its + * QoS in an automated manner; but this requires some way to filter + * out the peers registered as consumers, otherwise we will get + * loops in the QoS graph. + * It uses exactly the same set of events in the publications list + * when connected as a supplier. + * + * @note + * An alternative implementation would be to register with the + * remote EC as a supplier, and then filter on the remote EC, but + * one of the objectives is to minimize network traffic. + * On the other hand the events will be pushed to remote consumers, + * event though they will be dropped upon receipt (due to the TTL + * field); IMHO this is another suggestion that the EC needs to know + * (somehow) which consumers are truly its peers in disguise. + * + * @todo: The class makes an extra copy of the events, we need to + * investigate if closer collaboration with its collocated EC could + * be used to remove that copy. + */ +class TAO_RTEvent_Export TAO_EC_Gateway_IIOP : public TAO_EC_Gateway +{ +public: + TAO_EC_Gateway_IIOP (void); + ~TAO_EC_Gateway_IIOP (void); + + /** + * To do its job this class requires to know the local and remote ECs it will + * connect to. + * @return 0 in case of success, -1 in case of failure + */ + int init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, + RtecEventChannelAdmin::EventChannel_ptr lcl_ec + ACE_ENV_ARG_DECL); + + /// The channel is disconnecting. + void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + + /// The channel is disconnecting. + void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + + /// This is the Consumer side behavior, it pushes the events to the + /// local event channel. + void push (const RtecEventComm::EventSet &events + ACE_ENV_ARG_DECL_WITH_DEFAULTS); + + /// Disconnect and shutdown the gateway + int shutdown (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + + // The following methods are documented in the base class. + virtual void close (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + virtual void update_consumer (const RtecEventChannelAdmin::ConsumerQOS& sub + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void update_supplier (const RtecEventChannelAdmin::SupplierQOS& pub + ACE_ENV_ARG_DECL_WITH_DEFAULTS) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + void close_i (ACE_ENV_SINGLE_ARG_DECL_NOT_USED); + + void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub + ACE_ENV_ARG_DECL); + +protected: + /// Do the real work in init() + int init_i (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, + RtecEventChannelAdmin::EventChannel_ptr lcl_ec + ACE_ENV_ARG_DECL); + +protected: + /// Lock to synchronize internal changes + TAO_SYNCH_MUTEX lock_; + + /// How many threads are running push() we cannot make changes until + /// that reaches 0 + CORBA::ULong busy_count_; + + /** + * An update_consumer() message arrived *while* we were doing a + * push() the modification is stored <pub_>, if multiple + * update_consumer messages arrive only the last one is executed. + */ + int update_posted_; + RtecEventChannelAdmin::ConsumerQOS c_qos_; + + /// The remote and the local EC, so we can reconnect when the list changes. + RtecEventChannelAdmin::EventChannel_var rmt_ec_; + RtecEventChannelAdmin::EventChannel_var lcl_ec_; + + /// Our remote RT_Infos. + RtecBase::handle_t rmt_info_; + /// Our local RT_Infos. + RtecBase::handle_t lcl_info_; + + /// Our consumer personality.... + ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_; + + /// If it is not 0 then we must deactivate the consumer + int consumer_is_active_; + + /// Our supplier personality.... + ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_; + + /// If it is not 0 then we must deactivate the supplier + int supplier_is_active_; + + // We use a different Consumer_Proxy + typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map; + typedef ACE_Map_Iterator<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map_Iterator; + + /// We talk to the EC (as a supplier) using either an per-supplier + /// proxy or a generic proxy for the type only subscriptions. + Consumer_Map consumer_proxy_map_; + RtecEventChannelAdmin::ProxyPushConsumer_var default_consumer_proxy_; + + /// We talk to the EC (as a consumer) using this proxy. + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; +}; + +#include "ace/post.h" +#endif /* ACE_EC_GATEWAY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp index 4a824e6f418..dccdf792bd7 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.cpp @@ -25,16 +25,13 @@ activate (T & obj_ref, suggested_object_deactivator.set_values (poa, obj_id.in ()); - // Get the object reference of the activated object. CORBA::Object_var obj = poa->id_to_reference (obj_id.in () ACE_ENV_ARG_PARAMETER); ACE_CHECK; obj_ref = - my_narrow_until_carlos_gets_jeff_to_fix_the_idl_compiler (obj_ref.ptr (), - obj.in () - ACE_ENV_ARG_PARAMETER); + T::_obj_type::_narrow (obj.in() ACE_ENV_ARG_PARAMETER); ACE_CHECK; if (CORBA::is_nil (obj_ref.in ())) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h index df358c02550..0aeff3e86d7 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.h @@ -24,9 +24,9 @@ /** * @brief Helper for activating objects. - * Activates <servant> with <poa> and returns the object reference via - * <obj_ref>. If <object_deactivator> != 0, it is populated with info - * necessary to deactivate the <servant> from <poa> + * Activates @a servant with @a poa and returns the object reference via + * @a obj_ref. If @a object_deactivator != 0, it is populated with info + * necessary to deactivate the @a servant from @a poa. */ template <typename T> void activate (T & obj_ref, diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i index c206e93746e..bb025cae6d2 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Lifetime_Utils_T.i @@ -1,19 +1,5 @@ // $Id$ -// Helper. -template <typename T> -ACE_INLINE typename T::_var_type -my_narrow_until_carlos_gets_jeff_to_fix_the_idl_compiler ( - T *, - CORBA::Object_ptr obj - ACE_ENV_ARG_DECL) -{ - return T::_narrow (obj ACE_ENV_ARG_PARAMETER); -} - - -//*************************************************************************** - template <class T> ACE_INLINE TAO_EC_Auto_Command<T>::TAO_EC_Auto_Command (void) diff --git a/TAO/orbsvcs/orbsvcs/Makefile.RTEvent b/TAO/orbsvcs/orbsvcs/Makefile.RTEvent index e47a774c008..d966f52234e 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile.RTEvent +++ b/TAO/orbsvcs/orbsvcs/Makefile.RTEvent @@ -90,6 +90,7 @@ CPP_SRCS += \ CPP_SRCS += \ Event_Utilities \ Event/EC_Gateway \ + Event/EC_Gateway_IIOP \ Event/ECG_UDP_Out_Endpoint \ Event/ECG_UDP_Sender \ Event/EC_UDP_Admin \ @@ -103,7 +104,7 @@ CPP_SRCS += \ Event/ECG_UDP_EH \ Event/ECG_Mcast_EH \ Event/ECG_UDP_Receiver \ - Event/ECG_CDR_Message_Sender + Event/ECG_CDR_Message_Sender IDL_SRC = \ $(addsuffix S.cpp, $(IDL_FILES)) \ diff --git a/TAO/orbsvcs/orbsvcs/RTEvent.bor b/TAO/orbsvcs/orbsvcs/RTEvent.bor index 422f1d611ff..d84a5bdf8bf 100644 --- a/TAO/orbsvcs/orbsvcs/RTEvent.bor +++ b/TAO/orbsvcs/orbsvcs/RTEvent.bor @@ -43,6 +43,7 @@ OBJFILES = \ $(OBJDIR)\EC_Filter_Builder.obj \ $(OBJDIR)\EC_Group_Scheduling.obj \ $(OBJDIR)\EC_Gateway.obj \ + $(OBJDIR)\EC_Gateway_IIOP.obj \ $(OBJDIR)\ECG_UDP_Out_Endpoint.obj \ $(OBJDIR)\ECG_Mcast_EH.obj \ $(OBJDIR)\ECG_UDP_Sender.obj \ diff --git a/TAO/orbsvcs/orbsvcs/RTEvent.dsp b/TAO/orbsvcs/orbsvcs/RTEvent.dsp index 396d65d7aa4..e2c320d97b2 100644 --- a/TAO/orbsvcs/orbsvcs/RTEvent.dsp +++ b/TAO/orbsvcs/orbsvcs/RTEvent.dsp @@ -244,6 +244,10 @@ SOURCE=.\Event\EC_Gateway.cpp # End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Group_Scheduling.cpp
# End Source File
# Begin Source File
@@ -541,6 +545,10 @@ SOURCE=.\Event\EC_Gateway.h # End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Group_Scheduling.h
# End Source File
# Begin Source File
diff --git a/TAO/orbsvcs/orbsvcs/RTEvent.mpc b/TAO/orbsvcs/orbsvcs/RTEvent.mpc index 05a53bdb3ec..c1c8875b0a6 100644 --- a/TAO/orbsvcs/orbsvcs/RTEvent.mpc +++ b/TAO/orbsvcs/orbsvcs/RTEvent.mpc @@ -13,7 +13,7 @@ project(RTEvent) : orbsvcslib, core, svc_utils, messaging, portableserver { RtecUDPAdmin.idl } - // This could be simplified if the cpp files for RTEvent were + // This could be simplified if the cpp files for RTEvent were // in their own directory. Source_Files(ORBSVCS_COMPONENTS) { RTEvent { @@ -61,6 +61,7 @@ project(RTEvent) : orbsvcslib, core, svc_utils, messaging, portableserver { Event/EC_Filter.cpp Event/EC_Filter_Builder.cpp Event/EC_Gateway.cpp + Event/EC_Gateway_IIOP.cpp Event/EC_Group_Scheduling.cpp Event/EC_Lifetime_Utils.cpp Event/EC_Lifetime_Utils_T.cpp diff --git a/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp b/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp index 390d8eaa3b5..b207e50615a 100644 --- a/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp +++ b/TAO/orbsvcs/orbsvcs/RTEvent_Static.dsp @@ -783,6 +783,10 @@ SOURCE=.\Event\EC_Filter_Builder.cpp # End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Gateway.cpp
# End Source File
# Begin Source File
@@ -1083,6 +1087,10 @@ SOURCE=.\Event\EC_Gateway.h # End Source File
# Begin Source File
+SOURCE=.\Event\EC_Gateway_IIOP.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Event\EC_Group_Scheduling.h
# End Source File
# Begin Source File
diff --git a/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp b/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp index 95cba52845f..149b02bea9e 100644 --- a/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp +++ b/TAO/orbsvcs/performance-tests/RTEvent/lib/Peer_Base.cpp @@ -16,7 +16,7 @@ #include "Loopback.h" #include "orbsvcs/Event/EC_Event_Channel.h" -#include "orbsvcs/Event/EC_Gateway.h" +#include "orbsvcs/Event/EC_Gateway_IIOP.h" ACE_RCSID (TAO_RTEC_Perf, Peer_Base, "$Id$") diff --git a/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile b/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile index 0a3b38d1722..a2c926e221d 100644 --- a/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile +++ b/TAO/orbsvcs/tests/Bug_1393_Regression/Makefile @@ -8,8 +8,4 @@ include $(ACE_ROOT)/include/makeinclude/macros.GNU TARGETS_NESTED := $(TARGETS_NESTED:.nested=) $(TARGETS_NESTED): -ifneq (Windows,$(findstring Windows,$(OS))) @$(MAKE) -f Makefile.Bug_1393_Client -C . $(@); -else - -@cmd /c "$(MAKE) -f Makefile.Bug_1393_Client -C . $(@)" -endif diff --git a/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile b/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile index 6abffaea28d..69b4f1a1da0 100644 --- a/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile +++ b/TAO/orbsvcs/tests/Bug_1395_Regression/Makefile @@ -11,11 +11,6 @@ MFILES = \ Makefile.Bug_1395_Client $(TARGETS_NESTED): -ifneq (Windows,$(findstring Windows,$(OS))) @for file in $(MFILES); do \ $(MAKE) -f `basename $$file` -C `dirname $$file` $(@); \ done -else - -@cmd /c "$(MAKE) -f Makefile.Bug_1395_Server -C . $(@)" - -@cmd /c "$(MAKE) -f Makefile.Bug_1395_Client -C . $(@)" -endif diff --git a/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp b/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp index 527d1c124e2..25823fbe628 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp +++ b/TAO/orbsvcs/tests/Event/Basic/Gateway.cpp @@ -6,7 +6,7 @@ #include "orbsvcs/Event_Utilities.h" #include "orbsvcs/Event/EC_Event_Channel.h" #include "orbsvcs/Event/EC_Default_Factory.h" -#include "orbsvcs/Event/EC_Gateway.h" +#include "orbsvcs/Event/EC_Gateway_IIOP.h" ACE_RCSID(EC_Tests, Gateway, "$Id$") diff --git a/TAO/orbsvcs/tests/Event/Basic/Observer.h b/TAO/orbsvcs/tests/Event/Basic/Observer.h index 7c562cfab60..4ff6fd35604 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Observer.h +++ b/TAO/orbsvcs/tests/Event/Basic/Observer.h @@ -18,7 +18,7 @@ #define EC_OBSERVER_H #include "Driver.h" -#include "orbsvcs/Event/EC_Gateway.h" +#include "orbsvcs/Event/EC_Gateway_IIOP.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once diff --git a/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor b/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor index fed954b1599..11ffedd9f3e 100644 --- a/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor +++ b/TAO/orbsvcs/tests/Notify/Basic/Makefile.bor @@ -28,6 +28,7 @@ CFLAGS = \ $(TAO_NOTIFYTESTS_CFLAGS) \ $(TAO_NOTIFY_CFLAGS) \ $(TAO_ETCL_CFLAGS) \ + $(TAO_DYNAMICANY_CFLAGS) \ $(TAO_MESSAGING_CFLAGS) CPPDIR = . @@ -43,6 +44,7 @@ LIBFILES = \ $(TAO_NOTIFYTESTS_LIB) \ $(TAO_NOTIFY_LIB) \ $(TAO_ETCL_LIB) \ + $(TAO_DYNAMICANY_LIB) \ $(TAO_MESSAGING_LIB) !include <$(ACE_ROOT)\include\makeinclude\recurse.bor> |