summaryrefslogtreecommitdiff
path: root/ace/Message_Queue.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-08-05 01:05:49 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-08-05 01:05:49 +0000
commit8bc2711f7ffea172ec3874ee3ec7d34c22c29c3e (patch)
tree93989fa7f1a0983b2322dc24a8ebc994de4f9f86 /ace/Message_Queue.cpp
parentea40de67ec23f87125db282675be26fad6489a20 (diff)
downloadATCD-8bc2711f7ffea172ec3874ee3ec7d34c22c29c3e.tar.gz
*** empty log message ***
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r--ace/Message_Queue.cpp324
1 files changed, 196 insertions, 128 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index 436fa7f7c49..d5a90eb3046 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -107,33 +107,40 @@ ACE_Message_Queue<ACE_SYNCH_2>::dump (void) const
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::dump");
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
ACE_DEBUG ((LM_DEBUG,
- "deactivated = %d\n"
- "low_water_mark = %d\n"
- "high_water_mark = %d\n"
- "cur_bytes = %d\n"
- "cur_count = %d\n",
- "head_ = %u\n",
- "tail_ = %u\n",
- this->deactivated_,
- this->low_water_mark_,
- this->high_water_mark_,
- this->cur_bytes_,
- this->cur_count_,
- this->head_,
- this->tail_));
+ "deactivated = %d\n"
+ "low_water_mark = %d\n"
+ "high_water_mark = %d\n"
+ "cur_bytes = %d\n"
+ "cur_count = %d\n",
+ "head_ = %u\n",
+ "tail_ = %u\n",
+ this->deactivated_,
+ this->low_water_mark_,
+ this->high_water_mark_,
+ this->cur_bytes_,
+ this->cur_count_,
+ this->head_,
+ this->tail_));
ACE_DEBUG ((LM_DEBUG,"notfull_cond: \n"));
- notfull_cond_.dump();
+ notfull_cond_.dump ();
ACE_DEBUG ((LM_DEBUG,"notempty_cond: \n"));
- notempty_cond_.dump();
+ notempty_cond_.dump ();
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
template <ACE_SYNCH_1>
ACE_Message_Queue<ACE_SYNCH_2>::ACE_Message_Queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+#if defined (ACE_LACKS_COND_T)
+ : not_empty_cond_ (0),
+ not_full_cond_ (0),
+ enqueue_waiters_ (0),
+ dequeue_waiters_ (0)
+#else
: notempty_cond_ (this->lock_),
notfull_cond_ (this->lock_)
+#endif /* ACE_LACKS_COND_T */
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::ACE_Message_Queue");
@@ -155,8 +162,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::~ACE_Message_Queue (void)
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::open (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::open");
this->high_water_mark_ = hwm;
@@ -181,8 +188,10 @@ ACE_Message_Queue<ACE_SYNCH_2>::deactivate_i (void)
this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
// Wakeup all waiters.
+#if !defined (ACE_LACKS_COND_T)
this->notempty_cond_.broadcast ();
this->notfull_cond_.broadcast ();
+#endif /* ACE_LACKS_COND_T */
this->deactivated_ = 1;
return current_status;
@@ -218,9 +227,9 @@ ACE_Message_Queue<ACE_SYNCH_2>::close (void)
// Decrement all the counts.
for (temp = this->head_;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ -= temp->size ();
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
temp = this->head_;
this->head_ = this->head_->next ();
@@ -269,12 +278,18 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i (ACE_Message_Block *new_item)
this->cur_bytes_ += temp->size ();
this->cur_count_++;
-
+#if !defined (ACE_LACKS_COND_T)
// Tell any blocked threads that the queue has a new item!
if (this->notempty_cond_.signal () != 0)
return -1;
- else
- return this->cur_count_;
+#else
+ if (this->dequeue_waiters_ > 0)
+ {
+ --this->dequeue_waiters_;
+ this->not_empty_cond_.release ();
+ }
+#endif /* ACE_LACKS_COND_T */
+ return this->cur_count_;
}
// Actually put the node at the head (no locking)
@@ -305,12 +320,18 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head_i (ACE_Message_Block *new_item)
this->cur_bytes_ += temp->size ();
this->cur_count_++;
-
+#if !defined (ACE_LACKS_COND_T)
// Tell any blocked threads that the queue has a new item!
if (this->notempty_cond_.signal () != 0)
return -1;
- else
- return this->cur_count_;
+#else
+ if (this->dequeue_waiters_ > 0)
+ {
+ --this->dequeue_waiters_;
+ this->not_empty_cond_.release ();
+ }
+#endif /* ACE_LACKS_COND_T */
+ return this->cur_count_;
}
// Actually put the node at its proper position relative to its
@@ -337,36 +358,36 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item)
// priority.
for (temp = this->tail_;
- temp != 0;
- temp = temp->prev ())
- {
- if (temp->msg_priority () >= new_item->msg_priority ())
- // Break out when we've located an item that has higher
- // priority that <new_item>.
- break;
- }
+ temp != 0;
+ temp = temp->prev ())
+ {
+ if (temp->msg_priority () >= new_item->msg_priority ())
+ // Break out when we've located an item that has higher
+ // priority that <new_item>.
+ break;
+ }
if (temp == 0)
- // Check for simple case of inserting at the head of the queue,
- // where all we need to do is insert <new_item> before the
- // current head.
- return this->enqueue_head_i (new_item);
+ // Check for simple case of inserting at the head of the queue,
+ // where all we need to do is insert <new_item> before the
+ // current head.
+ return this->enqueue_head_i (new_item);
else if (temp->next () == 0)
- // Check for simple case of inserting at the end of the
- // queue, where all we need to do is insert <new_item> after
- // the current tail.
- return this->enqueue_tail_i (new_item);
+ // Check for simple case of inserting at the end of the
+ // queue, where all we need to do is insert <new_item> after
+ // the current tail.
+ return this->enqueue_tail_i (new_item);
else
- {
- // Insert the message right before the item of equal or
- // higher priority. This ensures that FIFO order is
- // maintained when messages of the same priority are
- // inserted consecutively.
- new_item->prev (temp);
- new_item->next (temp->next ());
- temp->next ()->prev (new_item);
- temp->next (new_item);
- }
+ {
+ // Insert the message right before the item of equal or
+ // higher priority. This ensures that FIFO order is
+ // maintained when messages of the same priority are
+ // inserted consecutively.
+ new_item->prev (temp);
+ new_item->next (temp->next ());
+ temp->next ()->prev (new_item);
+ temp->next (new_item);
+ }
}
// Make sure to count *all* the bytes in a composite message!!!
@@ -377,17 +398,23 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item)
this->cur_bytes_ += temp->size ();
this->cur_count_++;
-
+#if !defined (ACE_LACKS_COND_T)
// Tell any blocked threads that the queue has a new item!
if (this->notempty_cond_.signal () != 0)
return -1;
- else
- return this->cur_count_;
+#else
+ if (this->dequeue_waiters_ > 0)
+ {
+ --this->dequeue_waiters_;
+ this->not_empty_cond_.release ();
+ }
+#endif /* ACE_LACKS_COND_T */
+ return this->cur_count_;
}
-// Actually get the first ACE_Message_Block (no locking, so must be called
-// with locks held). This method assumes that the queue has at least
-// one item in it when it is called.
+// Actually get the first ACE_Message_Block (no locking, so must be
+// called with locks held). This method assumes that the queue has at
+// least one item in it when it is called.
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i (ACE_Message_Block *&first_item)
@@ -412,22 +439,24 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i (ACE_Message_Block *&first_item)
this->cur_count_--;
-#if 0
- if (this->cur_bytes_ <= this->low_water_mark_)
- // If queue is no longer full signal any waiting threads.
-#endif /* 0 */
-
+#if !defined (ACE_LACKS_COND_T)
if (this->notfull_cond_.signal () != 0)
return -1;
- else
- return this->cur_count_;
+#else
+ if (this->enqueue_waiters_ > 0)
+ {
+ --this->enqueue_waiters_;
+ this->not_full_cond_.release ();
+ }
+#endif /* ACE_LACKS_COND_T */
+ return this->cur_count_;
}
// Take a look at the first item without removing it.
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *tv)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -443,16 +472,16 @@ ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_ite
while (this->is_empty_i ())
{
if (this->notempty_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
}
first_item = this->head_;
@@ -464,37 +493,46 @@ ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_ite
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *tv)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
- int queue_count;
-
if (this->deactivated_)
{
errno = ESHUTDOWN;
return -1;
}
+#if defined (ACE_LACKS_COND_T)
+ if (this->is_full_i ())
+ {
+ ++this->enqueue_waiters_;
+ // @@ Need to add sanity checks for failure...
+ ace_mon.release ();
+ this->not_full_cond_.acquire ();
+ ace_mon.acquire ();
+ }
+#else
// Wait while the queue is full
while (this->is_full_i ())
{
if (this->notfull_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
}
+#endif /* ACE_LACKS_COND_T */
- queue_count = this->enqueue_head_i (new_item);
+ int queue_count = this->enqueue_head_i (new_item);
if (queue_count == -1)
return -1;
@@ -511,37 +549,46 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item,
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *tv)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
- int queue_count;
-
if (this->deactivated_)
{
errno = ESHUTDOWN;
return -1;
}
+#if defined (ACE_LACKS_COND_T)
+ if (this->is_full_i ())
+ {
+ ++this->enqueue_waiters_;
+ // @@ Need to add sanity checks for failure...
+ ace_mon.release ();
+ this->not_full_cond_.acquire ();
+ ace_mon.acquire ();
+ }
+#else
// Wait while the queue is full
while (this->is_full_i ())
{
if (this->notfull_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
}
+#endif /* ACE_LACKS_COND_T */
- queue_count = this->enqueue_i (new_item);
+ int queue_count = this->enqueue_i (new_item);
if (queue_count == -1)
return -1;
@@ -554,7 +601,7 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio (ACE_Message_Block *new_item,
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::enqueue (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *tv)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue");
return this->enqueue_prio (new_item, tv);
@@ -565,36 +612,46 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue (ACE_Message_Block *new_item,
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *tv)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
- int queue_count;
-
if (this->deactivated_)
{
errno = ESHUTDOWN;
return -1;
}
+#if defined (ACE_LACKS_COND_T)
+ if (this->is_full_i ())
+ {
+ ++this->enqueue_waiters_;
+ // @@ Need to add sanity checks for failure...
+ ace_mon.release ();
+ this->not_full_cond_.acquire ();
+ ace_mon.acquire ();
+ }
+#else
// Wait while the queue is full
while (this->is_full_i ())
{
if (this->notfull_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
}
- queue_count = this->enqueue_tail_i (new_item);
+#endif /* ACE_LACKS_COND_T */
+
+ int queue_count = this->enqueue_tail_i (new_item);
if (queue_count == -1)
return -1;
@@ -611,7 +668,7 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail (ACE_Message_Block *new_item,
template <ACE_SYNCH_1> int
ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *tv)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -622,22 +679,33 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head (ACE_Message_Block *&first_item,
return -1;
}
+#if defined (ACE_LACKS_COND_T)
+ if (this->is_empty_i ())
+ {
+ ++this->dequeue_waiters_;
+ // @@ Need to add sanity checks for failure...
+ ace_mon.release ();
+ this->not_empty_cond_.acquire ();
+ ace_mon.acquire ();
+ }
+#else
// Wait while the queue is empty.
while (this->is_empty_i ())
{
if (this->notempty_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
}
+#endif /* ACE_LACKS_COND_T */
return this->dequeue_head_i (first_item);
}