diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-17 20:20:02 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-17 20:20:02 +0000 |
commit | 836bcfdaf2929bd5a1cd8efc193b04b2541c9af3 (patch) | |
tree | c0ada509b3a66ac0fbef017167e6cd4479e14a89 /TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp | |
parent | 4c88be0e21c16ddf7d7e5dc3726b037c4ac7c810 (diff) | |
download | ATCD-836bcfdaf2929bd5a1cd8efc193b04b2541c9af3.tar.gz |
ChangeLogTag:Mon May 17 15:17:50 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp | 240 |
1 files changed, 121 insertions, 119 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp index 46577b56367..2f8f9662fea 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp @@ -12,10 +12,6 @@ #include "orbsvcs/Event/Event_Manip.h" #include "orbsvcs/Event/Event_Channel.h" -// These are to save space. -#define WRITE_GUARD ACE_ES_WRITE_GUARD -#define READ_GUARD ACE_ES_READ_GUARD - #if !defined (__ACE_INLINE__) #include "Event_Channel.i" #endif /* __ACE_INLINE__ */ @@ -298,10 +294,10 @@ ACE_Push_Supplier_Proxy::ACE_Push_Supplier_Proxy (ACE_ES_Supplier_Module *sm) void ACE_Push_Supplier_Proxy::connect_push_supplier (RtecEventComm::PushSupplier_ptr push_supplier, const RtecEventChannelAdmin::SupplierQOS &qos, - CORBA::Environment &TAO_IN_ENV) + CORBA::Environment &ACE_TRY_ENV) { if (this->connected ()) - TAO_THROW (RtecEventChannelAdmin::AlreadyConnected()); + ACE_THROW (RtecEventChannelAdmin::AlreadyConnected()); this->push_supplier_ = RtecEventComm::PushSupplier::_duplicate(push_supplier); @@ -417,12 +413,13 @@ ACE_Push_Consumer_Proxy::push (const RtecEventComm::EventSet &events, } void -ACE_Push_Consumer_Proxy::connect_push_consumer (RtecEventComm::PushConsumer_ptr push_consumer, - const RtecEventChannelAdmin::ConsumerQOS &qos, - CORBA::Environment &TAO_IN_ENV) +ACE_Push_Consumer_Proxy::connect_push_consumer ( + RtecEventComm::PushConsumer_ptr push_consumer, + const RtecEventChannelAdmin::ConsumerQOS &qos, + CORBA::Environment &ACE_TRY_ENV) { if (this->connected ()) - TAO_THROW (RtecEventChannelAdmin::AlreadyConnected()); + ACE_THROW (RtecEventChannelAdmin::AlreadyConnected()); this->push_consumer_ = RtecEventComm::PushConsumer::_duplicate(push_consumer); @@ -437,15 +434,16 @@ ACE_Push_Consumer_Proxy::connect_push_consumer (RtecEventComm::PushConsumer_ptr // ACE_ConsumerQOS_Factory::debug (qos_); - this->consumer_module_->connected (this, TAO_IN_ENV); + this->consumer_module_->connected (this, ACE_TRY_ENV); } void -ACE_Push_Consumer_Proxy::disconnect_push_supplier (CORBA::Environment &TAO_IN_ENV) +ACE_Push_Consumer_Proxy::disconnect_push_supplier ( + CORBA::Environment &TAO_IN_ENV) { ACE_TIMEPROBE_PRINT; this->push_consumer_ = RtecEventComm::PushConsumer::_nil (); - this->consumer_module_->disconnecting (this, TAO_IN_ENV); + this->consumer_module_->disconnecting (this, ACE_TRY_ENV); } void @@ -603,12 +601,12 @@ ACE_EventChannel::destroy (CORBA::Environment &) // Flush all messages in the channel. Shutdown_Channel *sc = new Shutdown_Channel (this); if (sc == 0) - TAO_THROW (CORBA::NO_MEMORY ()); + ACE_THROW (CORBA::NO_MEMORY ()); // Create a wrapper around the dispatch request. Flush_Queue_ACT *act = new Flush_Queue_ACT (sc, dispatching_module_); if (act == 0) - TAO_THROW (CORBA::NO_MEMORY ()); + ACE_THROW (CORBA::NO_MEMORY ()); // Set a 100ns timer. if (this->timer_module ()->schedule_timer (0, // no rt-info @@ -643,9 +641,7 @@ ACE_EventChannel::shutdown (void) void ACE_EventChannel::report_connect (u_long event) { - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - ACE_ERROR ((LM_ERROR, "ACE_EventChannel::report_connect")); + ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_); this->report_connect_i (event); } @@ -660,9 +656,7 @@ void ACE_EventChannel::report_disconnect (u_long event) { // No need to gtrab the lock is already take by our callers. - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - ACE_ERROR ((LM_ERROR, "ACE_EventChannel::report_disconnect")); + ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_); this->report_disconnect (event); } @@ -697,12 +691,14 @@ ACE_EventChannel::del_gateway (TAO_EC_Gateway* gw, } void -ACE_EventChannel::update_consumer_gwys (CORBA::Environment& TAO_IN_ENV) +ACE_EventChannel::update_consumer_gwys (CORBA::Environment& ACE_TRY_ENV) { Observer_Map observers; { - TAO_GUARD_THROW (ACE_ES_MUTEX, ace_mon, this->lock_, TAO_IN_ENV, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_GUARD_THROW_EX ( + ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_CHECK; if (this->observers_.current_size () == 0 || this->state_ == ACE_EventChannel::SHUTDOWN) @@ -1124,7 +1120,7 @@ ACE_ES_Consumer_Module::connected (ACE_Push_Consumer_Proxy *consumer, void ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request) { - TAO_TRY + ACE_TRY_NEW_ENV { Shutdown_Consumer *sc = (Shutdown_Consumer *) request; @@ -1141,25 +1137,26 @@ ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request) // Deactivate the consumer proxy PortableServer::POA_var poa = - sc->consumer ()->_default_POA (TAO_TRY_ENV); - TAO_CHECK_ENV; + sc->consumer ()->_default_POA (ACE_TRY_ENV); + ACE_TRY_CHECK; PortableServer::ObjectId_var id = - poa->servant_to_id (sc->consumer (), TAO_TRY_ENV); - TAO_CHECK_ENV; - poa->deactivate_object (id.in (), TAO_TRY_ENV); - TAO_CHECK_ENV; + poa->servant_to_id (sc->consumer (), ACE_TRY_ENV); + ACE_TRY_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_TRY_CHECK; // Delete the consumer proxy, no need to delete it, is is owned // by the POA // delete sc->consumer (); if (!dont_update) - this->channel_->update_consumer_gwys (TAO_TRY_ENV); - TAO_CHECK_ENV; + this->channel_->update_consumer_gwys (ACE_TRY_ENV); + ACE_TRY_CHECK; - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - return; + ACE_GUARD_THROW_EX ( + ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_TRY_CHECK; // Tell the channel that we may need to shut down. if (all_consumers_.size () <= 0) @@ -1169,11 +1166,12 @@ ACE_ES_Consumer_Module::shutdown_request (ACE_ES_Dispatch_Request *request) channel_->report_disconnect_i (ACE_EventChannel::CONSUMER); } } - TAO_CATCHANY + ACE_CATCHANY { - TAO_TRY_ENV.print_exception ("Consumer_Module::shutdown_request"); + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Consumer_Module::shutdown_request"); } - TAO_ENDTRY; + ACE_ENDTRY; } void @@ -1182,7 +1180,7 @@ ACE_ES_Consumer_Module::shutdown (void) Consumers copy; { - ACE_ES_GUARD ace_mon (lock_); + ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_); if (ace_mon.locked () == 0) goto DONE; @@ -1221,7 +1219,7 @@ ACE_ES_Consumer_Module::shutdown (void) // Remove the consumer from our list. { - ACE_ES_GUARD ace_mon (lock_); + ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_); if (ace_mon.locked () == 0) ACE_ERROR ((LM_ERROR, "%p Failed to acquire lock.\n", "ACE_ES_Consumer_Module::shutdown")); @@ -1240,15 +1238,16 @@ DONE: void ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer, - CORBA::Environment &TAO_IN_ENV) + CORBA::Environment &ACE_TRY_ENV) { { - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - TAO_THROW (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_GUARD_THROW_EX ( + ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_CHECK; if (all_consumers_.remove (consumer) == -1) - TAO_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); + ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); } // Tell everyone else that the consumer is disconnecting. This @@ -1256,7 +1255,7 @@ ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer, // etc. However, messages may still be queued in the ReactorEx or // in the Dispatching Module for this consumer, so no queues or // proxies can be deleted just yet. - down_->disconnecting (consumer, TAO_IN_ENV); + down_->disconnecting (consumer, ACE_TRY_ENV); // Send a shutdown message through the system. When this is // dispatched, the consumer proxy will be deleted. <request> is @@ -1271,13 +1270,13 @@ ACE_ES_Consumer_Module::disconnecting (ACE_Push_Consumer_Proxy *consumer, Shutdown_Consumer *sc = new Shutdown_Consumer (this, consumer, scheduler.in ()); if (sc == 0) - TAO_THROW (CORBA::NO_MEMORY ()); + ACE_THROW (CORBA::NO_MEMORY ()); // Create a wrapper around the dispatch request. Flush_Queue_ACT *act = new Flush_Queue_ACT (sc, channel_->dispatching_module_); if (act == 0) - TAO_THROW (CORBA::NO_MEMORY ()); + ACE_THROW (CORBA::NO_MEMORY ()); // ACE_DEBUG ((LM_DEBUG, "EC (%t) initiating consumer disconnect.\n")); @@ -1336,12 +1335,13 @@ ACE_ES_Consumer_Module::obtain_push_supplier (CORBA::Environment &ACE_TRY_ENV) { ACE_ERROR ((LM_ERROR, "ACE_EventChannel" "::obtain_push_supplier failed.\n")); - TAO_THROW_RETURN (CORBA::NO_MEMORY (), proxy); + ACE_THROW_RETURN (CORBA::NO_MEMORY (), proxy); } { - ACE_GUARD_THROW_EX (ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_GUARD_THROW_EX ( + ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); ACE_CHECK_RETURN (proxy); if (all_consumers_.insert (new_consumer.get ()) == -1) @@ -2093,57 +2093,57 @@ ACE_ES_Dispatch_Request * ACE_ES_Consumer_Correlation::correlate (ACE_ES_Consumer_Rep *cr, const TAO_EC_Event &event) { - // If the consumer has specified correlation criteria, then we must - // first acquire the mutex. - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "ACE_ES_Consumer_Correlation::push"), 0); - - int bit = ACE_INT2BIT[cr->type_id ()]; - if (ACE_BIT_DISABLED (this->pending_flags_, bit)) - { - // Add the new event to the pending events. - pending_events_[cr->type_id ()] += event; - // Set the bit corresponding to the arrived event. - // This should be pending_flags_->event_arrived (index); - ACE_SET_BITS (pending_flags_, bit); - } - - ACE_ES_Dispatch_Request *request = 0; - TAO_EC_Event_Array *outbox = 0; - // Since add_events changes pending_flags_, we need to keep this - // for all iterations through the conjunction groups. - u_long freeze_pending_flags = pending_flags_; - - for (int x=0; x < n_conjunction_groups_; x++) - { - if (conjunction_groups_[x].should_forward (freeze_pending_flags)) - { - // If there is a deadline timer for this conjunction group, - // this will reschedule them. - conjunction_groups_[x].reschedule_deadline (); - - // First time in, allocate the new dispatch request. - if (request == 0) - { - request = - new ACE_ES_Dispatch_Request (consumer_, - cr->dependency ()->rt_info); - if (request == 0) - ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", - "ACE_ES_Consumer_Correlation::correlate"), 0); - outbox = &(request->event_set ()); - } - - // Add each of the pending events for this correlation to - // the outgoing dispatch request. If outbox == 0, then - // this will just clear any pending events. - conjunction_groups_[x].add_events (outbox, - pending_events_, - pending_flags_); - } - } + // If the consumer has specified correlation criteria, then we must + // first acquire the mutex. + ACE_Guard<ACE_ES_MUTEX> ace_mon (lock_); + if (ace_mon.locked () == 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "ACE_ES_Consumer_Correlation::push"), 0); + + int bit = ACE_INT2BIT[cr->type_id ()]; + if (ACE_BIT_DISABLED (this->pending_flags_, bit)) + { + // Add the new event to the pending events. + pending_events_[cr->type_id ()] += event; + // Set the bit corresponding to the arrived event. + // This should be pending_flags_->event_arrived (index); + ACE_SET_BITS (pending_flags_, bit); + } + + ACE_ES_Dispatch_Request *request = 0; + TAO_EC_Event_Array *outbox = 0; + // Since add_events changes pending_flags_, we need to keep this + // for all iterations through the conjunction groups. + u_long freeze_pending_flags = pending_flags_; + + for (int x=0; x < n_conjunction_groups_; x++) + { + if (conjunction_groups_[x].should_forward (freeze_pending_flags)) + { + // If there is a deadline timer for this conjunction group, + // this will reschedule them. + conjunction_groups_[x].reschedule_deadline (); + + // First time in, allocate the new dispatch request. + if (request == 0) + { + request = + new ACE_ES_Dispatch_Request (consumer_, + cr->dependency ()->rt_info); + if (request == 0) + ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", + "ACE_ES_Consumer_Correlation::correlate"), 0); + outbox = &(request->event_set ()); + } + + // Add each of the pending events for this correlation to + // the outgoing dispatch request. If outbox == 0, then + // this will just clear any pending events. + conjunction_groups_[x].add_events (outbox, + pending_events_, + pending_flags_); + } + } return request; } @@ -2373,14 +2373,15 @@ ACE_ES_Subscription_Module::reregister_consumers (RtecEventComm::EventSourceID s void ACE_ES_Subscription_Module::disconnecting (ACE_Push_Supplier_Proxy *supplier, - CORBA::Environment &TAO_IN_ENV) + CORBA::Environment &ACE_TRY_ENV) { - ACE_ES_WGUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - TAO_THROW (RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_WRITE_GUARD_THROW_EX ( + ACE_ES_RW_LOCK, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_CHECK; if (all_suppliers_.remove (supplier) == -1) - TAO_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); + ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); // Remove all consumers from the supplier's source-based subscription lists. ACE_ES_Subscription_Info::Subscriber_Set_Iterator source_iterator @@ -3141,17 +3142,19 @@ ACE_ES_Supplier_Module::connected (ACE_Push_Supplier_Proxy *supplier, void ACE_ES_Supplier_Module::disconnecting (ACE_Push_Supplier_Proxy *supplier, - CORBA::Environment &TAO_IN_ENV) + CORBA::Environment &ACE_TRY_ENV) { CORBA::Boolean need_update = 0; { - TAO_GUARD_THROW (ACE_SYNCH_MUTEX, ace_mon, this->lock_, TAO_IN_ENV, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_GUARD_THROW_EX ( + ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_CHECK; if (all_suppliers_.remove (supplier) == -1) - TAO_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); + ACE_THROW (RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR()); - up_->disconnecting (supplier, TAO_IN_ENV); + up_->disconnecting (supplier, ACE_TRY_ENV); if (this->all_suppliers_.size () <= 0) { @@ -3168,7 +3171,7 @@ ACE_ES_Supplier_Module::disconnecting (ACE_Push_Supplier_Proxy *supplier, // CORBA::release (supplier); } if (need_update) - this->channel_->update_supplier_gwys (TAO_IN_ENV); + this->channel_->update_supplier_gwys (ACE_TRY_ENV); } void @@ -3177,9 +3180,7 @@ ACE_ES_Supplier_Module::shutdown (void) Suppliers copy; { - ACE_ES_GUARD ace_mon (lock_); - if (ace_mon.locked () == 0) - return; + ACE_GUARD (ACE_ES_MUTEX, ace_mon, this->lock_); copy = all_suppliers_; } @@ -3214,8 +3215,9 @@ ACE_ES_Supplier_Module::obtain_push_consumer (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_RETURN (CORBA::NO_MEMORY (), proxy); { - ACE_GUARD_THROW_EX (ACE_ES_MUTEX, ace_mon, this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_GUARD_THROW_EX ( + ACE_ES_MUTEX, ace_mon, this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); ACE_CHECK_RETURN (proxy); if (all_suppliers_.insert (new_supplier.get ()) == -1) |