summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Huston <shuston@riverace.com>2006-02-04 12:22:28 +0000
committerSteve Huston <shuston@riverace.com>2006-02-04 12:22:28 +0000
commitfdcff80a3b72f1a23eac7f3bbdfac45e2d9f0d0a (patch)
treecac5a5c70b0014e7ae2de8476ad8dda0e9a2132a
parent925d467ec553414c7a8cc19741681408898ca1b4 (diff)
downloadATCD-fdcff80a3b72f1a23eac7f3bbdfac45e2d9f0d0a.tar.gz
ChangeLogTag:Fri Feb 3 23:48:32 UTC 2006 Steve Huston <shuston@riverace.com>
-rw-r--r--ChangeLog17
-rw-r--r--ace/Barrier.h2
-rw-r--r--ace/Message_Queue_T.cpp74
-rw-r--r--ace/Message_Queue_T.h324
-rw-r--r--tests/Message_Queue_Test.cpp95
5 files changed, 372 insertions, 140 deletions
diff --git a/ChangeLog b/ChangeLog
index fe0763eb68c..7e2b5a30520 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,20 @@
+Fri Feb 3 23:48:32 UTC 2006 Steve Huston <shuston@riverace.com>
+
+ * ace/Barrier.h: Noted shutdown() was added for 5.4.9.
+
+ * ace/Message_Queue_T.{h cpp}: Changed enqueue_head(), enqueue_tail()
+ to recognize that the ACE_Message_Block passed may have other
+ block(s) connected to it via the next() pointers. This allows a
+ caller to pre-connect a series of ACE_Message_Blocks and coalesce
+ the enqueueing of the series into a single method call.
+ Thanks to Guy Peleg <guy dot peleg at amdocs dot com> for suggesting
+ this enhancement.
+ Also revamped the Doxygenization of ACE_Message_Queue's
+ documentation.
+
+ * tests/Message_Queue_Test.cpp: Added chained_block_test() to test
+ the new functionality above.
+
Fri Feb 3 14:47:53 UTC 2006 Ossama Othman <ossama_othman at symantec dot com>
* ace/Cleanup_Strategies_T.h:
diff --git a/ace/Barrier.h b/ace/Barrier.h
index 84d97e836b9..7939aed1d24 100644
--- a/ace/Barrier.h
+++ b/ace/Barrier.h
@@ -116,6 +116,8 @@ public:
/// value -1, errno ESHUTDOWN.
///
/// @retval 0 for success, -1 if already shut down.
+ ///
+ /// @since ACE beta 5.4.9.
int shutdown (void);
/// Dump the state of an object.
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp
index 595660a11a5..0ece6377375 100644
--- a/ace/Message_Queue_T.cpp
+++ b/ace/Message_Queue_T.cpp
@@ -977,35 +977,48 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
if (new_item == 0)
return -1;
+ // Update the queued size and length, taking into account any chained
+ // blocks (total_size_and_length() counts all continuation blocks).
+ // Keep count of how many blocks we're adding and, if there is a chain of
+ // blocks, find the end in seq_tail and be sure they're properly
+ // back-connected along the way.
+ ACE_Message_Block *seq_tail = new_item;
+ ++this->cur_count_;
+ new_item->total_size_and_length (this->cur_bytes_,
+ this->cur_length_);
+ while (seq_tail->next () != 0)
+ {
+ seq_tail->next ()->prev (seq_tail);
+ seq_tail = seq_tail->next ();
+ ++this->cur_count_;
+ seq_tail->total_size_and_length (this->cur_bytes_,
+ this->cur_length_);
+ }
+
// List was empty, so build a new one.
if (this->tail_ == 0)
{
this->head_ = new_item;
- this->tail_ = new_item;
- new_item->next (0);
+ this->tail_ = seq_tail;
+ // seq_tail->next (0); This is a condition of the while() loop above.
new_item->prev (0);
}
// Link at the end.
else
{
- new_item->next (0);
+ // seq_tail->next (0); This is a condition of the while() loop above.
this->tail_->next (new_item);
new_item->prev (this->tail_);
- this->tail_ = new_item;
+ this->tail_ = seq_tail;
}
- // Make sure to count all the bytes in a composite message!!!
- new_item->total_size_and_length (this->cur_bytes_,
- this->cur_length_);
- ++this->cur_count_;
-
if (this->signal_dequeue_waiters () == -1)
return -1;
else
return this->cur_count_;
}
-// Actually put the node at the head (no locking)
+// Actually put the node(s) at the head (no locking)
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
@@ -1015,21 +1028,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
if (new_item == 0)
return -1;
+ // Update the queued size and length, taking into account any chained
+ // blocks (total_size_and_length() counts all continuation blocks).
+ // Keep count of how many blocks we're adding and, if there is a chain of
+ // blocks, find the end in seq_tail and be sure they're properly
+ // back-connected along the way.
+ ACE_Message_Block *seq_tail = new_item;
+ ++this->cur_count_;
+ new_item->total_size_and_length (this->cur_bytes_,
+ this->cur_length_);
+ while (seq_tail->next () != 0)
+ {
+ seq_tail->next ()->prev (seq_tail);
+ seq_tail = seq_tail->next ();
+ ++this->cur_count_;
+ seq_tail->total_size_and_length (this->cur_bytes_,
+ this->cur_length_);
+ }
+
new_item->prev (0);
- new_item->next (this->head_);
+ seq_tail->next (this->head_);
if (this->head_ != 0)
- this->head_->prev (new_item);
+ this->head_->prev (seq_tail);
else
- this->tail_ = new_item;
+ this->tail_ = seq_tail;
this->head_ = new_item;
- // Make sure to count all the bytes in a composite message!!!
- new_item->total_size_and_length (this->cur_bytes_,
- this->cur_length_);
- ++this->cur_count_;
-
if (this->signal_dequeue_waiters () == -1)
return -1;
else
@@ -1047,6 +1073,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
if (new_item == 0)
return -1;
+ // Since this method uses enqueue_head_i() and enqueue_tail_i() for
+ // special situations, and this method doesn't support enqueueing
+ // chains of blocks off the 'next' pointer, make sure the new_item's
+ // next pointer is 0.
+ new_item->next (0);
+
if (this->head_ == 0)
// Check for simple case of an empty queue, where all we need to
// do is insert <new_item> into the head.
@@ -1113,6 +1145,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_ite
if (new_item == 0)
return -1;
+ // Since this method uses enqueue_head_i() and enqueue_tail_i() for
+ // special situations, and this method doesn't support enqueueing
+ // chains of blocks off the 'next' pointer, make sure the new_item's
+ // next pointer is 0.
+ new_item->next (0);
+
if (this->head_ == 0)
// Check for simple case of an empty queue, where all we need to
// do is insert <new_item> into the head.
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h
index 214770b60f6..c77ecaffceb 100644
--- a/ace/Message_Queue_T.h
+++ b/ace/Message_Queue_T.h
@@ -35,14 +35,19 @@ class ACE_Message_Queue_NT;
/**
* @class ACE_Message_Queue
*
- * @brief A threaded message queueing facility, modeled after the
- * queueing facilities in System V STREAMs.
+ * @brief A message queueing facility with parameterized synchronization
+ * capability. ACE_Message_Queue is modeled after the queueing facilities
+ * in System V STREAMs.
+ *
+ * ACE_Message_Queue is the primary queueing facility for
+ * messages in the ACE framework. It's one template argument parameterizes
+ * the queue's synchronization. The argument specifies a synchronization
+ * strategy. The two main strategies available for ACE_SYNCH_DECL are:
+ * -# ACE_MT_SYNCH: all operations are thread-safe
+ * -# ACE_NULL_SYNCH: no synchronization and no locking overhead
*
- * An <ACE_Message_Queue> is the central queueing facility for
- * messages in the ACE framework. If <ACE_SYNCH_DECL> is
- * <ACE_MT_SYNCH> then all operations are thread-safe.
- * Otherwise, if it's <ACE_NULL_SYNCH> then there's no locking
- * overhead.
+ * All data passing through ACE_Message_Queue is in the form of
+ * ACE_Message_Block objects. @sa ACE_Message_Block.
*/
template <ACE_SYNCH_DECL>
class ACE_Message_Queue : public ACE_Message_Queue_Base
@@ -58,83 +63,91 @@ public:
REVERSE_ITERATOR;
// = Initialization and termination methods.
+ //@{
/**
- * Initialize an <ACE_Message_Queue>. The <high_water_mark>
- * determines how many bytes can be stored in a queue before it's
- * considered "full." Supplier threads must block until the queue
- * is no longer full. The <low_water_mark> determines how many
- * bytes must be in the queue before supplier threads are allowed to
- * enqueue additional <ACE_Message_Block>s. By default, the
- * <high_water_mark> equals the <low_water_mark>, which means that
- * suppliers will be able to enqueue new messages as soon as a
- * consumer removes any message from the queue. Making the
- * <low_water_mark> smaller than the <high_water_mark> forces
- * consumers to drain more messages from the queue before suppliers
- * can enqueue new messages, which can minimize the "silly window
- * syndrome."
- */
- ACE_Message_Queue (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
- size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
- ACE_Notification_Strategy * = 0);
-
- /**
- * Initialize an <ACE_Message_Queue>. The <high_water_mark>
- * determines how many bytes can be stored in a queue before it's
- * considered "full." Supplier threads must block until the queue
- * is no longer full. The <low_water_mark> determines how many
- * bytes must be in the queue before supplier threads are allowed to
- * enqueue additional <ACE_Message_Block>s. By default, the
- * <high_water_mark> equals the <low_water_mark>, which means that
- * suppliers will be able to enqueue new messages as soon as a
- * consumer removes any message from the queue. Making the
- * <low_water_mark> smaller than the <high_water_mark> forces
- * consumers to drain more messages from the queue before suppliers
- * can enqueue new messages, which can minimize the "silly window
- * syndrome."
+ * Initialize an ACE_Message_Queue.
+ *
+ * @param hwm High water mark. Determines how many bytes can be stored in a
+ * queue before it's considered full. Supplier threads must block
+ * until the queue is no longer full.
+ * @param lwm Low water mark. Determines how many bytes must be in the queue
+ * before supplier threads are allowed to enqueue additional
+ * data. By default, the @a hwm equals @a lwm, which means
+ * that suppliers will be able to enqueue new messages as soon as
+ * a consumer removes any message from the queue. Making the low
+ * water mark smaller than the high water mark forces consumers to
+ * drain more messages from the queue before suppliers can enqueue
+ * new messages, which can minimize the "silly window syndrome."
+ * @param ns Notification strategy. Pointer to an object conforming to the
+ * ACE_Notification_Strategy interface. If set, the object's
+ * notify(void) method will be called each time data is added to
+ * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy.
*/
+ ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
+ size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
+ ACE_Notification_Strategy *ns = 0);
virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
- ACE_Notification_Strategy * = 0);
+ ACE_Notification_Strategy *ns = 0);
+ //@}
- /// Release all resources from the message queue and mark it as deactivated.
- /// Returns the number of messages released from the queue.
+ /// Releases all resources from the message queue and marks it deactivated.
+ /// @sa flush().
+ ///
+ /// @retval The number of messages released from the queue; -1 on error.
virtual int close (void);
- /// Release all resources from the message queue and mark it as deactivated.
+ /// Releases all resources from the message queue and marks it deactivated.
virtual ~ACE_Message_Queue (void);
- /// Release all resources from the message queue but do not mark it
- /// as deactivated.
+ /// Releases all resources from the message queue but does not mark it
+ /// deactivated.
+ /// @sa close().
/**
* This method holds the queue lock during this operation.
*
- * @return The number of messages flushed.
+ * @return The number of messages flushed; -1 on error.
*/
virtual int flush (void);
/// Release all resources from the message queue but do not mark it
/// as deactivated.
/**
- * The caller must be holding the queue lock before calling this
+ * @pre The caller must be holding the queue lock before calling this
* method.
*
* @return The number of messages flushed.
*/
virtual int flush_i (void);
- // = Enqueue and dequeue methods.
-
- // For the following enqueue and dequeue methods if <timeout> == 0,
- // the caller will block until action is possible, else will wait
- // until the absolute time specified in *<timeout> elapses). These
- // calls will return, however, when queue is closed, deactivated,
- // when a signal occurs, or if the time specified in timeout
- // elapses, (in which case errno = EWOULDBLOCK).
-
+ /** @name Enqueue and dequeue methods
+ *
+ * The enqueue and dequeue methods accept a timeout value passed as
+ * an ACE_Time_Value *. In all cases, if the timeout pointer is 0,
+ * the caller will block until action is possible. If the timeout pointer
+ * is non-zero, the call will wait (if needed, subject to water mark
+ * settings) until the absolute time specified in the referenced
+ * ACE_Time_Value object is reached. If the time is reached before the
+ * desired action is possible, the method will return -1 with errno set
+ * to @c EWOULDBLOCK. Regardless of the timeout setting, however,
+ * these methods will also fail and return -1 when the queue is closed,
+ * deactivated, pulsed, or when a signal occurs.
+ *
+ * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
+ * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are
+ * affected by queue state transitions.
+ */
+ //@{
/**
* Retrieve a pointer to the first ACE_Message_Block in the queue
* without removing it.
*
+ * @note Because the block whose pointer is returned is still on the queue,
+ * another thread may dequeue the referenced block at any time,
+ * including before the calling thread examines the peeked-at block.
+ * Be very careful with this method in multithreaded queueing
+ * situations.
+ *
* @param first_item Reference to an ACE_Message_Block * that will
* point to the first block on the queue. The block
* remains on the queue until this or another thread
@@ -143,9 +156,9 @@ public:
* for a block to be queued.
*
* @retval >0 The number of ACE_Message_Blocks on the queue.
- * @retval -1 On failure. errno holds the reason. If EWOULDBLOCK,
- * the timeout elapsed. If ESHUTDOWN, the queue was
- * deactivated or pulsed.
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0);
@@ -164,112 +177,161 @@ public:
*
* @retval >0 The number of ACE_Message_Blocks on the queue after adding
* the specified block.
- * @retval -1 On failure. errno holds the reason. If EWOULDBLOCK,
- * the timeout elapsed. If ESHUTDOWN, the queue was
- * deactivated or pulsed.
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int enqueue_prio (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0);
/**
- * Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
- * accordance with its <msg_deadline_time>. FIFO
- * order is maintained when messages of the same deadline time are
- * inserted consecutively. Note that <timeout> uses <{absolute}>
- * time rather than <{relative}> time. If the <timeout> elapses
- * without receiving a message -1 is returned and <errno> is set to
- * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and
- * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure,
- * else the number of items still on the queue.
+ * Enqueue an ACE_Message_Block into the queue in accordance with the
+ * block's deadline time. FIFO order is maintained when messages of
+ * the same deadline time are inserted consecutively.
+ *
+ * @param new_item Pointer to an ACE_Message_Block that will be
+ * added to the queue. The block's @c msg_deadline_time()
+ * method will be called to obtain the relative queueing
+ * position.
+ * @param timeout The absolute time the caller will wait until
+ * for the block to be queued.
+ *
+ * @retval >0 The number of ACE_Message_Blocks on the queue after adding
+ * the specified block.
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int enqueue_deadline (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0);
/**
- * This is an alias for <enqueue_prio>. It's only here for
+ * @deprecated This is an alias for enqueue_prio(). It's only here for
* backwards compatibility and will go away in a subsequent release.
- * Please use <enqueue_prio> instead. Note that <timeout> uses
- * <{absolute}> time rather than <{relative}> time.
+ * Please use enqueue_prio() instead.
*/
virtual int enqueue (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0);
/**
- * Enqueue an <ACE_Message_Block *> at the end of the queue. Note
- * that <timeout> uses <{absolute}> time rather than <{relative}>
- * time. If the <timeout> elapses without receiving a message -1 is
- * returned and <errno> is set to <EWOULDBLOCK>. If the queue is
- * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>.
- * Otherwise, returns -1 on failure, else the number of items still
- * on the queue.
+ * Enqueue one or more ACE_Message_Block objects at the tail of the queue.
+ * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
+ * start of a series of ACE_Message_Block objects connected via their
+ * @c next() pointers. The series of blocks will be added to the queue in
+ * the same order they are passed in as.
+ *
+ * @param new_item Pointer to an ACE_Message_Block that will be
+ * added to the queue. If the block's @c next() pointer
+ * is non-zero, all blocks chained from the @c next()
+ * pointer are enqueued as well.
+ * @param timeout The absolute time the caller will wait until
+ * for the block to be queued.
+ *
+ * @retval >0 The number of ACE_Message_Blocks on the queue after adding
+ * the specified block(s).
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int enqueue_tail (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0);
/**
- * Enqueue an <ACE_Message_Block *> at the head of the queue. Note
- * that <timeout> uses <{absolute}> time rather than <{relative}>
- * time. If the <timeout> elapses without receiving a message -1 is
- * returned and <errno> is set to <EWOULDBLOCK>. If the queue is
- * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>.
- * Otherwise, returns -1 on failure, else the number of items still
- * on the queue.
+ * Enqueue one or more ACE_Message_Block objects at the head of the queue.
+ * If the @a new_item @c next() pointer is non-zero, it is assumed to be the
+ * start of a series of ACE_Message_Block objects connected via their
+ * @c next() pointers. The series of blocks will be added to the queue in
+ * the same order they are passed in as.
+ *
+ * @param new_item Pointer to an ACE_Message_Block that will be
+ * added to the queue. If the block's @c next() pointer
+ * is non-zero, all blocks chained from the @c next()
+ * pointer are enqueued as well.
+ * @param timeout The absolute time the caller will wait until
+ * for the block to be queued.
+ *
+ * @retval >0 The number of ACE_Message_Blocks on the queue after adding
+ * the specified block(s).
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int enqueue_head (ACE_Message_Block *new_item,
ACE_Time_Value *timeout = 0);
- /// This method is an alias for the following <dequeue_head> method.
+ /// This method is an alias for the dequeue_head() method.
virtual int dequeue (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0);
/**
- * Dequeue and return the <ACE_Message_Block *> at the head of the
- * queue. Note that <timeout> uses <{absolute}> time rather than
- * <{relative}> time. If the <timeout> elapses without receiving a
- * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If
- * the queue is deactivated -1 is returned and <errno> is set to
- * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number
- * of items still on the queue.
+ * Dequeue the ACE_Message_Block at the head of the queue and return
+ * a pointer to the dequeued block.
+ *
+ * @param first_item Reference to an ACE_Message_Block * that will
+ * be set to the address of the dequeued block.
+ * @param timeout The absolute time the caller will wait until
+ * for a block to be dequeued.
+ *
+ * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0);
/**
- * Dequeue and return earliest the <ACE_Message_Block *> that has
- * the lowest priority (i.e., preserves FIFO order for messages with
- * the same priority). Note that <timeout> uses <{absolute}> time
- * rather than <{relative}> time. If the <timeout> elapses without
- * receiving a message -1 is returned and <errno> is set to
- * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and
- * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure,
- * else the number of items still on the queue.
+ * Dequeue the ACE_Message_Block that has the lowest priority (preserves
+ * FIFO order for messages with the same priority) and return a pointer
+ * to the dequeued block.
+ *
+ * @param first_item Reference to an ACE_Message_Block * that will
+ * be set to the address of the dequeued block.
+ * @param timeout The absolute time the caller will wait until
+ * for a block to be dequeued.
+ *
+ * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int dequeue_prio (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0);
/**
- * Dequeue and return the <ACE_Message_Block *> at the tail of the
- * queue. Note that <timeout> uses <{absolute}> time rather than
- * <{relative}> time. If the <timeout> elapses without receiving a
- * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If
- * the queue is deactivated -1 is returned and <errno> is set to
- * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number
- * of items still on the queue.
+ * Dequeue the ACE_Message_Block at the tail of the queue and return
+ * a pointer to the dequeued block.
+ *
+ * @param dequeued Reference to an ACE_Message_Block * that will
+ * be set to the address of the dequeued block.
+ * @param timeout The absolute time the caller will wait until
+ * for a block to be dequeued.
+ *
+ * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int dequeue_tail (ACE_Message_Block *&dequeued,
ACE_Time_Value *timeout = 0);
/**
- * Dequeue and return the <ACE_Message_Block *> with the lowest
- * deadline time. Note that <timeout> uses <{absolute}> time rather than
- * <{relative}> time. If the <timeout> elapses without receiving a
- * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If
- * the queue is deactivated -1 is returned and <errno> is set to
- * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number
- * of items still on the queue.
+ * Dequeue the ACE_Message_Block with the earliest deadline time and return
+ * a pointer to the dequeued block.
+ *
+ * @param dequeued Reference to an ACE_Message_Block * that will
+ * be set to the address of the dequeued block.
+ * @param timeout The absolute time the caller will wait until
+ * for a block to be dequeued.
+ *
+ * @retval >=0 The number of ACE_Message_Blocks remaining in the queue.
+ * @retval -1 On failure. errno holds the reason. Common errno values are:
+ * - EWOULDBLOCK: the timeout elapsed
+ * - ESHUTDOWN: the queue was deactivated or pulsed
*/
virtual int dequeue_deadline (ACE_Message_Block *&dequeued,
ACE_Time_Value *timeout = 0);
+ //@}
// = Check if queue is full/empty.
/// True if queue is full, else false.
@@ -277,7 +339,10 @@ public:
/// True if queue is empty, else false.
virtual int is_empty (void);
- // = Queue statistic methods.
+ /** @name Queue statistics methods
+ */
+ //@{
+
/**
* Number of total bytes on the queue, i.e., sum of the message
* block sizes.
@@ -308,7 +373,12 @@ public:
*/
virtual void message_length (size_t new_length);
- // = Flow control methods.
+ //@}
+
+
+ /** @name Water mark (flow control) methods
+ */
+ //@{
/**
* Get high watermark.
@@ -330,8 +400,14 @@ public:
* additional <ACE_Message_Block>s.
*/
virtual void low_water_mark (size_t lwm);
+ //@}
- // = Activation control methods.
+ /** @name Activation and queue state methods
+ * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of
+ * queue states and transitions and how the transitions affect message
+ * enqueueing and dequeueing operations.
+ */
+ //@{
/**
* Deactivate the queue and wakeup all threads waiting on the queue
@@ -365,8 +441,11 @@ public:
/// Returns true if the state of the queue is <DEACTIVATED>,
/// but false if the queue's is <ACTIVATED> or <PULSED>.
virtual int deactivated (void);
+ //@}
- // = Notification hook.
+ /** @name Notification strategy methods
+ */
+ //@{
/**
* This hook is automatically invoked by <enqueue_head>,
@@ -385,6 +464,7 @@ public:
/// Set the notification strategy for the <Message_Queue>
virtual void notification_strategy (ACE_Notification_Strategy *s);
+ //@}
/// Returns a reference to the lock used by the <ACE_Message_Queue>.
virtual ACE_SYNCH_MUTEX_T &lock (void);
diff --git a/tests/Message_Queue_Test.cpp b/tests/Message_Queue_Test.cpp
index 62863c99d1e..fc2b16b0d43 100644
--- a/tests/Message_Queue_Test.cpp
+++ b/tests/Message_Queue_Test.cpp
@@ -183,6 +183,98 @@ iterator_test (void)
#if defined (ACE_HAS_THREADS)
static int
+chained_block_test (void)
+{
+
+ QUEUE q;
+ const char * s = "123456789"; // Will be length 10 when copied to block
+ const size_t slen = 10;
+ const size_t num_blks = 10;
+ ACE_Message_Block b[num_blks];
+ size_t i;
+ int status = 0;
+
+ for (i = 0; i < num_blks; ++i)
+ {
+ b[i].init (slen);
+ b[i].copy (s);
+ }
+
+ // Test enqueueing single and chained blocks and be sure they end up with
+ // the proper enqueued block count and sizes. Then be sure they are dequeued
+ // in the proper order.
+ b[0].next (&b[1]);
+ b[1].next (&b[2]);
+ // b[3] and b[4] are unchained.
+ b[5].next (&b[6]);
+ b[6].next (&b[7]);
+ b[7].next (&b[8]);
+ // b[9] is unchained
+ q.enqueue_tail (&b[3]);
+ q.enqueue_tail (&b[4]);
+ int num = q.enqueue_head (&b[0]);
+ if (num != 5)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 5; has %d\n"),
+ num));
+ status = -1;
+ }
+ num = q.enqueue_tail (&b[5]);
+ if (num != 9)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 9; has %d\n"),
+ num));
+ status = -1;
+ }
+ num = q.enqueue_tail (&b[9]);
+ if (num != 10)
+ {
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 10; has %d\n"),
+ num));
+ status = -1;
+ }
+ size_t msgs, bytes;
+ msgs = q.message_count ();
+ bytes = q.message_bytes ();
+ if (msgs != 10 || bytes != 100)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Chained enqueue totals: %d msgs, %d bytes; ")
+ ACE_TEXT ("should be 10 msgs, 100 bytes\n"),
+ (int)msgs, (int)bytes));
+ status = -1;
+ }
+
+ // Now see if we can dequeue them, checking the order.
+ ACE_Time_Value nowait (ACE_OS::gettimeofday ());
+ ACE_Message_Block *bp;
+ int qstat;
+ for (i = 0; i < num_blks; ++i)
+ {
+ qstat = q.dequeue_head (bp, &nowait);
+ if (qstat == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Checking chained blocks, pass %d: %p\n"),
+ (int)i, ACE_TEXT ("dequeue_head")));
+ status = -1;
+ }
+ else if (bp != &b[i])
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("Checking chained blocks, pass %d: ")
+ ACE_TEXT ("block out of order\n"),
+ (int)i));
+ status = -1;
+ }
+ }
+
+ if (status == 0)
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Chained block test OK\n")));
+ return status;
+}
+
+static int
single_thread_performance_test (int queue_type = 0)
{
const char test_message[] =
@@ -562,6 +654,9 @@ run_main (int argc, ACE_TCHAR *argv[])
status = timeout_test ();
if (status == 0)
+ status = chained_block_test ();
+
+ if (status == 0)
status = single_thread_performance_test ();
# if defined (VXWORKS) || (defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0))