summaryrefslogtreecommitdiff
path: root/ace/Message_Queue.cpp
diff options
context:
space:
mode:
authorcdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-24 16:33:06 +0000
committercdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-24 16:33:06 +0000
commite4e903f30b86212ab2eb2ff3a6b5fed0027a85c9 (patch)
tree2f8d3ef6af798cd9475ee7d0fc6da37251e537ec /ace/Message_Queue.cpp
parent14bda205017d49436198b98beb8b501f103ea8a8 (diff)
downloadATCD-e4e903f30b86212ab2eb2ff3a6b5fed0027a85c9.tar.gz
added dynamic message queues
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r--ace/Message_Queue.cpp530
1 files changed, 510 insertions, 20 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index 4d9c46d53b1..2fb8e432421 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -12,6 +12,12 @@
ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
+ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
+
+//////////////////////////////////////
+// class ACE_Message_Queue_Iterator //
+//////////////////////////////////////
+
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
: queue_ (q),
@@ -58,6 +64,10 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
+//////////////////////////////////////////////
+// class ACE_Message_Queue_Reverse_Iterator //
+//////////////////////////////////////////////
+
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
: queue_ (q),
@@ -102,6 +112,10 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
{
}
+/////////////////////////////
+// class ACE_Message_Queue //
+/////////////////////////////
+
template <ACE_SYNCH_DECL> void
ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
{
@@ -377,34 +391,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
// We start looking from the highest priority to the lowest
// priority.
- for (temp = this->tail_;
+ for (temp = this->head_;
temp != 0;
- temp = temp->prev ())
- if (temp->msg_priority () >= new_item->msg_priority ())
- // Break out when we've located an item that has higher
- // priority that <new_item>.
- break;
+ temp = temp->next ())
+ if (temp->msg_priority () < new_item->msg_priority ())
+ // Break out when we've located an item that has lower
+ // priority that <new_item>.
+ break;
if (temp == 0)
- // Check for simple case of inserting at the head of the queue,
- // where all we need to do is insert <new_item> before the
- // current head.
- return this->enqueue_head_i (new_item);
- else if (temp->next () == 0)
- // Check for simple case of inserting at the end of the
- // queue, where all we need to do is insert <new_item> after
- // the current tail.
+ // Check for simple case of inserting at the tail of the queue,
+ // where all we need to do is insert <new_item> after the
+ // current tail.
return this->enqueue_tail_i (new_item);
+ else if (temp->prev () == 0)
+ // Check for simple case of inserting at the head of the
+ // queue, where all we need to do is insert <new_item> before
+ // the current head.
+ return this->enqueue_head_i (new_item);
else
{
- // Insert the message right before the item of equal or
- // higher priority. This ensures that FIFO order is
+ // Insert the new message ahead of the item of
+ // lesser priority. This ensures that FIFO order is
// maintained when messages of the same priority are
// inserted consecutively.
- new_item->prev (temp);
- new_item->next (temp->next ());
- temp->next ()->prev (new_item);
- temp->next (new_item);
+ new_item->next (temp);
+ new_item->prev (temp->prev ());
+ temp->prev ()->next (new_item);
+ temp->prev (new_item);
}
}
@@ -706,4 +720,480 @@ ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
return this->notification_strategy_->notify ();
}
+////////////////////////////////////////
+// class ACE_Dynamic_Message_Strategy //
+////////////////////////////////////////
+
+ACE_Dynamic_Message_Strategy::ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+ : static_bit_field_mask_ (static_bit_field_mask)
+ , static_bit_field_shift_ (static_bit_field_shift)
+ , pending_threshold_ (pending_threshold)
+ , dynamic_priority_max_ (dynamic_priority_max)
+ , dynamic_priority_offset_ (dynamic_priority_offset)
+{
+}
+// ctor
+
+ACE_Dynamic_Message_Strategy::~ACE_Dynamic_Message_Strategy ()
+{
+}
+// dtor
+
+///////////////////////////////////////
+// class ACE_Deadline_Message_Strategy //
+///////////////////////////////////////
+
+ACE_Deadline_Message_Strategy:: ACE_Deadline_Message_Strategy (u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+ : ACE_Dynamic_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset)
+{
+}
+// ctor
+
+ACE_Deadline_Message_Strategy::~ACE_Deadline_Message_Strategy ()
+{
+}
+// dtor
+
+int
+ACE_Deadline_Message_Strategy::update_priority (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // The general formula for this deadline based dynamic priority
+ // function is to just subtract the current time and the execution
+ // time from the from the message deadline to get the time to deadline,
+ // then subtract the time to deadline from a constant C that depends on
+ // whether the time to deadline is negative (C is zero) or non-negative
+ // (C is the maximum allowed priority). But, to save operations for
+ // performance we use an optimized (albeit confusing: our apologies ;-)
+ // formula for the dynamic priority calculation.
+
+ // first, compute the *negative* (additive inverse) of the time to deadline
+ ACE_Time_Value priority (tv);
+ priority -= mb.msg_deadline_time ();
+
+ if (priority >= ACE_Time_Value::zero)
+ {
+ // if negative time to deadline is positive then the message is late:
+ // need to make sure the priority stays below the threshold
+ // between pending and late priority values
+ ACE_Time_Value
+ max_late (0, dynamic_priority_offset_ - 1);
+
+ if (priority > max_late)
+ {
+ priority = max_late;
+ }
+ }
+ else
+ {
+ // if negative time to deadline is negative then the message is pending:
+ // so, we need to shift priority upward by adding the maximum priority
+ // value and then make sure the value stays above the threshold between
+ // pending and late message priorities.
+ priority +=
+ ACE_Time_Value (0, dynamic_priority_max_);
+
+ ACE_Time_Value
+ min_pending (0, dynamic_priority_offset_);
+
+ if (priority < min_pending)
+ {
+ priority = min_pending;
+ }
+ }
+
+ // use (fast) bitwise operators to isolate and replace
+ // the dynamic portion of the message's priority
+ mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) |
+ ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) <<
+ static_bit_field_shift_));
+
+ return 0;
+}
+ // priority evaluation function based on time to deadline
+
+int
+ACE_Deadline_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // first, compute the *negative* time to deadline
+ ACE_Time_Value priority (tv);
+ priority -= mb.msg_deadline_time ();
+
+ // construct a time value with the maximum late value that
+ // can be represented in the dynamic priority range
+ ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1);
+
+ // if negative time to deadline is greater than the maximum value
+ // that can be represented, it is identified as being beyond late
+ return (priority > max_late) ? 1 : 0;
+}
+ // returns true if the message is later than can can be represented
+
+///////////////////////////////////////
+// class ACE_Laxity_Message_Strategy //
+///////////////////////////////////////
+
+ACE_Laxity_Message_Strategy::ACE_Laxity_Message_Strategy (u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+ : ACE_Dynamic_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset)
+{
+}
+// ctor
+
+ACE_Laxity_Message_Strategy::~ACE_Laxity_Message_Strategy ()
+{
+}
+// dtor
+
+
+int
+ACE_Laxity_Message_Strategy::update_priority (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // The general formula for this laxity based dynamic priority
+ // function is to just subtract the current time and the execution
+ // time from the from the message deadline to get the laxity,
+ // then subtract the laxity from a constant C that depends on whether
+ // the laxity is negative (C is zero) or non-negative (C is the maximum
+ // allowed priority). But, to save operations for performance we use
+ // an optimized (albeit confusing: our apologies ;-) formula
+ // for the dynamic priority calculation.
+
+ // first, compute the *negative* laxity
+ ACE_Time_Value priority (tv);
+ priority += mb.msg_execution_time ();
+ priority -= mb.msg_deadline_time ();
+
+ if (priority >= ACE_Time_Value::zero)
+ {
+ // if negative laxity is positive then the message is late:
+ // need to make sure the priority stays below the threshold
+ // between pending and late priority values
+ ACE_Time_Value
+ max_late (0, dynamic_priority_offset_ - 1);
+
+ if (priority > max_late)
+ {
+ priority = max_late;
+ }
+ }
+ else
+ {
+ // if negative laxity is negative then the message is pending: so, we
+ // need to shift priority upward by adding the maximum priority value
+ // and then make sure the value stays above the threshold between
+ // pending and late message priorities.
+ priority +=
+ ACE_Time_Value (0, dynamic_priority_max_);
+
+ ACE_Time_Value
+ min_pending (0, dynamic_priority_offset_);
+
+ if (priority < min_pending)
+ {
+ priority = min_pending;
+ }
+ }
+
+ // use (fast) bitwise operators to isolate and replace
+ // the dynamic portion of the message's priority
+ mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) |
+ ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) <<
+ static_bit_field_shift_));
+
+ return 0;
+}
+ // priority evaluation function based on laxity
+
+int
+ACE_Laxity_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // first, compute the *negative* laxity
+ ACE_Time_Value priority (tv);
+ priority += mb.msg_execution_time ();
+ priority -= mb.msg_deadline_time ();
+
+ // construct a time value with the maximum late value that
+ // can be represented in the dynamic priority range
+ ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1);
+
+ // if negative laxity is greater than the maximum value that
+ // can be represented, it is identified as being beyond late
+ return (priority > max_late) ? 1 : 0;
+}
+ // returns true if the message is later than can can be represented
+
+
+/////////////////////////////////////
+// class ACE_Dynamic_Message_Queue //
+/////////////////////////////////////
+
+ // = Initialization and termination methods.
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
+ ACE_Dynamic_Message_Strategy & message_strategy,
+ size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+ : ACE_Message_Queue (hwm, lwm, ns)
+ , message_strategy_ (message_strategy)
+{
+ // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the
+ // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
+{
+ delete &message_strategy_;
+}
+// dtor: free message strategy and let base class dtor do the rest
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+
+ int result = 0;
+
+ // refresh dynamic priority of the new message
+ result = (*priority_eval_func_ptr_) (*new_item, tv);
+
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh dynamic priorities of messages in the queue
+ this->refresh_priorities (current_time);
+
+ // reorganize the queue according to the new priorities
+ this->refresh_queue (current_time);
+
+ // if there is only one message in the pending list,
+ // the pending list will be empty after a *successful*
+ // dequeue operation
+ int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
+
+ // invoke the base class method
+ result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
+
+ // null out the pending list tail pointer if
+ // the pending list is now empty
+ if ((empty_pending) && (result > 0))
+ {
+ pending_list_tail_ = 0;
+ }
+
+ return result;
+}
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+ // priority may be *dynamic* or *static* or a combination or *both*
+ // It calls the priority evaluation function passed into the Dynamic
+ // Message Queue constructor to update the priorities of all enqueued
+ // messages.
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
+
+ int result = 0;
+
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh dynamic priorities of messages in the queue
+ result = this->refresh_priorities (current_time);
+ if (result < 0)
+ {
+ return result;
+ }
+
+ // reorganize the queue according to the new priorities
+ result = this->refresh_queue (current_time);
+ if (result < 0)
+ {
+ return result;
+ }
+
+ // if there is only one message in the pending list,
+ // the pending list will be empty after a *successful*
+ // dequeue operation
+ int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
+
+ // invoke the base class method
+ result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
+
+ // null out the pending list tail pointer if
+ // the pending list is now empty
+ if ((empty_pending) && (result > 0))
+ {
+ pending_list_tail_ = 0;
+ }
+
+ return result;
+}
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue.
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv)
+{
+ int result = 0;
+
+ // apply the priority update function to all enqueued
+ // messages, starting at the head of the queue
+ ACE_Message_Block *temp = head_;
+ while (temp)
+ {
+ result = (*priority_eval_func_ptr_) (*temp, tv);
+ if (result < 0)
+ {
+ break;
+ }
+
+ temp = temp->next ();
+ }
+
+ return result;
+}
+ // refresh the priorities in the queue according
+ // to a specific priority assignment function
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv)
+{
+ // first, drop any messages from the queue and delete them:
+ // reference counting at the data block level means that the
+ // underlying data block will not be deleted if another
+ // message block is still pointing to it.
+ ACE_Message_Block *temp = (pending_list_tail_)
+ ? pending_list_tail_->next ();
+ : head_;
+
+ while (temp)
+ {
+ // messages that have overflowed the given time bounds must be removed
+ if (message_strategy_.is_beyond_late (*temp, tv))
+ {
+ // find the end of the chain of overflowed messages
+ ACE_Message_Block *remove_tail = temp;
+ while ((remove_tail) && (remove_tail->next ()) &&
+ message_strategy_.is_beyond_late (*(remove_tail->next ()), tv))
+ {
+ remove_tail = remove_tail->next ();
+ }
+
+ temp = remove_tail->next ();
+ if (remove_temp->next ())
+ {
+ remove_temp->next ()->prev (0);
+ }
+ else if (remove_temp->prev ())
+ {
+ remove_temp->prev ()->next (0);
+ }
+ else
+ {
+ head_ = 0;
+ tail_ = 0;
+ }
+ remove_temp->prev (0);
+ remove_temp->next (0);
+
+ temp = remove_temp->next ();
+
+ }
+ else
+ {
+ temp = temp->next ();
+ }
+ }
+}
+ // refresh the order of messages in the queue
+ // after refreshing their priorities
+
+/////////////////////////////////////
+// class ACE_Message_Queue_Factory //
+/////////////////////////////////////
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+{
+ return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns);
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns,
+ u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+{
+ ACE_Deadline_Message_Strategy *adms;
+
+ ACE_NEW_RETURN (adms,
+ ACE_Deadline_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset),
+ 0);
+
+ return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns);
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns,
+ u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+{
+ ACE_Laxity_Message_Strategy *alms;
+
+ ACE_NEW_RETURN (alms,
+ ACE_Laxity_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset),
+ 0);
+
+
+ return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns);
+}
+
+
#endif /* ACE_MESSAGE_QUEUE_C */