diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2001-01-26 18:44:08 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2001-01-26 18:44:08 +0000 |
commit | 991986bf2f012206fe7e29f4d6600093141ca2e5 (patch) | |
tree | 9906d454611b0ddf2904537f5d6d5456218b2b1a /ace/Message_Queue_T.h | |
parent | 0e7e3bcf2c5abe39e892c578052a054cd87e4314 (diff) | |
download | ATCD-991986bf2f012206fe7e29f4d6600093141ca2e5.tar.gz |
ChangeLogTag:Fri Jan 26 11:18:15 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
Diffstat (limited to 'ace/Message_Queue_T.h')
-rw-r--r-- | ace/Message_Queue_T.h | 311 |
1 files changed, 302 insertions, 9 deletions
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index 7cc47832eed..b26a58c2b8d 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -36,7 +36,7 @@ class ACE_Message_Queue_NT; * queueing facilities in System V STREAMs. * * An <ACE_Message_Queue> is the central queueing facility for - * messages in the ASX framework. If <ACE_SYNCH_DECL> is + * messages in the ACE framework. If <ACE_SYNCH_DECL> is * <ACE_MT_SYNCH> then all operations are thread-safe. * Otherwise, if it's <ACE_NULL_SYNCH> then there's no locking * overhead. @@ -185,20 +185,24 @@ public: // = Check if queue is full/empty. /// True if queue is full, else false. - /// True if queue is empty, else false. virtual int is_full (void); + /// True if queue is empty, else false. virtual int is_empty (void); // = Queue statistic methods. /** * Number of total bytes on the queue, i.e., sum of the message * block sizes. + */ + virtual size_t message_bytes (void); + /** * Number of total length on the queue, i.e., sum of the message * block lengths. - * Number of total messages on the queue. */ - virtual size_t message_bytes (void); virtual size_t message_length (void); + /** + * Number of total messages on the queue. + */ virtual size_t message_count (void); // = Manual changes to these stats (used when queued message blocks @@ -206,29 +210,35 @@ public: /** * New value of the number of total bytes on the queue, i.e., sum of * the message block sizes. + */ + virtual void message_bytes (size_t new_size); + /** * New value of the number of total length on the queue, i.e., sum * of the message block lengths. */ - virtual void message_bytes (size_t new_size); virtual void message_length (size_t new_length); // = Flow control methods. /** * Get high watermark. + */ + virtual size_t high_water_mark (void); + /** * Set the high watermark, which determines how many bytes can be * stored in a queue before it's considered "full." */ - virtual size_t high_water_mark (void); virtual void high_water_mark (size_t hwm); /** * Get low watermark. + */ + virtual size_t low_water_mark (void); + /** * Set the low watermark, which determines how many bytes must be in * the queue before supplier threads are allowed to enqueue * additional <ACE_Message_Block>s. */ - virtual size_t low_water_mark (void); virtual void low_water_mark (size_t lwm); // = Activation control methods. @@ -275,10 +285,8 @@ public: /// Returns a reference to the lock used by the <ACE_Message_Queue>. ACE_SYNCH_MUTEX_T &lock (void) { - // // The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the // header file (j.russell.noseworthy@objectsciences.com) - // return this->lock_; } @@ -741,6 +749,291 @@ public: #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ }; +/** + * @class ACE_Message_Queue_Ex + * + * @brief A threaded message queueing facility, modeled after the + * queueing facilities in System V STREAMs. + * + * An <ACE_Message_Queue_Ex> is the templatized wrapper of the central + * queueing facility for messages in the ACE framework. If + * <ACE_SYNCH_DECL> is <ACE_MT_SYNCH> then all operations are + * thread-safe. Otherwise, if it's <ACE_NULL_SYNCH> then there's no + * locking overhead. + */ +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> +class ACE_Message_Queue_Ex +{ +public: + + // = Defualt priority value. + enum + { + DEFUALT_PRIORITY = 0 + }; + +#if 0 + // @@ Iterators are not implemented yet... + + friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; + friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; + + // = Traits + typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> + ITERATOR; + typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> + REVERSE_ITERATOR; +#endif /* 0 */ + + // = Initialization and termination methods. + + /** + * Initialize an <ACE_Message_Queue>. The <high_water_mark> + * determines how many bytes can be stored in a queue before it's + * considered "full." Supplier threads must block until the queue + * is no longer full. The <low_water_mark> determines how many + * bytes must be in the queue before supplier threads are allowed to + * enqueue additional <ACE_Message_Block>s. By default, the + * <high_water_mark> equals the <low_water_mark>, which means that + * suppliers will be able to enqueue new messages as soon as a + * consumer removes any message from the queue. Making the + * <low_water_mark> smaller than the <high_water_mark> forces + * consumers to drain more messages from the queue before suppliers + * can enqueue new messages, which can minimize the "silly window + * syndrome." + */ + ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM, + size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + + /** + * Initialize an <ACE_Message_Queue>. The <high_water_mark> + * determines how many bytes can be stored in a queue before it's + * considered "full." Supplier threads must block until the queue + * is no longer full. The <low_water_mark> determines how many + * bytes must be in the queue before supplier threads are allowed to + * enqueue additional <ACE_Message_Block>s. By default, the + * <high_water_mark> equals the <low_water_mark>, which means that + * suppliers will be able to enqueue new messages as soon as a + * consumer removes any message from the queue. Making the + * <low_water_mark> smaller than the <high_water_mark> forces + * consumers to drain more messages from the queue before suppliers + * can enqueue new messages, which can minimize the "silly window + * syndrome." + */ + int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, + size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + + /// Close down the message queue and release all resources. + int close (void); + + /// Close down the message queue and release all resources. + ~ACE_Message_Queue_Ex (void); + + // = Enqueue and dequeue methods. + + // For the following enqueue and dequeue methods if <timeout> == 0, + // the caller will block until action is possible, else will wait + // until the absolute time specified in *<timeout> elapses). These + // calls will return, however, when queue is closed, deactivated, + // when a signal occurs, or if the time specified in timeout + // elapses, (in which case errno = EWOULDBLOCK). + + /** + * Retrieve the first <ACE_Message_Block> without removing it. Note + * that <timeout> uses <{absolute}> time rather than <{relative}> + * time. If the <timeout> elapses without receiving a message -1 is + * returned and <errno> is set to <EWOULDBLOCK>. If the queue is + * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. + * Otherwise, returns -1 on failure, else the number of items still + * on the queue. + */ + int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item, + ACE_Time_Value *timeout = 0); + + /** + * Enqueue an <ACE_Message_Block *> into the <Message_Queue> in + * accordance with its <msg_priority> (0 is lowest priority). FIFO + * order is maintained when messages of the same priority are + * inserted consecutively. Note that <timeout> uses <{absolute}> + * time rather than <{relative}> time. If the <timeout> elapses + * without receiving a message -1 is returned and <errno> is set to + * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and + * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure, + * else the number of items still on the queue. + */ + int enqueue_prio (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout = 0); + + /** + * This is an alias for <enqueue_prio>. It's only here for + * backwards compatibility and will go away in a subsequent release. + * Please use <enqueue_prio> instead. Note that <timeout> uses + * <{absolute}> time rather than <{relative}> time. + */ + int enqueue (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout = 0); + + /** + * Enqueue an <ACE_Message_Block *> at the end of the queue. Note + * that <timeout> uses <{absolute}> time rather than <{relative}> + * time. If the <timeout> elapses without receiving a message -1 is + * returned and <errno> is set to <EWOULDBLOCK>. If the queue is + * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. + * Otherwise, returns -1 on failure, else the number of items still + * on the queue. + */ + int enqueue_tail (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout = 0); + + /** + * Enqueue an <ACE_Message_Block *> at the head of the queue. Note + * that <timeout> uses <{absolute}> time rather than <{relative}> + * time. If the <timeout> elapses without receiving a message -1 is + * returned and <errno> is set to <EWOULDBLOCK>. If the queue is + * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>. + * Otherwise, returns -1 on failure, else the number of items still + * on the queue. + */ + int enqueue_head (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout = 0); + + /// This method is an alias for the following <dequeue_head> method. + int dequeue (ACE_MESSAGE_TYPE *&first_item, + ACE_Time_Value *timeout = 0); + // This method is an alias for the following <dequeue_head> method. + + /** + * Dequeue and return the <ACE_Message_Block *> at the head of the + * queue. Note that <timeout> uses <{absolute}> time rather than + * <{relative}> time. If the <timeout> elapses without receiving a + * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If + * the queue is deactivated -1 is returned and <errno> is set to + * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number + * of items still on the queue. + */ + int dequeue_head (ACE_MESSAGE_TYPE *&first_item, + ACE_Time_Value *timeout = 0); + + // = Check if queue is full/empty. + /// True if queue is full, else false. + int is_full (void); + /// True if queue is empty, else false. + int is_empty (void); + + + // = Queue statistic methods. + /** + * Number of total bytes on the queue, i.e., sum of the message + * block sizes. + */ + size_t message_bytes (void); + /** + * Number of total length on the queue, i.e., sum of the message + * block lengths. + */ + size_t message_length (void); + /** + * Number of total messages on the queue. + */ + size_t message_count (void); + + // = Manual changes to these stats (used when queued message blocks + // change size or lengths). + /** + * New value of the number of total bytes on the queue, i.e., sum of + * the message block sizes. + */ + virtual void message_bytes (size_t new_size); + /** + * New value of the number of total length on the queue, i.e., sum + * of the message block lengths. + */ + virtual void message_length (size_t new_length); + + // = Flow control methods. + /** + * Get high watermark. + */ + virtual size_t high_water_mark (void); + /** + * Set the high watermark, which determines how many bytes can be + * stored in a queue before it's considered "full." + */ + virtual void high_water_mark (size_t hwm); + + /** + * Get low watermark. + */ + virtual size_t low_water_mark (void); + /** + * Set the low watermark, which determines how many bytes must be in + * the queue before supplier threads are allowed to enqueue + * additional <ACE_Message_Block>s. + */ + virtual void low_water_mark (size_t lwm); + + // = Activation control methods. + + /** + * Deactivate the queue and wakeup all threads waiting on the queue + * so they can continue. No messages are removed from the queue, + * however. Any other operations called until the queue is + * activated again will immediately return -1 with <errno> == + * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the + * call and WAS_ACTIVE if queue was active before the call. + */ + int deactivate (void); + + /** + * Reactivate the queue so that threads can enqueue and dequeue + * messages again. Returns WAS_INACTIVE if queue was inactive + * before the call and WAS_ACTIVE if queue was active before the + * call. + */ + int activate (void); + + /// Returns true if <deactivated_> is enabled. + int deactivated (void); + + // = Notification hook. + + /** + * This hook is automatically invoked by <enqueue_head>, + * <enqueue_tail>, and <enqueue_prio> when a new item is inserted + * into the queue. Subclasses can override this method to perform + * specific notification strategies (e.g., signaling events for a + * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a + * multi-threaded application with concurrent consumers, there is no + * guarantee that the queue will be still be non-empty by the time + * the notification occurs. + */ + int notify (void); + + /// Get/set the notification strategy for the <Message_Queue> + ACE_Notification_Strategy *notification_strategy (void); + void notification_strategy (ACE_Notification_Strategy *s); + + /// Returns a reference to the lock used by the <ACE_Message_Queue_Ex>. + ACE_SYNCH_MUTEX_T &lock (void) + { + // The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the + // header file (j.russell.noseworthy@objectsciences.com) + return this->lock_; + } + + /// Dump the state of an object. + void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +private: + /// Implement this via an <ACE_Message_Queue>. + ACE_Message_Queue<ACE_SYNCH> *queue_; +}; + #if defined (__ACE_INLINE__) #include "ace/Message_Queue_T.i" #endif /* __ACE_INLINE__ */ |