diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-08-05 01:05:49 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-08-05 01:05:49 +0000 |
commit | 8bc2711f7ffea172ec3874ee3ec7d34c22c29c3e (patch) | |
tree | 93989fa7f1a0983b2322dc24a8ebc994de4f9f86 /ace/Message_Queue.cpp | |
parent | ea40de67ec23f87125db282675be26fad6489a20 (diff) | |
download | ATCD-8bc2711f7ffea172ec3874ee3ec7d34c22c29c3e.tar.gz |
*** empty log message ***
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r-- | ace/Message_Queue.cpp | 324 |
1 files changed, 196 insertions, 128 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp index 436fa7f7c49..d5a90eb3046 100644 --- a/ace/Message_Queue.cpp +++ b/ace/Message_Queue.cpp @@ -107,33 +107,40 @@ ACE_Message_Queue<ACE_SYNCH_2>::dump (void) const ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, - "deactivated = %d\n" - "low_water_mark = %d\n" - "high_water_mark = %d\n" - "cur_bytes = %d\n" - "cur_count = %d\n", - "head_ = %u\n", - "tail_ = %u\n", - this->deactivated_, - this->low_water_mark_, - this->high_water_mark_, - this->cur_bytes_, - this->cur_count_, - this->head_, - this->tail_)); + "deactivated = %d\n" + "low_water_mark = %d\n" + "high_water_mark = %d\n" + "cur_bytes = %d\n" + "cur_count = %d\n", + "head_ = %u\n", + "tail_ = %u\n", + this->deactivated_, + this->low_water_mark_, + this->high_water_mark_, + this->cur_bytes_, + this->cur_count_, + this->head_, + this->tail_)); ACE_DEBUG ((LM_DEBUG,"notfull_cond: \n")); - notfull_cond_.dump(); + notfull_cond_.dump (); ACE_DEBUG ((LM_DEBUG,"notempty_cond: \n")); - notempty_cond_.dump(); + notempty_cond_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } template <ACE_SYNCH_1> ACE_Message_Queue<ACE_SYNCH_2>::ACE_Message_Queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns) + size_t lwm, + ACE_Notification_Strategy *ns) +#if defined (ACE_LACKS_COND_T) + : not_empty_cond_ (0), + not_full_cond_ (0), + enqueue_waiters_ (0), + dequeue_waiters_ (0) +#else : notempty_cond_ (this->lock_), notfull_cond_ (this->lock_) +#endif /* ACE_LACKS_COND_T */ { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::ACE_Message_Queue"); @@ -155,8 +162,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::~ACE_Message_Queue (void) template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::open (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns) + size_t lwm, + ACE_Notification_Strategy *ns) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::open"); this->high_water_mark_ = hwm; @@ -181,8 +188,10 @@ ACE_Message_Queue<ACE_SYNCH_2>::deactivate_i (void) this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; // Wakeup all waiters. +#if !defined (ACE_LACKS_COND_T) this->notempty_cond_.broadcast (); this->notfull_cond_.broadcast (); +#endif /* ACE_LACKS_COND_T */ this->deactivated_ = 1; return current_status; @@ -218,9 +227,9 @@ ACE_Message_Queue<ACE_SYNCH_2>::close (void) // Decrement all the counts. for (temp = this->head_; - temp != 0; - temp = temp->cont ()) - this->cur_bytes_ -= temp->size (); + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ -= temp->size (); temp = this->head_; this->head_ = this->head_->next (); @@ -269,12 +278,18 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i (ACE_Message_Block *new_item) this->cur_bytes_ += temp->size (); this->cur_count_++; - +#if !defined (ACE_LACKS_COND_T) // Tell any blocked threads that the queue has a new item! if (this->notempty_cond_.signal () != 0) return -1; - else - return this->cur_count_; +#else + if (this->dequeue_waiters_ > 0) + { + --this->dequeue_waiters_; + this->not_empty_cond_.release (); + } +#endif /* ACE_LACKS_COND_T */ + return this->cur_count_; } // Actually put the node at the head (no locking) @@ -305,12 +320,18 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head_i (ACE_Message_Block *new_item) this->cur_bytes_ += temp->size (); this->cur_count_++; - +#if !defined (ACE_LACKS_COND_T) // Tell any blocked threads that the queue has a new item! if (this->notempty_cond_.signal () != 0) return -1; - else - return this->cur_count_; +#else + if (this->dequeue_waiters_ > 0) + { + --this->dequeue_waiters_; + this->not_empty_cond_.release (); + } +#endif /* ACE_LACKS_COND_T */ + return this->cur_count_; } // Actually put the node at its proper position relative to its @@ -337,36 +358,36 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item) // priority. for (temp = this->tail_; - temp != 0; - temp = temp->prev ()) - { - if (temp->msg_priority () >= new_item->msg_priority ()) - // Break out when we've located an item that has higher - // priority that <new_item>. - break; - } + temp != 0; + temp = temp->prev ()) + { + if (temp->msg_priority () >= new_item->msg_priority ()) + // Break out when we've located an item that has higher + // priority that <new_item>. + break; + } if (temp == 0) - // Check for simple case of inserting at the head of the queue, - // where all we need to do is insert <new_item> before the - // current head. - return this->enqueue_head_i (new_item); + // Check for simple case of inserting at the head of the queue, + // where all we need to do is insert <new_item> before the + // current head. + return this->enqueue_head_i (new_item); else if (temp->next () == 0) - // Check for simple case of inserting at the end of the - // queue, where all we need to do is insert <new_item> after - // the current tail. - return this->enqueue_tail_i (new_item); + // Check for simple case of inserting at the end of the + // queue, where all we need to do is insert <new_item> after + // the current tail. + return this->enqueue_tail_i (new_item); else - { - // Insert the message right before the item of equal or - // higher priority. This ensures that FIFO order is - // maintained when messages of the same priority are - // inserted consecutively. - new_item->prev (temp); - new_item->next (temp->next ()); - temp->next ()->prev (new_item); - temp->next (new_item); - } + { + // Insert the message right before the item of equal or + // higher priority. This ensures that FIFO order is + // maintained when messages of the same priority are + // inserted consecutively. + new_item->prev (temp); + new_item->next (temp->next ()); + temp->next ()->prev (new_item); + temp->next (new_item); + } } // Make sure to count *all* the bytes in a composite message!!! @@ -377,17 +398,23 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item) this->cur_bytes_ += temp->size (); this->cur_count_++; - +#if !defined (ACE_LACKS_COND_T) // Tell any blocked threads that the queue has a new item! if (this->notempty_cond_.signal () != 0) return -1; - else - return this->cur_count_; +#else + if (this->dequeue_waiters_ > 0) + { + --this->dequeue_waiters_; + this->not_empty_cond_.release (); + } +#endif /* ACE_LACKS_COND_T */ + return this->cur_count_; } -// Actually get the first ACE_Message_Block (no locking, so must be called -// with locks held). This method assumes that the queue has at least -// one item in it when it is called. +// Actually get the first ACE_Message_Block (no locking, so must be +// called with locks held). This method assumes that the queue has at +// least one item in it when it is called. template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i (ACE_Message_Block *&first_item) @@ -412,22 +439,24 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i (ACE_Message_Block *&first_item) this->cur_count_--; -#if 0 - if (this->cur_bytes_ <= this->low_water_mark_) - // If queue is no longer full signal any waiting threads. -#endif /* 0 */ - +#if !defined (ACE_LACKS_COND_T) if (this->notfull_cond_.signal () != 0) return -1; - else - return this->cur_count_; +#else + if (this->enqueue_waiters_ > 0) + { + --this->enqueue_waiters_; + this->not_full_cond_.release (); + } +#endif /* ACE_LACKS_COND_T */ + return this->cur_count_; } // Take a look at the first item without removing it. template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -443,16 +472,16 @@ ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_ite while (this->is_empty_i ()) { if (this->notempty_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } + { + if (errno == ETIME) + errno = EWOULDBLOCK; + return -1; + } if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } + { + errno = ESHUTDOWN; + return -1; + } } first_item = this->head_; @@ -464,37 +493,46 @@ ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_ite template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - int queue_count; - if (this->deactivated_) { errno = ESHUTDOWN; return -1; } +#if defined (ACE_LACKS_COND_T) + if (this->is_full_i ()) + { + ++this->enqueue_waiters_; + // @@ Need to add sanity checks for failure... + ace_mon.release (); + this->not_full_cond_.acquire (); + ace_mon.acquire (); + } +#else // Wait while the queue is full while (this->is_full_i ()) { if (this->notfull_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } + { + if (errno == ETIME) + errno = EWOULDBLOCK; + return -1; + } if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } + { + errno = ESHUTDOWN; + return -1; + } } +#endif /* ACE_LACKS_COND_T */ - queue_count = this->enqueue_head_i (new_item); + int queue_count = this->enqueue_head_i (new_item); if (queue_count == -1) return -1; @@ -511,37 +549,46 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item, template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - int queue_count; - if (this->deactivated_) { errno = ESHUTDOWN; return -1; } +#if defined (ACE_LACKS_COND_T) + if (this->is_full_i ()) + { + ++this->enqueue_waiters_; + // @@ Need to add sanity checks for failure... + ace_mon.release (); + this->not_full_cond_.acquire (); + ace_mon.acquire (); + } +#else // Wait while the queue is full while (this->is_full_i ()) { if (this->notfull_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } + { + if (errno == ETIME) + errno = EWOULDBLOCK; + return -1; + } if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } + { + errno = ESHUTDOWN; + return -1; + } } +#endif /* ACE_LACKS_COND_T */ - queue_count = this->enqueue_i (new_item); + int queue_count = this->enqueue_i (new_item); if (queue_count == -1) return -1; @@ -554,7 +601,7 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio (ACE_Message_Block *new_item, template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::enqueue (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue"); return this->enqueue_prio (new_item, tv); @@ -565,36 +612,46 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue (ACE_Message_Block *new_item, template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - int queue_count; - if (this->deactivated_) { errno = ESHUTDOWN; return -1; } +#if defined (ACE_LACKS_COND_T) + if (this->is_full_i ()) + { + ++this->enqueue_waiters_; + // @@ Need to add sanity checks for failure... + ace_mon.release (); + this->not_full_cond_.acquire (); + ace_mon.acquire (); + } +#else // Wait while the queue is full while (this->is_full_i ()) { if (this->notfull_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } + { + if (errno == ETIME) + errno = EWOULDBLOCK; + return -1; + } if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } + { + errno = ESHUTDOWN; + return -1; + } } - queue_count = this->enqueue_tail_i (new_item); +#endif /* ACE_LACKS_COND_T */ + + int queue_count = this->enqueue_tail_i (new_item); if (queue_count == -1) return -1; @@ -611,7 +668,7 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail (ACE_Message_Block *new_item, template <ACE_SYNCH_1> int ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -622,22 +679,33 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head (ACE_Message_Block *&first_item, return -1; } +#if defined (ACE_LACKS_COND_T) + if (this->is_empty_i ()) + { + ++this->dequeue_waiters_; + // @@ Need to add sanity checks for failure... + ace_mon.release (); + this->not_empty_cond_.acquire (); + ace_mon.acquire (); + } +#else // Wait while the queue is empty. while (this->is_empty_i ()) { if (this->notempty_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } + { + if (errno == ETIME) + errno = EWOULDBLOCK; + return -1; + } if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } + { + errno = ESHUTDOWN; + return -1; + } } +#endif /* ACE_LACKS_COND_T */ return this->dequeue_head_i (first_item); } |