diff options
author | michel_j <michel_j@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-07-01 20:13:11 +0000 |
---|---|---|
committer | michel_j <michel_j@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-07-01 20:13:11 +0000 |
commit | c1eb8ab20e76266f9e5b0a1009bfc2e1c93f7aa0 (patch) | |
tree | 27aaea0cca31b544397fe079ece67ca0b6678ebd /TAO/orbsvcs/orbsvcs/Notify/Sequence | |
parent | 70424f8e426ecd24b30dcdea12c1caa7d37abc57 (diff) | |
download | ATCD-c1eb8ab20e76266f9e5b0a1009bfc2e1c93f7aa0.tar.gz |
ChangeLogTag: Fri Jul 1 14:43:27 2005 Justin Michel <michel_j@ociweb.com>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Sequence')
10 files changed, 38 insertions, 200 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp deleted file mode 100644 index 87c85962bdc..00000000000 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp +++ /dev/null @@ -1,90 +0,0 @@ -// $Id$ - -#include "Batch_Buffering_Strategy.h" -#include "../Method_Request_Event.h" -#include "ace/Null_Condition.h" - -ACE_RCSID (Notify, TAO_Notify_Batch_Buffering_Strategy, "$Id$") - -TAO_Notify_Batch_Buffering_Strategy::TAO_Notify_Batch_Buffering_Strategy (TAO_Notify_Message_Queue& msg_queue, TAO_Notify_AdminProperties_var& admin_properties, CORBA::Long batch_size) - :TAO_Notify_Buffering_Strategy (msg_queue, admin_properties, batch_size) -{ -} - -TAO_Notify_Batch_Buffering_Strategy::~TAO_Notify_Batch_Buffering_Strategy () -{ -} - -int -TAO_Notify_Batch_Buffering_Strategy::dequeue_batch (CosNotification::EventBatch& event_batch) -{ - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); - - // if batch_size is infinite, simply dequeue everything available. - - int pending = 0; // not used. - - if (this->batch_size_ == 0) - { - return this->dequeue_available (event_batch, pending); - } - else - { - // block till batch size of events are available. - while (this->msg_queue_.message_count () < this->batch_size_) - { - if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game. - return -1; - - this->batch_size_reached_condition_.wait (); - } - - return this->dequeue_i (this->batch_size_, event_batch); - } -} - -int -TAO_Notify_Batch_Buffering_Strategy::dequeue_available (CosNotification::EventBatch& event_batch, int &pending) -{ - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1); - - int deq_count = this->msg_queue_.message_count (); - - if (this->batch_size_ != 0 && deq_count > this->batch_size_) // Restrict upto batch size. - deq_count = this->batch_size_; - - pending = this->msg_queue_.message_count () - deq_count; - - return this->dequeue_i (deq_count, event_batch); -} - -int -TAO_Notify_Batch_Buffering_Strategy::dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch) -{ - ACE_Message_Block *mb; - - int deq_count = 0; - - event_batch.length (max_deq_count); - - for (; deq_count < max_deq_count; ++deq_count) - { - if (this->msg_queue_.dequeue (mb) == -1) - break; // error, simply return what we could extract so far. - - --this->global_queue_length_; - - TAO_Notify_Method_Request_Event_Queueable* mre = dynamic_cast<TAO_Notify_Method_Request_Event_Queueable*> (mb); - - mre->event ()->convert (event_batch[deq_count]); - - ACE_Message_Block::release (mb); - } - - event_batch.length (deq_count); - - this->global_queue_not_full_condition_.signal (); - this->local_queue_not_full_condition_.signal (); - - return deq_count; -} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h deleted file mode 100644 index 069ad98a710..00000000000 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h +++ /dev/null @@ -1,56 +0,0 @@ -/* -*- C++ -*- */ -/** - * @file Batch_Buffering_Strategy.h - * - * $Id$ - * - * @author Pradeep Gore <pradeep@oomworks.com> - * - * - */ - -#ifndef TAO_Notify_BATCH_BUFFERING_STRATEGY_H -#define TAO_Notify_BATCH_BUFFERING_STRATEGY_H -#include /**/ "ace/pre.h" - -#include "../notify_serv_export.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ - -#include "../Method_Request.h" -#include "../Buffering_Strategy.h" - -/** - * @class TAO_Notify_Batch_Buffering_Strategy - * - * @brief - * - */ -class TAO_Notify_Serv_Export TAO_Notify_Batch_Buffering_Strategy : public TAO_Notify_Buffering_Strategy -{ -public: - /// Constuctor - TAO_Notify_Batch_Buffering_Strategy (TAO_Notify_Message_Queue& msg_queue, TAO_Notify_AdminProperties_var& admin_properties, CORBA::Long batch_size); - - /// Destructor - ~TAO_Notify_Batch_Buffering_Strategy (); - - /// Dequeue batch. This method will block till <batch_size> is available.. - /// Return -1 on error else the number of items actually dequeued. - int dequeue_batch (CosNotification::EventBatch& event_batch); - - /// Dequeue upto batch. This method will not block. - /// Return -1 on error else the number of items dequeued (<batch_size>). - /// <pending> is set to the number of events remaining in the queue. - int dequeue_available (CosNotification::EventBatch& event_batch, int &pending); - -protected: - - /// Extract upto <max_deq_count> number of items. - int dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch); -}; - -#include /**/ "ace/post.h" -#endif /* TAO_Notify_BATCH_BUFFERING_STRATEGY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp index 11b50b7fd22..15bd1db1eca 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp @@ -22,8 +22,6 @@ TAO_Notify_SequenceProxyPushConsumer::~TAO_Notify_SequenceProxyPushConsumer () void TAO_Notify_SequenceProxyPushConsumer::release (void) { - if (this->supplier_) - this->supplier_->release (); delete this; //@@ inform factory @@ -67,7 +65,7 @@ TAO_Notify_SequenceProxyPushConsumer::push_structured_events (const CosNotificat )) { // Check if we should proceed at all. - if (this->admin_properties_->reject_new_events () == 1 && this->admin_properties_->queue_full ()) + if (this->admin_properties().reject_new_events () == 1 && this->admin_properties().queue_full ()) ACE_THROW (CORBA::IMP_LIMIT ()); if (this->is_connected () == 0) @@ -91,6 +89,7 @@ TAO_Notify_SequenceProxyPushConsumer::disconnect_sequence_push_consumer (ACE_ENV CORBA::SystemException )) { + TAO_Notify_SequenceProxyPushConsumer::Ptr guard( this ); this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h index c20233c05c8..5e65ea9f03b 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h @@ -45,10 +45,7 @@ public: TAO_Notify_SequenceProxyPushConsumer (void); /// Destructor - ~TAO_Notify_SequenceProxyPushConsumer (); - - /// TAO_Notify_Destroy_Callback methods - virtual void release (void); + virtual ~TAO_Notify_SequenceProxyPushConsumer (); virtual const char * get_proxy_type_name (void) const; @@ -86,6 +83,10 @@ protected: ACE_THROW_SPEC (( CORBA::SystemException )); + +private: + /// TAO_Notify_Destroy_Callback methods + virtual void release (void); }; #if defined(_MSC_VER) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp index 8cf6ba6cc64..c87728df2ab 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp @@ -19,8 +19,6 @@ TAO_Notify_SequenceProxyPushSupplier::~TAO_Notify_SequenceProxyPushSupplier () void TAO_Notify_SequenceProxyPushSupplier::release (void) { - if (this->consumer_) - this->consumer_->release (); delete this; //@@ inform factory @@ -40,7 +38,7 @@ TAO_Notify_SequenceProxyPushSupplier::connect_sequence_push_consumer (CosNotifyC TAO_Notify_SequencePushConsumer (this), CORBA::NO_MEMORY ()); - consumer->init (push_consumer, this->admin_properties_ ACE_ENV_ARG_PARAMETER); + consumer->init (push_consumer ACE_ENV_ARG_PARAMETER); ACE_CHECK; this->connect (consumer ACE_ENV_ARG_PARAMETER); @@ -55,6 +53,7 @@ TAO_Notify_SequenceProxyPushSupplier::disconnect_sequence_push_supplier (ACE_ENV )) { + TAO_Notify_SequenceProxyPushSupplier::Ptr guard( this ); this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h index 528dbda5d8b..2d727b94e33 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h @@ -52,10 +52,8 @@ public: TAO_Notify_SequenceProxyPushSupplier (void); /// Destructor - ~TAO_Notify_SequenceProxyPushSupplier (); + virtual ~TAO_Notify_SequenceProxyPushSupplier (); - /// TAO_Notify_Destroy_Callback methods - virtual void release (void); virtual const char * get_proxy_type_name (void) const; @@ -83,6 +81,9 @@ public: ACE_THROW_SPEC (( CORBA::SystemException )); + + /// TAO_Notify_Destroy_Callback methods + virtual void release (void); }; #if defined(_MSC_VER) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp index 28e43c87dba..689272a738d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -21,7 +21,7 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$") #endif //DEBUG_LEVEL TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy) - : TAO_Notify_Consumer (proxy) +: TAO_Notify_Consumer (proxy) { } @@ -30,31 +30,11 @@ TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer () } void -TAO_Notify_SequencePushConsumer::init ( - CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties -#if 1 - ACE_ENV_ARG_DECL_NOT_USED) -#else //1 - ACE_ENV_ARG_DECL) -#endif +TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL_NOT_USED) { - set_consumer (push_consumer); + ACE_ASSERT (this->push_consumer_.in() == 0); + ACE_ASSERT (push_consumer != 0); -#if 1 //// @@ TODO: use buffering strategy in TAO_Notify_Consumer??? - ACE_UNUSED_ARG ( admin_properties); -#else //1 - - ACE_NEW_THROW_EX (this->buffering_strategy_, - TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties, - this->max_batch_size_.value ()), - CORBA::NO_MEMORY ()); -#endif // 1 -} - -void -TAO_Notify_SequencePushConsumer::set_consumer ( - CosNotifyComm::SequencePushConsumer_ptr push_consumer) -{ this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer); this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer); @@ -164,11 +144,15 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A static_cast<int> (this->proxy ()->id ()), request->sequence () )); - ace_mon.acquire (); requests.enqueue_head (request); // put the failed event back where it was result = false; break; } + default: + { + result = false; + break; + } } } return result; @@ -184,13 +168,15 @@ TAO_Notify_SequencePushConsumer::enqueue_if_necessary ( this->enqueue_request (request ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (false); - if (this->pacing_.is_valid ()) + size_t mbs = static_cast<size_t>(this->max_batch_size_.value()); + + if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0) { - schedule_timer (false); + this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); } else { - this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER); + schedule_timer (false); } return true; } @@ -239,10 +225,11 @@ TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const void TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer - ACE_ENV_ARG_DECL_NOT_USED) + ACE_ENV_ARG_DECL) { TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer); ACE_ASSERT(tmp != 0); - this->set_consumer(tmp->push_consumer_.in()); + this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; this->schedule_timer(false); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h index 76ff5d049ed..09cfbf71969 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h @@ -25,7 +25,6 @@ #include "../Property_T.h" #include "../Consumer.h" #include "../AdminProperties.h" -#include "Batch_Buffering_Strategy.h" #include "ace/Null_Condition.h" class TAO_Notify_ProxySupplier; @@ -42,19 +41,12 @@ class TAO_Notify_Serv_Export TAO_Notify_SequencePushConsumer : public TAO_Notify_Consumer { public: - /// Constuctor TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy); - /// Destructor - ~TAO_Notify_SequencePushConsumer (); + virtual ~TAO_Notify_SequencePushConsumer (); /// Init the Consumer - void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL); - - void set_consumer (CosNotifyComm::SequencePushConsumer_ptr push_consumer); - - /// TAO_Notify_Destroy_Callback methods. - virtual void release (void); + void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL); /// Add request to a queue if necessary. /// for Sequence it's always necessary. @@ -89,6 +81,9 @@ protected: /// The Consumer CosNotifyComm::SequencePushConsumer_var push_consumer_; +private: + /// TAO_Notify_Destroy_Callback methods. + virtual void release (void); }; #include /**/ "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp index e394b2a8f37..adaaf0b24af 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp @@ -18,6 +18,8 @@ TAO_Notify_SequencePushSupplier::~TAO_Notify_SequencePushSupplier () void TAO_Notify_SequencePushSupplier::init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL_NOT_USED) { + ACE_ASSERT (push_supplier != 0 && this->push_supplier_.in() == 0); + this->push_supplier_ = CosNotifyComm::SequencePushSupplier::_duplicate (push_supplier); this->subscribe_ = CosNotifyComm::NotifySubscribe::_duplicate (push_supplier); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h index bfebfd33ea2..24d778e400f 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h @@ -37,7 +37,7 @@ public: TAO_Notify_SequencePushSupplier (TAO_Notify_ProxyConsumer* proxy); /// Destructor - ~TAO_Notify_SequencePushSupplier (); + virtual ~TAO_Notify_SequencePushSupplier (); /// Init void init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL); |