summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-10-04 21:54:56 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-10-04 21:54:56 +0000
commit49fbf9581e5ec613d80ae222d6dd5eecede6d248 (patch)
treebfb27d074f323861a2d6134c76af65a110eceea6 /TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
parent349626591b4feafa7e2a9f8926842ae596c4e3d2 (diff)
downloadATCD-49fbf9581e5ec613d80ae222d6dd5eecede6d248.tar.gz
ChangeLogTag:Sun Oct 4 16:37:23 1998 Carlos O'Ryan <coryan@tango.cs.wustl.edu>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp359
1 files changed, 166 insertions, 193 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
index ee1aee6d32a..ec8c4f21fb5 100644
--- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp
@@ -1,6 +1,7 @@
// $Id$
#include "ace/Service_Config.h"
+#include "ace/Auto_Ptr.h"
#include "orbsvcs/Scheduler_Factory.h"
#include "orbsvcs/Event_Utilities.h"
@@ -89,37 +90,22 @@ ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_Event_Channel_Timeprobe_Description,
// ************************************************************
-static RtecScheduler::OS_Priority
-Preemption_Priority (RtecScheduler::handle_t rtinfo)
+static RtecScheduler::Preemption_Priority
+Preemption_Priority (RtecScheduler::handle_t rtinfo,
+ CORBA::Environment &_env)
{
RtecScheduler::OS_Priority thread_priority;
RtecScheduler::Preemption_Subpriority subpriority;
RtecScheduler::Preemption_Priority preemption_priority;
- TAO_TRY
- {
- ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED);
- ACE_Scheduler_Factory::server ()->priority
- (rtinfo,
- thread_priority,
- subpriority,
- preemption_priority,
- TAO_TRY_ENV);
- TAO_CHECK_ENV
- ACE_TIMEPROBE (TAO_EVENT_CHANNEL_CONNECTED_PRIORITY_OBTAINED);
- }
- TAO_CATCH (RtecScheduler::UNKNOWN_TASK, ex_ut)
- {
- ACE_ERROR_RETURN ((LM_ERROR, "UNKNOWN_TASK %p.\n",
- "Preemption_Priority"), 0);
- }
- TAO_CATCHANY
- {
- ACE_ERROR_RETURN ((LM_ERROR, "Unexpected exception %p.\n",
- "Preemption_Priority"), 0);
-
- }
- TAO_ENDTRY;
+ ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PREEMPTION_PRIORITY_PRIORITY_REQUESTED);
+ ACE_Scheduler_Factory::server ()->priority
+ (rtinfo,
+ thread_priority,
+ subpriority,
+ preemption_priority,
+ _env);
+ TAO_CHECK_ENV_RETURN (_env, 0);
return preemption_priority;
}
@@ -163,17 +149,28 @@ public:
// This is so the dispatching module can query us as a dispatch
// request to get the appropriate preemption priority.
ACE_ES_Dependency_Iterator iter (consumer->qos ().dependencies);
+ CORBA::Environment env;
+ RtecScheduler::Preemption_Priority p =
+ ACE_Scheduler_MIN_PREEMPTION_PRIORITY;
while (iter.advance_dependency () == 0)
{
RtecEventComm::EventType &type = (*iter).event.header.type;
+
if (type != ACE_ES_GLOBAL_DESIGNATOR &&
type != ACE_ES_CONJUNCTION_DESIGNATOR &&
type != ACE_ES_DISJUNCTION_DESIGNATOR)
{
- if (rt_info_ == 0 ||
- ::Preemption_Priority ((*iter).rt_info) <
- ::Preemption_Priority (rt_info_))
- rt_info_ = (*iter).rt_info;
+ env.clear ();
+ RtecScheduler::Preemption_Priority q =
+ ::Preemption_Priority ((*iter).rt_info, env);
+ if (env.exception () != 0)
+ continue;
+ if (rt_info_ == 0)
+ {
+ this->rt_info_ = ((*iter).rt_info);
+ p = q;
+ continue;
+ }
}
}
}
@@ -436,22 +433,14 @@ ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events,
{
ACE_DEBUG ((LM_DEBUG,
"EC (%t) Push to disconnected consumer %s\n",
- ::ACE_ES_Consumer_Name (this->qos ())));
+ ::ACE_ES_Consumer_Name (this->qos (),
+ _env)));
// ACE_ES_DEBUG_ST (::dump_sequence (events));
return;
}
- TAO_TRY
- {
- push_consumer_->push (events, TAO_TRY_ENV);
- TAO_CHECK_ENV;
- }
- TAO_CATCH (CORBA::SystemException, se)
- {
- ACE_ERROR ((LM_ERROR, "system exception.\n"));
- TAO_RETHROW;
- }
- TAO_ENDTRY;
+ push_consumer_->push (events, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
void
@@ -942,8 +931,12 @@ ACE_ES_Subscription_Info::remove (SourceID_Map &source_subscribers,
// @@ Should probably remove the supplier from the consumers caller
// list.
- // If the set is empty, remove it from the type collection.
#if 0
+ // If the set is empty, remove it from the type collection.
+ // NOT!!!! In some cases the map is initialized to the types that a
+ // certain supplier export; removing an entry from the map renders
+ // that supplier unable to send that event type.
+ // Before changing this ask me (coryan).
if (subscribers->size () == 0)
{
Subscriber_Set *removed_subscribers;
@@ -1121,6 +1114,17 @@ ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request)
// sc->consumer ()));
CORBA::Boolean dont_update = sc->consumer ()->qos ().is_gateway;
+
+ // Deactivate the consumer proxy
+ PortableServer::POA_var poa =
+ sc->consumer ()->_default_POA (TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (sc->consumer (), TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+ poa->deactivate_object (id.in (), TAO_TRY_ENV);
+ TAO_CHECK_ENV;
+
// Delete the consumer proxy.
delete sc->consumer ();
@@ -1178,9 +1182,18 @@ ACE_ES_Consumer_Module::shutdown (void)
iter.advance ())
{
(*proxy)->shutdown ();
- // @@ Cannnot use CORBA::release (*proxy), since it is a servant.
- delete *proxy;
-
+ // @@ Cannnot use CORBA::release (*proxy), since it is a
+ // servant.
+ // Deactivate the proxy...
+ PortableServer::POA_var poa =
+ (*proxy)->_default_POA (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (*proxy, env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
+ poa->deactivate_object (id.in (), env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
+
// Remove the consumer from our list.
{
ACE_ES_GUARD ace_mon (lock_);
@@ -1190,6 +1203,8 @@ ACE_ES_Consumer_Module::shutdown (void)
if (all_consumers_.remove (*proxy) == -1)
ACE_ERROR ((LM_ERROR, "%p Failed to remove consumer.\n", "ACE_ES_Consumer_Module::shutdown"));
}
+
+ delete *proxy;
}
}
@@ -1283,10 +1298,11 @@ ACE_ES_Consumer_Module::push (const ACE_ES_Dispatch_Request *request,
RtecEventChannelAdmin::ProxyPushSupplier_ptr
ACE_ES_Consumer_Module::obtain_push_supplier (CORBA::Environment &_env)
{
- ACE_Push_Consumer_Proxy *new_consumer = new ACE_Push_Consumer_Proxy (this);
+ auto_ptr<ACE_Push_Consumer_Proxy> new_consumer =
+ new ACE_Push_Consumer_Proxy (this);
// Get a new supplier proxy object.
- if (new_consumer == 0)
+ if (new_consumer.get () == 0)
{
ACE_ERROR ((LM_ERROR, "ACE_EventChannel"
"::obtain_push_supplier failed.\n"));
@@ -1294,19 +1310,16 @@ ACE_ES_Consumer_Module::obtain_push_supplier (CORBA::Environment &_env)
}
{
- ACE_ES_GUARD ace_mon (lock_);
- if (ace_mon.locked () == 0)
- {
- delete new_consumer;
- TAO_THROW_RETURN (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR(), 0);
- }
+ TAO_GUARD_THROW_RETURN (ACE_ES_MUTEX, ace_mon, this->lock_, 0, _env,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
- if (all_consumers_.insert (new_consumer) == -1)
+ if (all_consumers_.insert (new_consumer.get ()) == -1)
ACE_ERROR ((LM_ERROR, "ACE_ES_Consumer_Module insert failed.\n"));
}
- // Return the CORBA object reference to the new supplier proxy.
- return new_consumer->get_ref (_env);
+ // Return the CORBA object reference to the new supplier proxy,
+ // there is no need to hold a pointer, it is now help in the map...
+ return new_consumer.release ()->get_ref (_env);
}
void
@@ -1424,7 +1437,8 @@ ACE_ES_Correlation_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer,
CORBA::Environment &)
{
if (consumer->correlation ().disconnecting () == -1)
- ACE_ERROR ((LM_ERROR, "ACE_ES_Correlation_Module::disconnecting failed.\n"));
+ ACE_ERROR ((LM_ERROR,
+ "ACE_ES_Correlation_Module::disconnecting failed.\n"));
}
int
@@ -2555,82 +2569,71 @@ ACE_ES_Subscription_Module::subscribe_source (ACE_ES_Consumer_Rep *consumer,
// -supplier-. Add the -consumer- to the correct supplier proxy.
Supplier_Iterator iter (all_suppliers_);
- int success = -1;
-
for (ACE_Push_Supplier_Proxy **proxy = 0;
iter.next (proxy) != 0;
iter.advance ())
{
// Operator == checks if <proxy> is a proxy for <supplier>.
- if ((**proxy) == source)
- {
- ACE_ES_WGUARD mon ((*proxy)->subscription_info ().lock_);
+ if (!((**proxy) == source))
+ continue;
- ACE_ES_Subscription_Info::Subscriber_Set &set =
- (*proxy)->subscription_info ().source_subscribers_;
+ ACE_ES_WGUARD mon ((*proxy)->subscription_info ().lock_);
- // Insert the consumer to the supplier's subscription set for
- // the type.
- int insert_result = set.insert (consumer);
- switch (insert_result)
- {
- case -1:
- // Error.
- ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
- "Subscription Module::subscribe_source"), -1);
- case 0:
- default:
+ ACE_ES_Subscription_Info::Subscriber_Set &set =
+ (*proxy)->subscription_info ().source_subscribers_;
+
+ // Insert the consumer to the supplier's subscription set for
+ // the type.
+ int insert_result = set.insert (consumer);
+ switch (insert_result)
+ {
+ case -1:
+ // Error.
+ ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
+ "Subscription Module::subscribe_source"), -1);
+ case 1:
+ // Already there.
+ break;
+ case 0:
+ default:
+ {
+ // Increment the consumer rep's reference count.
+ consumer->_duplicate ();
+
+ // Add each of the supplier's dependency infos to the
+ // consumer's dependency list.
+ ACE_ES_Subscription_Info::Subscriber_Map_Iterator iter2
+ ((*proxy)->subscription_info ().type_subscribers_);
+
+ // Delete all type collections.
+ for (ACE_ES_Subscription_Info::Subscriber_Map_Entry *temp = 0;
+ iter2.next (temp) != 0;
+ iter2.advance ())
{
- // Increment the consumer rep's reference count.
- consumer->_duplicate ();
- // Success.
- // Add each of the supplier's dependency infos to the
- // consumer's dependency list.
- ACE_ES_Subscription_Info::Subscriber_Map_Iterator iter2
- ((*proxy)->subscription_info ().type_subscribers_);
-
- // Delete all type collections.
- for (ACE_ES_Subscription_Info::Subscriber_Map_Entry *temp = 0;
- iter2.next (temp) != 0;
- iter2.advance ())
+ TAO_TRY
{
- TAO_TRY
- {
- ACE_Scheduler_Factory::server()->add_dependency
- (consumer->dependency()->rt_info,
- temp->int_id_->dependency_info_->rt_info,
- temp->int_id_->dependency_info_->number_of_calls,
- RtecScheduler::ONE_WAY_CALL,
- TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- }
- TAO_CATCHANY
- {
- TAO_TRY_ENV.print_exception ("error adding dependency");
- return -1;
- }
- TAO_ENDTRY;
+ ACE_Scheduler_Factory::server()->add_dependency
+ (consumer->dependency()->rt_info,
+ temp->int_id_->dependency_info_->rt_info,
+ temp->int_id_->dependency_info_->number_of_calls,
+ RtecScheduler::ONE_WAY_CALL,
+ TAO_TRY_ENV);
+ TAO_CHECK_ENV;
}
+ TAO_CATCHANY
+ {
+ TAO_TRY_ENV.print_exception ("error adding dependency");
+ return -1;
+ }
+ TAO_ENDTRY;
}
-
- case 1:
- // Already there.
- success = 0;
- break;
- }
+ }
}
}
- // Add the consumer to the global source subscribers list.
- // @@ TODO This seems to require that source IDs be unique, i.e. any
- // new supplier with the same ID will be ignored.
- if (success == -1)
- return ACE_ES_Subscription_Info::insert_or_allocate (source_subscribers_,
- consumer,
- source);
- else
- return success;
+ return ACE_ES_Subscription_Info::insert_or_allocate (source_subscribers_,
+ consumer,
+ source);
}
// Step through all Supplier Proxies. For each proxy, if it generates
@@ -3126,24 +3129,21 @@ ACE_ES_Supplier_Module::shutdown (void)
RtecEventChannelAdmin::ProxyPushConsumer_ptr
ACE_ES_Supplier_Module::obtain_push_consumer (CORBA::Environment &_env)
{
- ACE_Push_Supplier_Proxy *new_supplier = new ACE_Push_Supplier_Proxy (this);
+ auto_ptr<ACE_Push_Supplier_Proxy> new_supplier =
+ new ACE_Push_Supplier_Proxy (this);
- if (new_supplier == 0)
+ if (new_supplier.get () == 0)
TAO_THROW_RETURN (CORBA::NO_MEMORY (CORBA::COMPLETED_NO), 0);
{
- ACE_ES_GUARD ace_mon (lock_);
- if (ace_mon.locked () == 0)
- {
- delete new_supplier;
- TAO_THROW_RETURN (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR(), 0);
- }
+ TAO_GUARD_THROW_RETURN (ACE_ES_MUTEX, ace_mon, this->lock_, 0, _env,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
- if (all_suppliers_.insert (new_supplier) == -1)
+ if (all_suppliers_.insert (new_supplier.get ()) == -1)
ACE_ERROR ((LM_ERROR, "ACE_ES_Supplier_Module insert failed.\n"));
}
- return new_supplier->get_ref (_env);
+ return new_supplier.release ()->get_ref (_env);
}
void
@@ -3152,51 +3152,23 @@ ACE_ES_Supplier_Module::push (ACE_Push_Supplier_Proxy *proxy,
CORBA::Environment &_env)
{
// ACE_DEBUG ((LM_DEBUG, "EC (%t) Supplier_Module::push\n"));
- TAO_TRY
- {
- for (CORBA::ULong i = 0; i < event.length(); ++i)
- {
- ACE_ES_Event_Container *temp =
- new ACE_ES_Event_Container (event[i]);
- //RtecEventComm::Event *temp = new RtecEventComm::Event (event);
-
- if (temp == 0)
- TAO_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO));
-
- // This will guarantee that release gets called when we exit
- // the scope.
- ACE_ES_Event_Container_var event_copy (temp);
- temp->_release ();
- ACE_TIMEPROBE (TAO_EVENT_CHANNEL_DELIVER_TO_SUPPLIER_MODULE_THRU_SUPPLIER_PROXY);
- up_->push (proxy, event_copy, TAO_TRY_ENV);
- TAO_CHECK_ENV;
- }
- }
- TAO_CATCH (RtecEventChannelAdmin::TypeError, t)
+ for (CORBA::ULong i = 0; i < event.length(); ++i)
{
- ACE_ERROR ((LM_ERROR, "%p Type Error.\n",
- "ACE_ES_Supplier_Module::push"));
- TAO_RETHROW;
- }
- TAO_CATCH (CORBA::NO_MEMORY, e)
- {
- ACE_ERROR ((LM_ERROR, "%p No Memory.\n",
- "ACE_ES_Supplier_Module::push"));
- TAO_RETHROW;
- }
- TAO_CATCH (CORBA::SystemException, e)
- {
- ACE_ERROR ((LM_ERROR, "%p CORBA System Exception.\n",
- "ACE_ES_Supplier_Module::push"));
- TAO_RETHROW;
- }
- TAO_CATCHANY
- {
- ACE_ERROR ((LM_ERROR, "ACE_ES_Supplier_Module::push: "
- "Unknown exception.\n"));
- TAO_RETHROW;
+ ACE_ES_Event_Container *temp =
+ new ACE_ES_Event_Container (event[i]);
+ //RtecEventComm::Event *temp = new RtecEventComm::Event (event);
+
+ if (temp == 0)
+ TAO_THROW (CORBA::NO_MEMORY (CORBA::COMPLETED_NO));
+
+ // This will guarantee that release gets called when we exit
+ // the scope.
+ ACE_ES_Event_Container_var event_copy (temp);
+ temp->_release ();
+ ACE_TIMEPROBE (TAO_EVENT_CHANNEL_DELIVER_TO_SUPPLIER_MODULE_THRU_SUPPLIER_PROXY);
+ up_->push (proxy, event_copy, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
}
- TAO_ENDTRY;
}
void
@@ -3303,32 +3275,23 @@ TAO_EC_Timeout_Handler::handle_timeout (const ACE_Time_Value &,
// ************************************************************
const char *
-ACE_ES_Consumer_Name (const RtecEventChannelAdmin::ConsumerQOS &qos)
+ACE_ES_Consumer_Name (const RtecEventChannelAdmin::ConsumerQOS &qos,
+ CORBA::Environment &_env)
{
// The first dependency should designate a correlation group.
- TAO_TRY
- {
- ACE_FUNCTION_TIMEPROBE (TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_REQUESTED);
-
- if (qos.dependencies.length () <= 1)
- return "no-name";
+ ACE_FUNCTION_TIMEPROBE (TAO_EVENT_CHANNEL_CONSUMER_NAME_PRIORITY_REQUESTED);
+ if (qos.dependencies.length () <= 1)
+ return "no-name";
- RtecScheduler::RT_Info* rt_info = ACE_Scheduler_Factory::server ()->get
- (qos.dependencies[1].rt_info, TAO_TRY_ENV);
- TAO_CHECK_ENV;
+ RtecScheduler::RT_Info* rt_info = ACE_Scheduler_Factory::server ()->get
+ (qos.dependencies[1].rt_info, _env);
+ TAO_CHECK_ENV_RETURN (_env, 0);
- if (rt_info == 0)
- return "no-name";
+ if (rt_info == 0)
+ return "no-name";
- return rt_info->entry_point;
- }
- TAO_CATCHANY
- {
- return "no-name";
- }
- TAO_ENDTRY;
- return "no-name";
+ return rt_info->entry_point;
}
// ************************************************************
@@ -3382,6 +3345,11 @@ template class ACE_Node<TAO_EC_Gateway*>;
template class ACE_Unbounded_Set<TAO_EC_Gateway*>;
template class ACE_Unbounded_Set_Iterator<TAO_EC_Gateway*>;
+template class ACE_Auto_Basic_Ptr<ACE_Push_Supplier_Proxy>;
+template class ACE_Auto_Basic_Ptr<ACE_Push_Consumer_Proxy>;
+template class auto_ptr<ACE_Push_Supplier_Proxy>;
+template class auto_ptr<ACE_Push_Consumer_Proxy>;
+
#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Atomic_Op<ACE_ES_MUTEX, int>
@@ -3427,4 +3395,9 @@ template class ACE_Unbounded_Set_Iterator<TAO_EC_Gateway*>;
#pragma instantiate ACE_Unbounded_Set<TAO_EC_Gateway*>
#pragma instantiate ACE_Unbounded_Set_Iterator<TAO_EC_Gateway*>
+#pragma instantiate ACE_Auto_Basic_Ptr<ACE_Push_Supplier_Proxy>
+#pragma instantiate ACE_Auto_Basic_Ptr<ACE_Push_Consumer_Proxy>
+#pragma instantiate auto_ptr<ACE_Push_Supplier_Proxy>
+#pragma instantiate auto_ptr<ACE_Push_Consumer_Proxy>
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */