summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Sequence
diff options
context:
space:
mode:
authormichel_j <michel_j@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-07-01 20:13:11 +0000
committermichel_j <michel_j@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-07-01 20:13:11 +0000
commitc1eb8ab20e76266f9e5b0a1009bfc2e1c93f7aa0 (patch)
tree27aaea0cca31b544397fe079ece67ca0b6678ebd /TAO/orbsvcs/orbsvcs/Notify/Sequence
parent70424f8e426ecd24b30dcdea12c1caa7d37abc57 (diff)
downloadATCD-c1eb8ab20e76266f9e5b0a1009bfc2e1c93f7aa0.tar.gz
ChangeLogTag: Fri Jul 1 14:43:27 2005 Justin Michel <michel_j@ociweb.com>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Sequence')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp90
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h56
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h9
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h7
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp47
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h15
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h2
10 files changed, 38 insertions, 200 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp
deleted file mode 100644
index 87c85962bdc..00000000000
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-// $Id$
-
-#include "Batch_Buffering_Strategy.h"
-#include "../Method_Request_Event.h"
-#include "ace/Null_Condition.h"
-
-ACE_RCSID (Notify, TAO_Notify_Batch_Buffering_Strategy, "$Id$")
-
-TAO_Notify_Batch_Buffering_Strategy::TAO_Notify_Batch_Buffering_Strategy (TAO_Notify_Message_Queue& msg_queue, TAO_Notify_AdminProperties_var& admin_properties, CORBA::Long batch_size)
- :TAO_Notify_Buffering_Strategy (msg_queue, admin_properties, batch_size)
-{
-}
-
-TAO_Notify_Batch_Buffering_Strategy::~TAO_Notify_Batch_Buffering_Strategy ()
-{
-}
-
-int
-TAO_Notify_Batch_Buffering_Strategy::dequeue_batch (CosNotification::EventBatch& event_batch)
-{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
-
- // if batch_size is infinite, simply dequeue everything available.
-
- int pending = 0; // not used.
-
- if (this->batch_size_ == 0)
- {
- return this->dequeue_available (event_batch, pending);
- }
- else
- {
- // block till batch size of events are available.
- while (this->msg_queue_.message_count () < this->batch_size_)
- {
- if (this->shutdown_ == 1) // if we're shutdown, don't play this silly game.
- return -1;
-
- this->batch_size_reached_condition_.wait ();
- }
-
- return this->dequeue_i (this->batch_size_, event_batch);
- }
-}
-
-int
-TAO_Notify_Batch_Buffering_Strategy::dequeue_available (CosNotification::EventBatch& event_batch, int &pending)
-{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->global_queue_lock_, -1);
-
- int deq_count = this->msg_queue_.message_count ();
-
- if (this->batch_size_ != 0 && deq_count > this->batch_size_) // Restrict upto batch size.
- deq_count = this->batch_size_;
-
- pending = this->msg_queue_.message_count () - deq_count;
-
- return this->dequeue_i (deq_count, event_batch);
-}
-
-int
-TAO_Notify_Batch_Buffering_Strategy::dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch)
-{
- ACE_Message_Block *mb;
-
- int deq_count = 0;
-
- event_batch.length (max_deq_count);
-
- for (; deq_count < max_deq_count; ++deq_count)
- {
- if (this->msg_queue_.dequeue (mb) == -1)
- break; // error, simply return what we could extract so far.
-
- --this->global_queue_length_;
-
- TAO_Notify_Method_Request_Event_Queueable* mre = dynamic_cast<TAO_Notify_Method_Request_Event_Queueable*> (mb);
-
- mre->event ()->convert (event_batch[deq_count]);
-
- ACE_Message_Block::release (mb);
- }
-
- event_batch.length (deq_count);
-
- this->global_queue_not_full_condition_.signal ();
- this->local_queue_not_full_condition_.signal ();
-
- return deq_count;
-}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h
deleted file mode 100644
index 069ad98a710..00000000000
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/Batch_Buffering_Strategy.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/* -*- C++ -*- */
-/**
- * @file Batch_Buffering_Strategy.h
- *
- * $Id$
- *
- * @author Pradeep Gore <pradeep@oomworks.com>
- *
- *
- */
-
-#ifndef TAO_Notify_BATCH_BUFFERING_STRATEGY_H
-#define TAO_Notify_BATCH_BUFFERING_STRATEGY_H
-#include /**/ "ace/pre.h"
-
-#include "../notify_serv_export.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
-
-#include "../Method_Request.h"
-#include "../Buffering_Strategy.h"
-
-/**
- * @class TAO_Notify_Batch_Buffering_Strategy
- *
- * @brief
- *
- */
-class TAO_Notify_Serv_Export TAO_Notify_Batch_Buffering_Strategy : public TAO_Notify_Buffering_Strategy
-{
-public:
- /// Constuctor
- TAO_Notify_Batch_Buffering_Strategy (TAO_Notify_Message_Queue& msg_queue, TAO_Notify_AdminProperties_var& admin_properties, CORBA::Long batch_size);
-
- /// Destructor
- ~TAO_Notify_Batch_Buffering_Strategy ();
-
- /// Dequeue batch. This method will block till <batch_size> is available..
- /// Return -1 on error else the number of items actually dequeued.
- int dequeue_batch (CosNotification::EventBatch& event_batch);
-
- /// Dequeue upto batch. This method will not block.
- /// Return -1 on error else the number of items dequeued (<batch_size>).
- /// <pending> is set to the number of events remaining in the queue.
- int dequeue_available (CosNotification::EventBatch& event_batch, int &pending);
-
-protected:
-
- /// Extract upto <max_deq_count> number of items.
- int dequeue_i (int max_deq_count, CosNotification::EventBatch& event_batch);
-};
-
-#include /**/ "ace/post.h"
-#endif /* TAO_Notify_BATCH_BUFFERING_STRATEGY_H */
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp
index 11b50b7fd22..15bd1db1eca 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.cpp
@@ -22,8 +22,6 @@ TAO_Notify_SequenceProxyPushConsumer::~TAO_Notify_SequenceProxyPushConsumer ()
void
TAO_Notify_SequenceProxyPushConsumer::release (void)
{
- if (this->supplier_)
- this->supplier_->release ();
delete this;
//@@ inform factory
@@ -67,7 +65,7 @@ TAO_Notify_SequenceProxyPushConsumer::push_structured_events (const CosNotificat
))
{
// Check if we should proceed at all.
- if (this->admin_properties_->reject_new_events () == 1 && this->admin_properties_->queue_full ())
+ if (this->admin_properties().reject_new_events () == 1 && this->admin_properties().queue_full ())
ACE_THROW (CORBA::IMP_LIMIT ());
if (this->is_connected () == 0)
@@ -91,6 +89,7 @@ TAO_Notify_SequenceProxyPushConsumer::disconnect_sequence_push_consumer (ACE_ENV
CORBA::SystemException
))
{
+ TAO_Notify_SequenceProxyPushConsumer::Ptr guard( this );
this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
index c20233c05c8..5e65ea9f03b 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushConsumer.h
@@ -45,10 +45,7 @@ public:
TAO_Notify_SequenceProxyPushConsumer (void);
/// Destructor
- ~TAO_Notify_SequenceProxyPushConsumer ();
-
- /// TAO_Notify_Destroy_Callback methods
- virtual void release (void);
+ virtual ~TAO_Notify_SequenceProxyPushConsumer ();
virtual const char * get_proxy_type_name (void) const;
@@ -86,6 +83,10 @@ protected:
ACE_THROW_SPEC ((
CORBA::SystemException
));
+
+private:
+ /// TAO_Notify_Destroy_Callback methods
+ virtual void release (void);
};
#if defined(_MSC_VER)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp
index 8cf6ba6cc64..c87728df2ab 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.cpp
@@ -19,8 +19,6 @@ TAO_Notify_SequenceProxyPushSupplier::~TAO_Notify_SequenceProxyPushSupplier ()
void
TAO_Notify_SequenceProxyPushSupplier::release (void)
{
- if (this->consumer_)
- this->consumer_->release ();
delete this;
//@@ inform factory
@@ -40,7 +38,7 @@ TAO_Notify_SequenceProxyPushSupplier::connect_sequence_push_consumer (CosNotifyC
TAO_Notify_SequencePushConsumer (this),
CORBA::NO_MEMORY ());
- consumer->init (push_consumer, this->admin_properties_ ACE_ENV_ARG_PARAMETER);
+ consumer->init (push_consumer ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->connect (consumer ACE_ENV_ARG_PARAMETER);
@@ -55,6 +53,7 @@ TAO_Notify_SequenceProxyPushSupplier::disconnect_sequence_push_supplier (ACE_ENV
))
{
+ TAO_Notify_SequenceProxyPushSupplier::Ptr guard( this );
this->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->self_change (ACE_ENV_SINGLE_ARG_PARAMETER);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
index 528dbda5d8b..2d727b94e33 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequenceProxyPushSupplier.h
@@ -52,10 +52,8 @@ public:
TAO_Notify_SequenceProxyPushSupplier (void);
/// Destructor
- ~TAO_Notify_SequenceProxyPushSupplier ();
+ virtual ~TAO_Notify_SequenceProxyPushSupplier ();
- /// TAO_Notify_Destroy_Callback methods
- virtual void release (void);
virtual const char * get_proxy_type_name (void) const;
@@ -83,6 +81,9 @@ public:
ACE_THROW_SPEC ((
CORBA::SystemException
));
+
+ /// TAO_Notify_Destroy_Callback methods
+ virtual void release (void);
};
#if defined(_MSC_VER)
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
index 28e43c87dba..689272a738d 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp
@@ -21,7 +21,7 @@ ACE_RCSID (Notify, TAO_Notify_SequencePushConsumer, "$Id$")
#endif //DEBUG_LEVEL
TAO_Notify_SequencePushConsumer::TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy)
- : TAO_Notify_Consumer (proxy)
+: TAO_Notify_Consumer (proxy)
{
}
@@ -30,31 +30,11 @@ TAO_Notify_SequencePushConsumer::~TAO_Notify_SequencePushConsumer ()
}
void
-TAO_Notify_SequencePushConsumer::init (
- CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties
-#if 1
- ACE_ENV_ARG_DECL_NOT_USED)
-#else //1
- ACE_ENV_ARG_DECL)
-#endif
+TAO_Notify_SequencePushConsumer::init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL_NOT_USED)
{
- set_consumer (push_consumer);
+ ACE_ASSERT (this->push_consumer_.in() == 0);
+ ACE_ASSERT (push_consumer != 0);
-#if 1 //// @@ TODO: use buffering strategy in TAO_Notify_Consumer???
- ACE_UNUSED_ARG ( admin_properties);
-#else //1
-
- ACE_NEW_THROW_EX (this->buffering_strategy_,
- TAO_Notify_Batch_Buffering_Strategy (this->msg_queue_, admin_properties,
- this->max_batch_size_.value ()),
- CORBA::NO_MEMORY ());
-#endif // 1
-}
-
-void
-TAO_Notify_SequencePushConsumer::set_consumer (
- CosNotifyComm::SequencePushConsumer_ptr push_consumer)
-{
this->push_consumer_ = CosNotifyComm::SequencePushConsumer::_duplicate (push_consumer);
this->publish_ = CosNotifyComm::NotifyPublish::_duplicate (push_consumer);
@@ -164,11 +144,15 @@ TAO_Notify_SequencePushConsumer::dispatch_from_queue (Request_Queue& requests, A
static_cast<int> (this->proxy ()->id ()),
request->sequence ()
));
- ace_mon.acquire ();
requests.enqueue_head (request); // put the failed event back where it was
result = false;
break;
}
+ default:
+ {
+ result = false;
+ break;
+ }
}
}
return result;
@@ -184,13 +168,15 @@ TAO_Notify_SequencePushConsumer::enqueue_if_necessary (
this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (false);
- if (this->pacing_.is_valid ())
+ size_t mbs = static_cast<size_t>(this->max_batch_size_.value());
+
+ if (this->pending_events().size() >= mbs || this->pacing_.is_valid () == 0)
{
- schedule_timer (false);
+ this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
}
else
{
- this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
+ schedule_timer (false);
}
return true;
}
@@ -239,10 +225,11 @@ TAO_Notify_SequencePushConsumer::get_ior (ACE_CString & iorstr) const
void
TAO_Notify_SequencePushConsumer::reconnect_from_consumer (TAO_Notify_Consumer* old_consumer
- ACE_ENV_ARG_DECL_NOT_USED)
+ ACE_ENV_ARG_DECL)
{
TAO_Notify_SequencePushConsumer* tmp = dynamic_cast<TAO_Notify_SequencePushConsumer *> (old_consumer);
ACE_ASSERT(tmp != 0);
- this->set_consumer(tmp->push_consumer_.in());
+ this->init(tmp->push_consumer_.in() ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
this->schedule_timer(false);
}
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
index 76ff5d049ed..09cfbf71969 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.h
@@ -25,7 +25,6 @@
#include "../Property_T.h"
#include "../Consumer.h"
#include "../AdminProperties.h"
-#include "Batch_Buffering_Strategy.h"
#include "ace/Null_Condition.h"
class TAO_Notify_ProxySupplier;
@@ -42,19 +41,12 @@ class TAO_Notify_Serv_Export TAO_Notify_SequencePushConsumer
: public TAO_Notify_Consumer
{
public:
- /// Constuctor
TAO_Notify_SequencePushConsumer (TAO_Notify_ProxySupplier* proxy);
- /// Destructor
- ~TAO_Notify_SequencePushConsumer ();
+ virtual ~TAO_Notify_SequencePushConsumer ();
/// Init the Consumer
- void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer, TAO_Notify_AdminProperties_var& admin_properties ACE_ENV_ARG_DECL);
-
- void set_consumer (CosNotifyComm::SequencePushConsumer_ptr push_consumer);
-
- /// TAO_Notify_Destroy_Callback methods.
- virtual void release (void);
+ void init (CosNotifyComm::SequencePushConsumer_ptr push_consumer ACE_ENV_ARG_DECL);
/// Add request to a queue if necessary.
/// for Sequence it's always necessary.
@@ -89,6 +81,9 @@ protected:
/// The Consumer
CosNotifyComm::SequencePushConsumer_var push_consumer_;
+private:
+ /// TAO_Notify_Destroy_Callback methods.
+ virtual void release (void);
};
#include /**/ "ace/post.h"
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp
index e394b2a8f37..adaaf0b24af 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.cpp
@@ -18,6 +18,8 @@ TAO_Notify_SequencePushSupplier::~TAO_Notify_SequencePushSupplier ()
void
TAO_Notify_SequencePushSupplier::init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL_NOT_USED)
{
+ ACE_ASSERT (push_supplier != 0 && this->push_supplier_.in() == 0);
+
this->push_supplier_ = CosNotifyComm::SequencePushSupplier::_duplicate (push_supplier);
this->subscribe_ = CosNotifyComm::NotifySubscribe::_duplicate (push_supplier);
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
index bfebfd33ea2..24d778e400f 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
+++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushSupplier.h
@@ -37,7 +37,7 @@ public:
TAO_Notify_SequencePushSupplier (TAO_Notify_ProxyConsumer* proxy);
/// Destructor
- ~TAO_Notify_SequencePushSupplier ();
+ virtual ~TAO_Notify_SequencePushSupplier ();
/// Init
void init (CosNotifyComm::SequencePushSupplier_ptr push_supplier ACE_ENV_ARG_DECL);