From 49fbf9581e5ec613d80ae222d6dd5eecede6d248 Mon Sep 17 00:00:00 2001 From: coryan Date: Sun, 4 Oct 1998 21:54:56 +0000 Subject: ChangeLogTag:Sun Oct 4 16:37:23 1998 Carlos O'Ryan --- TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp | 359 +++++++++++++--------------- TAO/orbsvcs/orbsvcs/Event/Event_Channel.h | 7 +- 2 files changed, 170 insertions(+), 196 deletions(-) (limited to 'TAO/orbsvcs/orbsvcs') diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp index ee1aee6d32a..ec8c4f21fb5 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp @@ -1,6 +1,7 @@ // $Id$ #include "ace/Service_Config.h" +#include "ace/Auto_Ptr.h" #include "orbsvcs/Scheduler_Factory.h" #include "orbsvcs/Event_Utilities.h" @@ -89,37 +90,22 @@ ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Event_Channel_Timeprobe_Description, // ************************************************************ -static RtecScheduler::OS_Priority -Preemption_Priority (RtecScheduler::handle_t rtinfo) +static RtecScheduler::Preemption_Priority +Preemption_Priority (RtecScheduler::handle_t rtinfo, + CORBA::Environment &_env) { RtecScheduler::OS_Priority thread_priority; RtecScheduler::Preemption_Subpriority subpriority; RtecScheduler::Preemption_Priority preemption_priority; - TAO_TRY - { - ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED); - ACE_Scheduler_Factory::server ()->priority - (rtinfo, - thread_priority, - subpriority, - preemption_priority, - TAO_TRY_ENV); - TAO_CHECK_ENV - ACE_TIMEPROBE (TAO_EVENT_CHANNEL_CONNECTED_PRIORITY_OBTAINED); - } - TAO_CATCH (RtecScheduler::UNKNOWN_TASK, ex_ut) - { - ACE_ERROR_RETURN ((LM_ERROR, "UNKNOWN_TASK %p.\n", - "Preemption_Priority"), 0); - } - TAO_CATCHANY - { - ACE_ERROR_RETURN ((LM_ERROR, "Unexpected exception %p.\n", - "Preemption_Priority"), 0); - - } - TAO_ENDTRY; + ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED); + ACE_Scheduler_Factory::server ()->priority + (rtinfo, + thread_priority, + subpriority, + preemption_priority, + _env); + TAO_CHECK_ENV_RETURN (_env, 0); return preemption_priority; } @@ -163,17 +149,28 @@ public: // This is so the dispatching module can query us as a dispatch // request to get the appropriate preemption priority. ACE_ES_Dependency_Iterator iter (consumer->qos ().dependencies); + CORBA::Environment env; + RtecScheduler::Preemption_Priority p = + ACE_Scheduler_MIN_PREEMPTION_PRIORITY; while (iter.advance_dependency () == 0) { RtecEventComm::EventType &type = (*iter).event.header.type; + if (type != ACE_ES_GLOBAL_DESIGNATOR && type != ACE_ES_CONJUNCTION_DESIGNATOR && type != ACE_ES_DISJUNCTION_DESIGNATOR) { - if (rt_info_ == 0 || - ::Preemption_Priority ((*iter).rt_info) < - ::Preemption_Priority (rt_info_)) - rt_info_ = (*iter).rt_info; + env.clear (); + RtecScheduler::Preemption_Priority q = + ::Preemption_Priority ((*iter).rt_info, env); + if (env.exception () != 0) + continue; + if (rt_info_ == 0) + { + this->rt_info_ = ((*iter).rt_info); + p = q; + continue; + } } } } @@ -436,22 +433,14 @@ ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events, { ACE_DEBUG ((LM_DEBUG, "EC (%t) Push to disconnected consumer %s\n", - ::ACE_ES_Consumer_Name (this->qos ()))); + ::ACE_ES_Consumer_Name (this->qos (), + _env))); // ACE_ES_DEBUG_ST (::dump_sequence (events)); return; } - TAO_TRY - { - push_consumer_->push (events, TAO_TRY_ENV); - TAO_CHECK_ENV; - } - TAO_CATCH (CORBA::SystemException, se) - { - ACE_ERROR ((LM_ERROR, "system exception.\n")); - TAO_RETHROW; - } - TAO_ENDTRY; + push_consumer_->push (events, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); } void @@ -942,8 +931,12 @@ ACE_ES_Subscription_Info::remove (SourceID_Map &source_subscribers, // @@ Should probably remove the supplier from the consumers caller // list. - // If the set is empty, remove it from the type collection. #if 0 + // If the set is empty, remove it from the type collection. + // NOT!!!! In some cases the map is initialized to the types that a + // certain supplier export; removing an entry from the map renders + // that supplier unable to send that event type. + // Before changing this ask me (coryan). if (subscribers->size () == 0) { Subscriber_Set *removed_subscribers; @@ -1121,6 +1114,17 @@ ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request) // sc->consumer ())); CORBA::Boolean dont_update = sc->consumer ()->qos ().is_gateway; + + // Deactivate the consumer proxy + PortableServer::POA_var poa = + sc->consumer ()->_default_POA (TAO_TRY_ENV); + TAO_CHECK_ENV; + PortableServer::ObjectId_var id = + poa->servant_to_id (sc->consumer (), TAO_TRY_ENV); + TAO_CHECK_ENV; + poa->deactivate_object (id.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + // Delete the consumer proxy. delete sc->consumer (); @@ -1178,9 +1182,18 @@ ACE_ES_Consumer_Module::shutdown (void) iter.advance ()) { (*proxy)->shutdown (); - // @@ Cannnot use CORBA::release (*proxy), since it is a servant. - delete *proxy; - + // @@ Cannnot use CORBA::release (*proxy), since it is a + // servant. + // Deactivate the proxy... + PortableServer::POA_var poa = + (*proxy)->_default_POA (env); + TAO_CHECK_ENV_RETURN_VOID (env); + PortableServer::ObjectId_var id = + poa->servant_to_id (*proxy, env); + TAO_CHECK_ENV_RETURN_VOID (env); + poa->deactivate_object (id.in (), env); + TAO_CHECK_ENV_RETURN_VOID (env); + // Remove the consumer from our list. { ACE_ES_GUARD ace_mon (lock_); @@ -1190,6 +1203,8 @@ ACE_ES_Consumer_Module::shutdown (void) if (all_consumers_.remove (*proxy) == -1) ACE_ERROR ((LM_ERROR, "%p Failed to remove consumer.\n", "ACE_ES_Consumer_Module::shutdown")); } + + delete *proxy; } } @@ -1283,10 +1298,11 @@ ACE_ES_Consumer_Module::push (const ACE_ES_Dispatch_Request *request, RtecEventChannelAdmin::ProxyPushSupplier_ptr ACE_ES_Consumer_Module::obtain_push_supplier (CORBA::Environment &_env) { - ACE_Push_Consumer_Proxy *new_consumer = new ACE_Push_Consumer_Proxy (this); + auto_ptr new_consumer = + new ACE_Push_Consumer_Proxy (this); // Get a new supplier proxy object. - if (new_consumer == 0) + if (new_consumer.get () == 0) { ACE_ERROR ((LM_ERROR, "ACE_EventChannel" "::obtain_push_supplier failed.\n")); @@ -1294,19 +1310,16 @@ ACE_ES_Consumer_Module::obtain_push_supplier (CORBA::Environment &_env) } { - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - { - delete new_consumer; - TAO_THROW_RETURN (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR(), 0); - } + TAO_GUARD_THROW_RETURN (ACE_ES_MUTEX, ace_mon, this->lock_, 0, _env, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - if (all_consumers_.insert (new_consumer) == -1) + if (all_consumers_.insert (new_consumer.get ()) == -1) ACE_ERROR ((LM_ERROR, "ACE_ES_Consumer_Module insert failed.\n")); } - // Return the CORBA object reference to the new supplier proxy. - return new_consumer->get_ref (_env); + // Return the CORBA object reference to the new supplier proxy, + // there is no need to hold a pointer, it is now help in the map... + return new_consumer.release ()->get_ref (_env); } void @@ -1424,7 +1437,8 @@ ACE_ES_Correlation_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer, CORBA::Environment &) { if (consumer->correlation ().disconnecting () == -1) - ACE_ERROR ((LM_ERROR, "ACE_ES_Correlation_Module::disconnecting failed.\n")); + ACE_ERROR ((LM_ERROR, + "ACE_ES_Correlation_Module::disconnecting failed.\n")); } int @@ -2555,82 +2569,71 @@ ACE_ES_Subscription_Module::subscribe_source (ACE_ES_Consumer_Rep *consumer, // -supplier-. Add the -consumer- to the correct supplier proxy. Supplier_Iterator iter (all_suppliers_); - int success = -1; - for (ACE_Push_Supplier_Proxy **proxy = 0; iter.next (proxy) != 0; iter.advance ()) { // Operator == checks if is a proxy for . - if ((**proxy) == source) - { - ACE_ES_WGUARD mon ((*proxy)->subscription_info ().lock_); + if (!((**proxy) == source)) + continue; - ACE_ES_Subscription_Info::Subscriber_Set &set = - (*proxy)->subscription_info ().source_subscribers_; + ACE_ES_WGUARD mon ((*proxy)->subscription_info ().lock_); - // Insert the consumer to the supplier's subscription set for - // the type. - int insert_result = set.insert (consumer); - switch (insert_result) - { - case -1: - // Error. - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "Subscription Module::subscribe_source"), -1); - case 0: - default: + ACE_ES_Subscription_Info::Subscriber_Set &set = + (*proxy)->subscription_info ().source_subscribers_; + + // Insert the consumer to the supplier's subscription set for + // the type. + int insert_result = set.insert (consumer); + switch (insert_result) + { + case -1: + // Error. + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "Subscription Module::subscribe_source"), -1); + case 1: + // Already there. + break; + case 0: + default: + { + // Increment the consumer rep's reference count. + consumer->_duplicate (); + + // Add each of the supplier's dependency infos to the + // consumer's dependency list. + ACE_ES_Subscription_Info::Subscriber_Map_Iterator iter2 + ((*proxy)->subscription_info ().type_subscribers_); + + // Delete all type collections. + for (ACE_ES_Subscription_Info::Subscriber_Map_Entry *temp = 0; + iter2.next (temp) != 0; + iter2.advance ()) { - // Increment the consumer rep's reference count. - consumer->_duplicate (); - // Success. - // Add each of the supplier's dependency infos to the - // consumer's dependency list. - ACE_ES_Subscription_Info::Subscriber_Map_Iterator iter2 - ((*proxy)->subscription_info ().type_subscribers_); - - // Delete all type collections. - for (ACE_ES_Subscription_Info::Subscriber_Map_Entry *temp = 0; - iter2.next (temp) != 0; - iter2.advance ()) + TAO_TRY { - TAO_TRY - { - ACE_Scheduler_Factory::server()->add_dependency - (consumer->dependency()->rt_info, - temp->int_id_->dependency_info_->rt_info, - temp->int_id_->dependency_info_->number_of_calls, - RtecScheduler::ONE_WAY_CALL, - TAO_TRY_ENV); - TAO_CHECK_ENV; - - } - TAO_CATCHANY - { - TAO_TRY_ENV.print_exception ("error adding dependency"); - return -1; - } - TAO_ENDTRY; + ACE_Scheduler_Factory::server()->add_dependency + (consumer->dependency()->rt_info, + temp->int_id_->dependency_info_->rt_info, + temp->int_id_->dependency_info_->number_of_calls, + RtecScheduler::ONE_WAY_CALL, + TAO_TRY_ENV); + TAO_CHECK_ENV; } + TAO_CATCHANY + { + TAO_TRY_ENV.print_exception ("error adding dependency"); + return -1; + } + TAO_ENDTRY; } - - case 1: - // Already there. - success = 0; - break; - } + } } } - // Add the consumer to the global source subscribers list. - // @@ TODO This seems to require that source IDs be unique, i.e. any - // new supplier with the same ID will be ignored. - if (success == -1) - return ACE_ES_Subscription_Info::insert_or_allocate (source_subscribers_, - consumer, - source); - else - return success; + return ACE_ES_Subscription_Info::insert_or_allocate (source_subscribers_, + consumer, + source); } // Step through all Supplier Proxies. For each proxy, if it generates @@ -3126,24 +3129,21 @@ ACE_ES_Supplier_Module::shutdown (void) RtecEventChannelAdmin::ProxyPushConsumer_ptr ACE_ES_Supplier_Module::obtain_push_consumer (CORBA::Environment &_env) { - ACE_Push_Supplier_Proxy *new_supplier = new ACE_Push_Supplier_Proxy (this); + auto_ptr new_supplier = + new ACE_Push_Supplier_Proxy (this); - if (new_supplier == 0) + if (new_supplier.get () == 0) TAO_THROW_RETURN (CORBA::NO_MEMORY (CORBA::COMPLETED_NO), 0); { - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - { - delete new_supplier; - TAO_THROW_RETURN (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR(), 0); - } + TAO_GUARD_THROW_RETURN (ACE_ES_MUTEX, ace_mon, this->lock_, 0, _env, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - if (all_suppliers_.insert (new_supplier) == -1) + if (all_suppliers_.insert (new_supplier.get ()) == -1) ACE_ERROR ((LM_ERROR, "ACE_ES_Supplier_Module insert failed.\n")); } - return new_supplier->get_ref (_env); + return new_supplier.release ()->get_ref (_env); } void @@ -3152,51 +3152,23 @@ ACE_ES_Supplier_Module::push (ACE_Push_Supplier_Proxy *proxy, CORBA::Environment &_env) { // ACE_DEBUG ((LM_DEBUG, "EC (%t) Supplier_Module::push\n")); - TAO_TRY - { - for (CORBA::ULong i = 0; i < event.length(); ++i) - { - ACE_ES_Event_Container *temp = - new ACE_ES_Event_Container (event[i]); - //RtecEventComm::Event *temp = new RtecEventComm::Event (event); - - if (temp == 0) - TAO_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO)); - - // This will guarantee that release gets called when we exit - // the scope. - ACE_ES_Event_Container_var event_copy (temp); - temp->_release (); - ACE_TIMEPROBE (TAO_EVENT_CHANNEL_DELIVER_TO_SUPPLIER_MODULE_THRU_SUPPLIER_PROXY); - up_->push (proxy, event_copy, TAO_TRY_ENV); - TAO_CHECK_ENV; - } - } - TAO_CATCH (RtecEventChannelAdmin::TypeError, t) + for (CORBA::ULong i = 0; i < event.length(); ++i) { - ACE_ERROR ((LM_ERROR, "%p Type Error.\n", - "ACE_ES_Supplier_Module::push")); - TAO_RETHROW; - } - TAO_CATCH (CORBA::NO_MEMORY, e) - { - ACE_ERROR ((LM_ERROR, "%p No Memory.\n", - "ACE_ES_Supplier_Module::push")); - TAO_RETHROW; - } - TAO_CATCH (CORBA::SystemException, e) - { - ACE_ERROR ((LM_ERROR, "%p CORBA System Exception.\n", - "ACE_ES_Supplier_Module::push")); - TAO_RETHROW; - } - TAO_CATCHANY - { - ACE_ERROR ((LM_ERROR, "ACE_ES_Supplier_Module::push: " - "Unknown exception.\n")); - TAO_RETHROW; + ACE_ES_Event_Container *temp = + new ACE_ES_Event_Container (event[i]); + //RtecEventComm::Event *temp = new RtecEventComm::Event (event); + + if (temp == 0) + TAO_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO)); + + // This will guarantee that release gets called when we exit + // the scope. + ACE_ES_Event_Container_var event_copy (temp); + temp->_release (); + ACE_TIMEPROBE (TAO_EVENT_CHANNEL_DELIVER_TO_SUPPLIER_MODULE_THRU_SUPPLIER_PROXY); + up_->push (proxy, event_copy, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); } - TAO_ENDTRY; } void @@ -3303,32 +3275,23 @@ TAO_EC_Timeout_Handler::handle_timeout (const ACE_Time_Value &, // ************************************************************ const char * -ACE_ES_Consumer_Name (const RtecEventChannelAdmin::ConsumerQOS &qos) +ACE_ES_Consumer_Name (const RtecEventChannelAdmin::ConsumerQOS &qos, + CORBA::Environment &_env) { // The first dependency should designate a correlation group. - TAO_TRY - { - ACE_FUNCTION_TIMEPROBE (TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_REQUESTED); - - if (qos.dependencies.length () <= 1) - return "no-name"; + ACE_FUNCTION_TIMEPROBE (TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_REQUESTED); + if (qos.dependencies.length () <= 1) + return "no-name"; - RtecScheduler::RT_Info* rt_info = ACE_Scheduler_Factory::server ()->get - (qos.dependencies[1].rt_info, TAO_TRY_ENV); - TAO_CHECK_ENV; + RtecScheduler::RT_Info* rt_info = ACE_Scheduler_Factory::server ()->get + (qos.dependencies[1].rt_info, _env); + TAO_CHECK_ENV_RETURN (_env, 0); - if (rt_info == 0) - return "no-name"; + if (rt_info == 0) + return "no-name"; - return rt_info->entry_point; - } - TAO_CATCHANY - { - return "no-name"; - } - TAO_ENDTRY; - return "no-name"; + return rt_info->entry_point; } // ************************************************************ @@ -3382,6 +3345,11 @@ template class ACE_Node; template class ACE_Unbounded_Set; template class ACE_Unbounded_Set_Iterator; +template class ACE_Auto_Basic_Ptr; +template class ACE_Auto_Basic_Ptr; +template class auto_ptr; +template class auto_ptr; + #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Atomic_Op @@ -3427,4 +3395,9 @@ template class ACE_Unbounded_Set_Iterator; #pragma instantiate ACE_Unbounded_Set #pragma instantiate ACE_Unbounded_Set_Iterator +#pragma instantiate ACE_Auto_Basic_Ptr +#pragma instantiate ACE_Auto_Basic_Ptr +#pragma instantiate auto_ptr +#pragma instantiate auto_ptr + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h index c15b82bd532..c955d57fce2 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h @@ -1430,10 +1430,11 @@ private: // ************************************************************ -// Helper function that returns -// qos.dependencies_[0].rt_info_->entry_point or "no-name". +// Helper function that returns the first RT_Info entry point name. +// Use for debugging purposes only. const TAO_ORBSVCS_Export char * -ACE_ES_Consumer_Name (const RtecEventChannelAdmin::ConsumerQOS &qos); +ACE_ES_Consumer_Name (const RtecEventChannelAdmin::ConsumerQOS &qos, + CORBA::Environment &_env); // ************************************************************ -- cgit v1.2.1