diff options
author | Steve Huston <shuston@riverace.com> | 2006-02-04 12:22:28 +0000 |
---|---|---|
committer | Steve Huston <shuston@riverace.com> | 2006-02-04 12:22:28 +0000 |
commit | fdcff80a3b72f1a23eac7f3bbdfac45e2d9f0d0a (patch) | |
tree | cac5a5c70b0014e7ae2de8476ad8dda0e9a2132a | |
parent | 925d467ec553414c7a8cc19741681408898ca1b4 (diff) | |
download | ATCD-fdcff80a3b72f1a23eac7f3bbdfac45e2d9f0d0a.tar.gz |
ChangeLogTag:Fri Feb 3 23:48:32 UTC 2006 Steve Huston <shuston@riverace.com>
-rw-r--r-- | ChangeLog | 17 | ||||
-rw-r--r-- | ace/Barrier.h | 2 | ||||
-rw-r--r-- | ace/Message_Queue_T.cpp | 74 | ||||
-rw-r--r-- | ace/Message_Queue_T.h | 324 | ||||
-rw-r--r-- | tests/Message_Queue_Test.cpp | 95 |
5 files changed, 372 insertions, 140 deletions
diff --git a/ChangeLog b/ChangeLog index fe0763eb68c..7e2b5a30520 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,20 @@ +Fri Feb 3 23:48:32 UTC 2006 Steve Huston <shuston@riverace.com> + + * ace/Barrier.h: Noted shutdown() was added for 5.4.9. + + * ace/Message_Queue_T.{h cpp}: Changed enqueue_head(), enqueue_tail() + to recognize that the ACE_Message_Block passed may have other + block(s) connected to it via the next() pointers. This allows a + caller to pre-connect a series of ACE_Message_Blocks and coalesce + the enqueueing of the series into a single method call. + Thanks to Guy Peleg <guy dot peleg at amdocs dot com> for suggesting + this enhancement. + Also revamped the Doxygenization of ACE_Message_Queue's + documentation. + + * tests/Message_Queue_Test.cpp: Added chained_block_test() to test + the new functionality above. + Fri Feb 3 14:47:53 UTC 2006 Ossama Othman <ossama_othman at symantec dot com> * ace/Cleanup_Strategies_T.h: diff --git a/ace/Barrier.h b/ace/Barrier.h index 84d97e836b9..7939aed1d24 100644 --- a/ace/Barrier.h +++ b/ace/Barrier.h @@ -116,6 +116,8 @@ public: /// value -1, errno ESHUTDOWN. /// /// @retval 0 for success, -1 if already shut down. + /// + /// @since ACE beta 5.4.9. int shutdown (void); /// Dump the state of an object. diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp index 595660a11a5..0ece6377375 100644 --- a/ace/Message_Queue_T.cpp +++ b/ace/Message_Queue_T.cpp @@ -977,35 +977,48 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item) if (new_item == 0) return -1; + // Update the queued size and length, taking into account any chained + // blocks (total_size_and_length() counts all continuation blocks). + // Keep count of how many blocks we're adding and, if there is a chain of + // blocks, find the end in seq_tail and be sure they're properly + // back-connected along the way. + ACE_Message_Block *seq_tail = new_item; + ++this->cur_count_; + new_item->total_size_and_length (this->cur_bytes_, + this->cur_length_); + while (seq_tail->next () != 0) + { + seq_tail->next ()->prev (seq_tail); + seq_tail = seq_tail->next (); + ++this->cur_count_; + seq_tail->total_size_and_length (this->cur_bytes_, + this->cur_length_); + } + // List was empty, so build a new one. if (this->tail_ == 0) { this->head_ = new_item; - this->tail_ = new_item; - new_item->next (0); + this->tail_ = seq_tail; + // seq_tail->next (0); This is a condition of the while() loop above. new_item->prev (0); } // Link at the end. else { - new_item->next (0); + // seq_tail->next (0); This is a condition of the while() loop above. this->tail_->next (new_item); new_item->prev (this->tail_); - this->tail_ = new_item; + this->tail_ = seq_tail; } - // Make sure to count all the bytes in a composite message!!! - new_item->total_size_and_length (this->cur_bytes_, - this->cur_length_); - ++this->cur_count_; - if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } -// Actually put the node at the head (no locking) +// Actually put the node(s) at the head (no locking) template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item) @@ -1015,21 +1028,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item) if (new_item == 0) return -1; + // Update the queued size and length, taking into account any chained + // blocks (total_size_and_length() counts all continuation blocks). + // Keep count of how many blocks we're adding and, if there is a chain of + // blocks, find the end in seq_tail and be sure they're properly + // back-connected along the way. + ACE_Message_Block *seq_tail = new_item; + ++this->cur_count_; + new_item->total_size_and_length (this->cur_bytes_, + this->cur_length_); + while (seq_tail->next () != 0) + { + seq_tail->next ()->prev (seq_tail); + seq_tail = seq_tail->next (); + ++this->cur_count_; + seq_tail->total_size_and_length (this->cur_bytes_, + this->cur_length_); + } + new_item->prev (0); - new_item->next (this->head_); + seq_tail->next (this->head_); if (this->head_ != 0) - this->head_->prev (new_item); + this->head_->prev (seq_tail); else - this->tail_ = new_item; + this->tail_ = seq_tail; this->head_ = new_item; - // Make sure to count all the bytes in a composite message!!! - new_item->total_size_and_length (this->cur_bytes_, - this->cur_length_); - ++this->cur_count_; - if (this->signal_dequeue_waiters () == -1) return -1; else @@ -1047,6 +1073,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) if (new_item == 0) return -1; + // Since this method uses enqueue_head_i() and enqueue_tail_i() for + // special situations, and this method doesn't support enqueueing + // chains of blocks off the 'next' pointer, make sure the new_item's + // next pointer is 0. + new_item->next (0); + if (this->head_ == 0) // Check for simple case of an empty queue, where all we need to // do is insert <new_item> into the head. @@ -1113,6 +1145,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_ite if (new_item == 0) return -1; + // Since this method uses enqueue_head_i() and enqueue_tail_i() for + // special situations, and this method doesn't support enqueueing + // chains of blocks off the 'next' pointer, make sure the new_item's + // next pointer is 0. + new_item->next (0); + if (this->head_ == 0) // Check for simple case of an empty queue, where all we need to // do is insert <new_item> into the head. diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index 214770b60f6..c77ecaffceb 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -35,14 +35,19 @@ class ACE_Message_Queue_NT; /** * @class ACE_Message_Queue * - * @brief A threaded message queueing facility, modeled after the - * queueing facilities in System V STREAMs. + * @brief A message queueing facility with parameterized synchronization + * capability. ACE_Message_Queue is modeled after the queueing facilities + * in System V STREAMs. + * + * ACE_Message_Queue is the primary queueing facility for + * messages in the ACE framework. It's one template argument parameterizes + * the queue's synchronization. The argument specifies a synchronization + * strategy. The two main strategies available for ACE_SYNCH_DECL are: + * -# ACE_MT_SYNCH: all operations are thread-safe + * -# ACE_NULL_SYNCH: no synchronization and no locking overhead * - * An <ACE_Message_Queue> is the central queueing facility for - * messages in the ACE framework. If <ACE_SYNCH_DECL> is - * <ACE_MT_SYNCH> then all operations are thread-safe. - * Otherwise, if it's <ACE_NULL_SYNCH> then there's no locking - * overhead. + * All data passing through ACE_Message_Queue is in the form of + * ACE_Message_Block objects. @sa ACE_Message_Block. */ template <ACE_SYNCH_DECL> class ACE_Message_Queue : public ACE_Message_Queue_Base @@ -58,83 +63,91 @@ public: REVERSE_ITERATOR; // = Initialization and termination methods. + //@{ /** - * Initialize an <ACE_Message_Queue>. The <high_water_mark> - * determines how many bytes can be stored in a queue before it's - * considered "full." Supplier threads must block until the queue - * is no longer full. The <low_water_mark> determines how many - * bytes must be in the queue before supplier threads are allowed to - * enqueue additional <ACE_Message_Block>s. By default, the - * <high_water_mark> equals the <low_water_mark>, which means that - * suppliers will be able to enqueue new messages as soon as a - * consumer removes any message from the queue. Making the - * <low_water_mark> smaller than the <high_water_mark> forces - * consumers to drain more messages from the queue before suppliers - * can enqueue new messages, which can minimize the "silly window - * syndrome." - */ - ACE_Message_Queue (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, - size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, - ACE_Notification_Strategy * = 0); - - /** - * Initialize an <ACE_Message_Queue>. The <high_water_mark> - * determines how many bytes can be stored in a queue before it's - * considered "full." Supplier threads must block until the queue - * is no longer full. The <low_water_mark> determines how many - * bytes must be in the queue before supplier threads are allowed to - * enqueue additional <ACE_Message_Block>s. By default, the - * <high_water_mark> equals the <low_water_mark>, which means that - * suppliers will be able to enqueue new messages as soon as a - * consumer removes any message from the queue. Making the - * <low_water_mark> smaller than the <high_water_mark> forces - * consumers to drain more messages from the queue before suppliers - * can enqueue new messages, which can minimize the "silly window - * syndrome." + * Initialize an ACE_Message_Queue. + * + * @param hwm High water mark. Determines how many bytes can be stored in a + * queue before it's considered full. Supplier threads must block + * until the queue is no longer full. + * @param lwm Low water mark. Determines how many bytes must be in the queue + * before supplier threads are allowed to enqueue additional + * data. By default, the @a hwm equals @a lwm, which means + * that suppliers will be able to enqueue new messages as soon as + * a consumer removes any message from the queue. Making the low + * water mark smaller than the high water mark forces consumers to + * drain more messages from the queue before suppliers can enqueue + * new messages, which can minimize the "silly window syndrome." + * @param ns Notification strategy. Pointer to an object conforming to the + * ACE_Notification_Strategy interface. If set, the object's + * notify(void) method will be called each time data is added to + * this ACE_Message_Queue. @see ACE_Reactor_Notification_Strategy. */ + ACE_Message_Queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, + size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, + ACE_Notification_Strategy *ns = 0); virtual int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, - ACE_Notification_Strategy * = 0); + ACE_Notification_Strategy *ns = 0); + //@} - /// Release all resources from the message queue and mark it as deactivated. - /// Returns the number of messages released from the queue. + /// Releases all resources from the message queue and marks it deactivated. + /// @sa flush(). + /// + /// @retval The number of messages released from the queue; -1 on error. virtual int close (void); - /// Release all resources from the message queue and mark it as deactivated. + /// Releases all resources from the message queue and marks it deactivated. virtual ~ACE_Message_Queue (void); - /// Release all resources from the message queue but do not mark it - /// as deactivated. + /// Releases all resources from the message queue but does not mark it + /// deactivated. + /// @sa close(). /** * This method holds the queue lock during this operation. * - * @return The number of messages flushed. + * @return The number of messages flushed; -1 on error. */ virtual int flush (void); /// Release all resources from the message queue but do not mark it /// as deactivated. /** - * The caller must be holding the queue lock before calling this + * @pre The caller must be holding the queue lock before calling this * method. * * @return The number of messages flushed. */ virtual int flush_i (void); - // = Enqueue and dequeue methods. - - // For the following enqueue and dequeue methods if <timeout> == 0, - // the caller will block until action is possible, else will wait - // until the absolute time specified in *<timeout> elapses). These - // calls will return, however, when queue is closed, deactivated, - // when a signal occurs, or if the time specified in timeout - // elapses, (in which case errno = EWOULDBLOCK). - + /** @name Enqueue and dequeue methods + * + * The enqueue and dequeue methods accept a timeout value passed as + * an ACE_Time_Value *. In all cases, if the timeout pointer is 0, + * the caller will block until action is possible. If the timeout pointer + * is non-zero, the call will wait (if needed, subject to water mark + * settings) until the absolute time specified in the referenced + * ACE_Time_Value object is reached. If the time is reached before the + * desired action is possible, the method will return -1 with errno set + * to @c EWOULDBLOCK. Regardless of the timeout setting, however, + * these methods will also fail and return -1 when the queue is closed, + * deactivated, pulsed, or when a signal occurs. + * + * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of + * ACE_Message_Queue, enqueueing, dequeueing, and how these operations are + * affected by queue state transitions. + */ + //@{ /** * Retrieve a pointer to the first ACE_Message_Block in the queue * without removing it. * + * @note Because the block whose pointer is returned is still on the queue, + * another thread may dequeue the referenced block at any time, + * including before the calling thread examines the peeked-at block. + * Be very careful with this method in multithreaded queueing + * situations. + * * @param first_item Reference to an ACE_Message_Block * that will * point to the first block on the queue. The block * remains on the queue until this or another thread @@ -143,9 +156,9 @@ public: * for a block to be queued. * * @retval >0 The number of ACE_Message_Blocks on the queue. - * @retval -1 On failure. errno holds the reason. If EWOULDBLOCK, - * the timeout elapsed. If ESHUTDOWN, the queue was - * deactivated or pulsed. + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0); @@ -164,112 +177,161 @@ public: * * @retval >0 The number of ACE_Message_Blocks on the queue after adding * the specified block. - * @retval -1 On failure. errno holds the reason. If EWOULDBLOCK, - * the timeout elapsed. If ESHUTDOWN, the queue was - * deactivated or pulsed. + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int enqueue_prio (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0); /** - * Enqueue an <ACE_Message_Block *> into the <Message_Queue> in - * accordance with its <msg_deadline_time>. FIFO - * order is maintained when messages of the same deadline time are - * inserted consecutively. Note that <timeout> uses <{absolute}> - * time rather than <{relative}> time. If the <timeout> elapses - * without receiving a message -1 is returned and <errno> is set to - * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and - * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, - * else the number of items still on the queue. + * Enqueue an ACE_Message_Block into the queue in accordance with the + * block's deadline time. FIFO order is maintained when messages of + * the same deadline time are inserted consecutively. + * + * @param new_item Pointer to an ACE_Message_Block that will be + * added to the queue. The block's @c msg_deadline_time() + * method will be called to obtain the relative queueing + * position. + * @param timeout The absolute time the caller will wait until + * for the block to be queued. + * + * @retval >0 The number of ACE_Message_Blocks on the queue after adding + * the specified block. + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int enqueue_deadline (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0); /** - * This is an alias for <enqueue_prio>. It's only here for + * @deprecated This is an alias for enqueue_prio(). It's only here for * backwards compatibility and will go away in a subsequent release. - * Please use <enqueue_prio> instead. Note that <timeout> uses - * <{absolute}> time rather than <{relative}> time. + * Please use enqueue_prio() instead. */ virtual int enqueue (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0); /** - * Enqueue an <ACE_Message_Block *> at the end of the queue. Note - * that <timeout> uses <{absolute}> time rather than <{relative}> - * time. If the <timeout> elapses without receiving a message -1 is - * returned and <errno> is set to <EWOULDBLOCK>. If the queue is - * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. - * Otherwise, returns -1 on failure, else the number of items still - * on the queue. + * Enqueue one or more ACE_Message_Block objects at the tail of the queue. + * If the @a new_item @c next() pointer is non-zero, it is assumed to be the + * start of a series of ACE_Message_Block objects connected via their + * @c next() pointers. The series of blocks will be added to the queue in + * the same order they are passed in as. + * + * @param new_item Pointer to an ACE_Message_Block that will be + * added to the queue. If the block's @c next() pointer + * is non-zero, all blocks chained from the @c next() + * pointer are enqueued as well. + * @param timeout The absolute time the caller will wait until + * for the block to be queued. + * + * @retval >0 The number of ACE_Message_Blocks on the queue after adding + * the specified block(s). + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0); /** - * Enqueue an <ACE_Message_Block *> at the head of the queue. Note - * that <timeout> uses <{absolute}> time rather than <{relative}> - * time. If the <timeout> elapses without receiving a message -1 is - * returned and <errno> is set to <EWOULDBLOCK>. If the queue is - * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. - * Otherwise, returns -1 on failure, else the number of items still - * on the queue. + * Enqueue one or more ACE_Message_Block objects at the head of the queue. + * If the @a new_item @c next() pointer is non-zero, it is assumed to be the + * start of a series of ACE_Message_Block objects connected via their + * @c next() pointers. The series of blocks will be added to the queue in + * the same order they are passed in as. + * + * @param new_item Pointer to an ACE_Message_Block that will be + * added to the queue. If the block's @c next() pointer + * is non-zero, all blocks chained from the @c next() + * pointer are enqueued as well. + * @param timeout The absolute time the caller will wait until + * for the block to be queued. + * + * @retval >0 The number of ACE_Message_Blocks on the queue after adding + * the specified block(s). + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout = 0); - /// This method is an alias for the following <dequeue_head> method. + /// This method is an alias for the dequeue_head() method. virtual int dequeue (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0); /** - * Dequeue and return the <ACE_Message_Block *> at the head of the - * queue. Note that <timeout> uses <{absolute}> time rather than - * <{relative}> time. If the <timeout> elapses without receiving a - * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If - * the queue is deactivated -1 is returned and <errno> is set to - * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number - * of items still on the queue. + * Dequeue the ACE_Message_Block at the head of the queue and return + * a pointer to the dequeued block. + * + * @param first_item Reference to an ACE_Message_Block * that will + * be set to the address of the dequeued block. + * @param timeout The absolute time the caller will wait until + * for a block to be dequeued. + * + * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0); /** - * Dequeue and return earliest the <ACE_Message_Block *> that has - * the lowest priority (i.e., preserves FIFO order for messages with - * the same priority). Note that <timeout> uses <{absolute}> time - * rather than <{relative}> time. If the <timeout> elapses without - * receiving a message -1 is returned and <errno> is set to - * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and - * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, - * else the number of items still on the queue. + * Dequeue the ACE_Message_Block that has the lowest priority (preserves + * FIFO order for messages with the same priority) and return a pointer + * to the dequeued block. + * + * @param first_item Reference to an ACE_Message_Block * that will + * be set to the address of the dequeued block. + * @param timeout The absolute time the caller will wait until + * for a block to be dequeued. + * + * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int dequeue_prio (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0); /** - * Dequeue and return the <ACE_Message_Block *> at the tail of the - * queue. Note that <timeout> uses <{absolute}> time rather than - * <{relative}> time. If the <timeout> elapses without receiving a - * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If - * the queue is deactivated -1 is returned and <errno> is set to - * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number - * of items still on the queue. + * Dequeue the ACE_Message_Block at the tail of the queue and return + * a pointer to the dequeued block. + * + * @param dequeued Reference to an ACE_Message_Block * that will + * be set to the address of the dequeued block. + * @param timeout The absolute time the caller will wait until + * for a block to be dequeued. + * + * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int dequeue_tail (ACE_Message_Block *&dequeued, ACE_Time_Value *timeout = 0); /** - * Dequeue and return the <ACE_Message_Block *> with the lowest - * deadline time. Note that <timeout> uses <{absolute}> time rather than - * <{relative}> time. If the <timeout> elapses without receiving a - * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If - * the queue is deactivated -1 is returned and <errno> is set to - * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number - * of items still on the queue. + * Dequeue the ACE_Message_Block with the earliest deadline time and return + * a pointer to the dequeued block. + * + * @param dequeued Reference to an ACE_Message_Block * that will + * be set to the address of the dequeued block. + * @param timeout The absolute time the caller will wait until + * for a block to be dequeued. + * + * @retval >=0 The number of ACE_Message_Blocks remaining in the queue. + * @retval -1 On failure. errno holds the reason. Common errno values are: + * - EWOULDBLOCK: the timeout elapsed + * - ESHUTDOWN: the queue was deactivated or pulsed */ virtual int dequeue_deadline (ACE_Message_Block *&dequeued, ACE_Time_Value *timeout = 0); + //@} // = Check if queue is full/empty. /// True if queue is full, else false. @@ -277,7 +339,10 @@ public: /// True if queue is empty, else false. virtual int is_empty (void); - // = Queue statistic methods. + /** @name Queue statistics methods + */ + //@{ + /** * Number of total bytes on the queue, i.e., sum of the message * block sizes. @@ -308,7 +373,12 @@ public: */ virtual void message_length (size_t new_length); - // = Flow control methods. + //@} + + + /** @name Water mark (flow control) methods + */ + //@{ /** * Get high watermark. @@ -330,8 +400,14 @@ public: * additional <ACE_Message_Block>s. */ virtual void low_water_mark (size_t lwm); + //@} - // = Activation control methods. + /** @name Activation and queue state methods + * See C++NPv2 Section 6.2 and APG Section 12.3 for a fuller treatment of + * queue states and transitions and how the transitions affect message + * enqueueing and dequeueing operations. + */ + //@{ /** * Deactivate the queue and wakeup all threads waiting on the queue @@ -365,8 +441,11 @@ public: /// Returns true if the state of the queue is <DEACTIVATED>, /// but false if the queue's is <ACTIVATED> or <PULSED>. virtual int deactivated (void); + //@} - // = Notification hook. + /** @name Notification strategy methods + */ + //@{ /** * This hook is automatically invoked by <enqueue_head>, @@ -385,6 +464,7 @@ public: /// Set the notification strategy for the <Message_Queue> virtual void notification_strategy (ACE_Notification_Strategy *s); + //@} /// Returns a reference to the lock used by the <ACE_Message_Queue>. virtual ACE_SYNCH_MUTEX_T &lock (void); diff --git a/tests/Message_Queue_Test.cpp b/tests/Message_Queue_Test.cpp index 62863c99d1e..fc2b16b0d43 100644 --- a/tests/Message_Queue_Test.cpp +++ b/tests/Message_Queue_Test.cpp @@ -183,6 +183,98 @@ iterator_test (void) #if defined (ACE_HAS_THREADS) static int +chained_block_test (void) +{ + + QUEUE q; + const char * s = "123456789"; // Will be length 10 when copied to block + const size_t slen = 10; + const size_t num_blks = 10; + ACE_Message_Block b[num_blks]; + size_t i; + int status = 0; + + for (i = 0; i < num_blks; ++i) + { + b[i].init (slen); + b[i].copy (s); + } + + // Test enqueueing single and chained blocks and be sure they end up with + // the proper enqueued block count and sizes. Then be sure they are dequeued + // in the proper order. + b[0].next (&b[1]); + b[1].next (&b[2]); + // b[3] and b[4] are unchained. + b[5].next (&b[6]); + b[6].next (&b[7]); + b[7].next (&b[8]); + // b[9] is unchained + q.enqueue_tail (&b[3]); + q.enqueue_tail (&b[4]); + int num = q.enqueue_head (&b[0]); + if (num != 5) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 5; has %d\n"), + num)); + status = -1; + } + num = q.enqueue_tail (&b[5]); + if (num != 9) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 9; has %d\n"), + num)); + status = -1; + } + num = q.enqueue_tail (&b[9]); + if (num != 10) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Chained enqueue expected 10; has %d\n"), + num)); + status = -1; + } + size_t msgs, bytes; + msgs = q.message_count (); + bytes = q.message_bytes (); + if (msgs != 10 || bytes != 100) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Chained enqueue totals: %d msgs, %d bytes; ") + ACE_TEXT ("should be 10 msgs, 100 bytes\n"), + (int)msgs, (int)bytes)); + status = -1; + } + + // Now see if we can dequeue them, checking the order. + ACE_Time_Value nowait (ACE_OS::gettimeofday ()); + ACE_Message_Block *bp; + int qstat; + for (i = 0; i < num_blks; ++i) + { + qstat = q.dequeue_head (bp, &nowait); + if (qstat == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Checking chained blocks, pass %d: %p\n"), + (int)i, ACE_TEXT ("dequeue_head"))); + status = -1; + } + else if (bp != &b[i]) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Checking chained blocks, pass %d: ") + ACE_TEXT ("block out of order\n"), + (int)i)); + status = -1; + } + } + + if (status == 0) + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Chained block test OK\n"))); + return status; +} + +static int single_thread_performance_test (int queue_type = 0) { const char test_message[] = @@ -562,6 +654,9 @@ run_main (int argc, ACE_TCHAR *argv[]) status = timeout_test (); if (status == 0) + status = chained_block_test (); + + if (status == 0) status = single_thread_performance_test (); # if defined (VXWORKS) || (defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)) |