// $Id$ #include "EC_ProxyConsumer.h" #include "EC_Event_Channel.h" #include "EC_SupplierFiltering.h" #include "EC_Supplier_Filter_Builder.h" #if ! defined (__ACE_INLINE__) #include "EC_ProxyConsumer.i" #endif /* __ACE_INLINE__ */ ACE_RCSID(Event, EC_ProxyConsumer, "$Id$") typedef ACE_Reverse_Lock TAO_EC_Unlock; TAO_EC_ProxyPushConsumer:: TAO_EC_ProxyPushConsumer (TAO_EC_Event_Channel* ec) : event_channel_ (ec), refcount_ (1), filter_ (0) { this->lock_ = this->event_channel_->create_consumer_lock (); this->default_POA_ = this->event_channel_->consumer_poa (); } TAO_EC_ProxyPushConsumer::~TAO_EC_ProxyPushConsumer (void) { this->event_channel_->destroy_consumer_lock (this->lock_); } void TAO_EC_ProxyPushConsumer::connected (TAO_EC_ProxyPushSupplier* supplier, CORBA::Environment &ACE_TRY_ENV) { TAO_EC_SupplierFiltering* filter = 0; { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; if (this->is_connected_i () == 0) return; filter = this->filter_; filter->_incr_refcnt (); } filter->connected (supplier, ACE_TRY_ENV); { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; filter->_decr_refcnt (); } } void TAO_EC_ProxyPushConsumer::disconnected (TAO_EC_ProxyPushSupplier* supplier, CORBA::Environment &ACE_TRY_ENV) { TAO_EC_SupplierFiltering* filter = 0; { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; if (this->is_connected_i () == 0) return; filter = this->filter_; filter->_incr_refcnt (); } filter->disconnected (supplier, ACE_TRY_ENV); { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; filter->_decr_refcnt (); } } void TAO_EC_ProxyPushConsumer::connected (TAO_EC_ProxyPushConsumer*, CORBA::Environment &) { } void TAO_EC_ProxyPushConsumer::disconnected (TAO_EC_ProxyPushConsumer*, CORBA::Environment &) { } void TAO_EC_ProxyPushConsumer::shutdown (CORBA::Environment &ACE_TRY_ENV) { RtecEventComm::PushSupplier_var supplier; { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; if (this->is_connected_i () == 0) return; supplier = this->supplier_._retn (); this->filter_->shutdown (ACE_TRY_ENV); ACE_CHECK; this->cleanup_i (); } this->deactivate (ACE_TRY_ENV); ACE_CHECK; supplier->disconnect_push_supplier (ACE_TRY_ENV); } void TAO_EC_ProxyPushConsumer::cleanup_i (void) { this->supplier_ = RtecEventComm::PushSupplier::_nil (); this->filter_->unbind (this); this->filter_->_decr_refcnt (); this->filter_ = 0; } void TAO_EC_ProxyPushConsumer::deactivate (CORBA::Environment &ACE_TRY_ENV) { PortableServer::POA_var poa = this->_default_POA (ACE_TRY_ENV); ACE_CHECK; PortableServer::ObjectId_var id = poa->servant_to_id (this, ACE_TRY_ENV); ACE_CHECK; poa->deactivate_object (id.in (), ACE_TRY_ENV); ACE_CHECK; } CORBA::ULong TAO_EC_ProxyPushConsumer::_incr_refcnt (void) { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0); return this->refcount_++; } CORBA::ULong TAO_EC_ProxyPushConsumer::_decr_refcnt (void) { { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0); this->refcount_--; if (this->refcount_ != 0) return this->refcount_; } // Notify the event channel this->event_channel_->destroy_proxy_push_consumer (this); return 0; } void TAO_EC_ProxyPushConsumer::connect_push_supplier ( RtecEventComm::PushSupplier_ptr push_supplier, const RtecEventChannelAdmin::SupplierQOS& qos, CORBA::Environment &ACE_TRY_ENV) { { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; if (this->is_connected_i ()) { if (this->event_channel_->supplier_reconnect () == 0) ACE_THROW (RtecEventChannelAdmin::AlreadyConnected ()); // Re-connections are allowed, go ahead and disconnect the // consumer... this->cleanup_i (); // @@ Please read the comments in EC_ProxySuppliers about // possible race conditions in this area... TAO_EC_Unlock reverse_lock (*this->lock_); { ACE_GUARD_THROW_EX ( TAO_EC_Unlock, ace_mon, reverse_lock, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; this->event_channel_->disconnected (this, ACE_TRY_ENV); ACE_CHECK; } // What if a second thread connected us after this? if (this->is_connected_i ()) return; } this->supplier_ = RtecEventComm::PushSupplier::_duplicate (push_supplier); this->qos_ = qos; this->filter_ = this->event_channel_->supplier_filter_builder ()->create (this->qos_); this->filter_->bind (this); } // Notify the event channel... this->event_channel_->connected (this, ACE_TRY_ENV); } void TAO_EC_ProxyPushConsumer::push (const RtecEventComm::EventSet& event, CORBA::Environment &ACE_TRY_ENV) { TAO_EC_SupplierFiltering* filter = 0; { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; if (this->is_connected_i () == 0) return; // @@ THROW something??? filter = this->filter_; filter->_incr_refcnt (); this->refcount_++; } // No need to keep the lock, the filter_ class is supposed to be // thread safe.... filter->push (event, ACE_TRY_ENV); { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; filter->_decr_refcnt (); this->refcount_--; if (this->refcount_ != 0) return; } this->event_channel_->destroy_proxy_push_consumer (this); } void TAO_EC_ProxyPushConsumer::disconnect_push_consumer ( CORBA::Environment &ACE_TRY_ENV) { { ACE_GUARD_THROW_EX ( ACE_Lock, ace_mon, *this->lock_, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK; if (this->is_connected_i () == 0) ACE_THROW (CORBA::BAD_INV_ORDER ()); // @@ add user exception? this->cleanup_i (); } this->deactivate (ACE_TRY_ENV); ACE_CHECK; // Notify the event channel... this->event_channel_->disconnected (this, ACE_TRY_ENV); this->_decr_refcnt (); } PortableServer::POA_ptr TAO_EC_ProxyPushConsumer::_default_POA (CORBA::Environment&) { return PortableServer::POA::_duplicate (this->default_POA_.in ()); } void TAO_EC_ProxyPushConsumer::_add_ref (CORBA::Environment &) { this->_incr_refcnt (); } void TAO_EC_ProxyPushConsumer::_remove_ref (CORBA::Environment &) { this->_decr_refcnt (); }