diff options
author | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-24 16:33:06 +0000 |
---|---|---|
committer | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-24 16:33:06 +0000 |
commit | e4e903f30b86212ab2eb2ff3a6b5fed0027a85c9 (patch) | |
tree | 2f8d3ef6af798cd9475ee7d0fc6da37251e537ec /ace/Message_Queue.cpp | |
parent | 14bda205017d49436198b98beb8b501f103ea8a8 (diff) | |
download | ATCD-e4e903f30b86212ab2eb2ff3a6b5fed0027a85c9.tar.gz |
added dynamic message queues
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r-- | ace/Message_Queue.cpp | 530 |
1 files changed, 510 insertions, 20 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp index 4d9c46d53b1..2fb8e432421 100644 --- a/ace/Message_Queue.cpp +++ b/ace/Message_Queue.cpp @@ -12,6 +12,12 @@ ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue) +ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue) + +////////////////////////////////////// +// class ACE_Message_Queue_Iterator // +////////////////////////////////////// + template <ACE_SYNCH_DECL> ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) : queue_ (q), @@ -58,6 +64,10 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator) +////////////////////////////////////////////// +// class ACE_Message_Queue_Reverse_Iterator // +////////////////////////////////////////////// + template <ACE_SYNCH_DECL> ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) : queue_ (q), @@ -102,6 +112,10 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const { } +///////////////////////////// +// class ACE_Message_Queue // +///////////////////////////// + template <ACE_SYNCH_DECL> void ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const { @@ -377,34 +391,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) // We start looking from the highest priority to the lowest // priority. - for (temp = this->tail_; + for (temp = this->head_; temp != 0; - temp = temp->prev ()) - if (temp->msg_priority () >= new_item->msg_priority ()) - // Break out when we've located an item that has higher - // priority that <new_item>. - break; + temp = temp->next ()) + if (temp->msg_priority () < new_item->msg_priority ()) + // Break out when we've located an item that has lower + // priority that <new_item>. + break; if (temp == 0) - // Check for simple case of inserting at the head of the queue, - // where all we need to do is insert <new_item> before the - // current head. - return this->enqueue_head_i (new_item); - else if (temp->next () == 0) - // Check for simple case of inserting at the end of the - // queue, where all we need to do is insert <new_item> after - // the current tail. + // Check for simple case of inserting at the tail of the queue, + // where all we need to do is insert <new_item> after the + // current tail. return this->enqueue_tail_i (new_item); + else if (temp->prev () == 0) + // Check for simple case of inserting at the head of the + // queue, where all we need to do is insert <new_item> before + // the current head. + return this->enqueue_head_i (new_item); else { - // Insert the message right before the item of equal or - // higher priority. This ensures that FIFO order is + // Insert the new message ahead of the item of + // lesser priority. This ensures that FIFO order is // maintained when messages of the same priority are // inserted consecutively. - new_item->prev (temp); - new_item->next (temp->next ()); - temp->next ()->prev (new_item); - temp->next (new_item); + new_item->next (temp); + new_item->prev (temp->prev ()); + temp->prev ()->next (new_item); + temp->prev (new_item); } } @@ -706,4 +720,480 @@ ACE_Message_Queue<ACE_SYNCH_USE>::notify (void) return this->notification_strategy_->notify (); } +//////////////////////////////////////// +// class ACE_Dynamic_Message_Strategy // +//////////////////////////////////////// + +ACE_Dynamic_Message_Strategy::ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) + : static_bit_field_mask_ (static_bit_field_mask) + , static_bit_field_shift_ (static_bit_field_shift) + , pending_threshold_ (pending_threshold) + , dynamic_priority_max_ (dynamic_priority_max) + , dynamic_priority_offset_ (dynamic_priority_offset) +{ +} +// ctor + +ACE_Dynamic_Message_Strategy::~ACE_Dynamic_Message_Strategy () +{ +} +// dtor + +/////////////////////////////////////// +// class ACE_Deadline_Message_Strategy // +/////////////////////////////////////// + +ACE_Deadline_Message_Strategy:: ACE_Deadline_Message_Strategy (u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) + : ACE_Dynamic_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset) +{ +} +// ctor + +ACE_Deadline_Message_Strategy::~ACE_Deadline_Message_Strategy () +{ +} +// dtor + +int +ACE_Deadline_Message_Strategy::update_priority (ACE_Message_Block & mb, + const ACE_Time_Value & tv) +{ + // The general formula for this deadline based dynamic priority + // function is to just subtract the current time and the execution + // time from the from the message deadline to get the time to deadline, + // then subtract the time to deadline from a constant C that depends on + // whether the time to deadline is negative (C is zero) or non-negative + // (C is the maximum allowed priority). But, to save operations for + // performance we use an optimized (albeit confusing: our apologies ;-) + // formula for the dynamic priority calculation. + + // first, compute the *negative* (additive inverse) of the time to deadline + ACE_Time_Value priority (tv); + priority -= mb.msg_deadline_time (); + + if (priority >= ACE_Time_Value::zero) + { + // if negative time to deadline is positive then the message is late: + // need to make sure the priority stays below the threshold + // between pending and late priority values + ACE_Time_Value + max_late (0, dynamic_priority_offset_ - 1); + + if (priority > max_late) + { + priority = max_late; + } + } + else + { + // if negative time to deadline is negative then the message is pending: + // so, we need to shift priority upward by adding the maximum priority + // value and then make sure the value stays above the threshold between + // pending and late message priorities. + priority += + ACE_Time_Value (0, dynamic_priority_max_); + + ACE_Time_Value + min_pending (0, dynamic_priority_offset_); + + if (priority < min_pending) + { + priority = min_pending; + } + } + + // use (fast) bitwise operators to isolate and replace + // the dynamic portion of the message's priority + mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) | + ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) << + static_bit_field_shift_)); + + return 0; +} + // priority evaluation function based on time to deadline + +int +ACE_Deadline_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb, + const ACE_Time_Value & tv) +{ + // first, compute the *negative* time to deadline + ACE_Time_Value priority (tv); + priority -= mb.msg_deadline_time (); + + // construct a time value with the maximum late value that + // can be represented in the dynamic priority range + ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1); + + // if negative time to deadline is greater than the maximum value + // that can be represented, it is identified as being beyond late + return (priority > max_late) ? 1 : 0; +} + // returns true if the message is later than can can be represented + +/////////////////////////////////////// +// class ACE_Laxity_Message_Strategy // +/////////////////////////////////////// + +ACE_Laxity_Message_Strategy::ACE_Laxity_Message_Strategy (u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) + : ACE_Dynamic_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset) +{ +} +// ctor + +ACE_Laxity_Message_Strategy::~ACE_Laxity_Message_Strategy () +{ +} +// dtor + + +int +ACE_Laxity_Message_Strategy::update_priority (ACE_Message_Block & mb, + const ACE_Time_Value & tv) +{ + // The general formula for this laxity based dynamic priority + // function is to just subtract the current time and the execution + // time from the from the message deadline to get the laxity, + // then subtract the laxity from a constant C that depends on whether + // the laxity is negative (C is zero) or non-negative (C is the maximum + // allowed priority). But, to save operations for performance we use + // an optimized (albeit confusing: our apologies ;-) formula + // for the dynamic priority calculation. + + // first, compute the *negative* laxity + ACE_Time_Value priority (tv); + priority += mb.msg_execution_time (); + priority -= mb.msg_deadline_time (); + + if (priority >= ACE_Time_Value::zero) + { + // if negative laxity is positive then the message is late: + // need to make sure the priority stays below the threshold + // between pending and late priority values + ACE_Time_Value + max_late (0, dynamic_priority_offset_ - 1); + + if (priority > max_late) + { + priority = max_late; + } + } + else + { + // if negative laxity is negative then the message is pending: so, we + // need to shift priority upward by adding the maximum priority value + // and then make sure the value stays above the threshold between + // pending and late message priorities. + priority += + ACE_Time_Value (0, dynamic_priority_max_); + + ACE_Time_Value + min_pending (0, dynamic_priority_offset_); + + if (priority < min_pending) + { + priority = min_pending; + } + } + + // use (fast) bitwise operators to isolate and replace + // the dynamic portion of the message's priority + mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) | + ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) << + static_bit_field_shift_)); + + return 0; +} + // priority evaluation function based on laxity + +int +ACE_Laxity_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb, + const ACE_Time_Value & tv) +{ + // first, compute the *negative* laxity + ACE_Time_Value priority (tv); + priority += mb.msg_execution_time (); + priority -= mb.msg_deadline_time (); + + // construct a time value with the maximum late value that + // can be represented in the dynamic priority range + ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1); + + // if negative laxity is greater than the maximum value that + // can be represented, it is identified as being beyond late + return (priority > max_late) ? 1 : 0; +} + // returns true if the message is later than can can be represented + + +///////////////////////////////////// +// class ACE_Dynamic_Message_Queue // +///////////////////////////////////// + + // = Initialization and termination methods. +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( + ACE_Dynamic_Message_Strategy & message_strategy, + size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) + : ACE_Message_Queue (hwm, lwm, ns) + , message_strategy_ (message_strategy) +{ + // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the + // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor +} + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) +{ + delete &message_strategy_; +} +// dtor: free message strategy and let base class dtor do the rest + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + + int result = 0; + + // refresh dynamic priority of the new message + result = (*priority_eval_func_ptr_) (*new_item, tv); + + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh dynamic priorities of messages in the queue + this->refresh_priorities (current_time); + + // reorganize the queue according to the new priorities + this->refresh_queue (current_time); + + // if there is only one message in the pending list, + // the pending list will be empty after a *successful* + // dequeue operation + int empty_pending = (head_ == pending_list_tail_) ? 1 : 0; + + // invoke the base class method + result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item); + + // null out the pending list tail pointer if + // the pending list is now empty + if ((empty_pending) && (result > 0)) + { + pending_list_tail_ = 0; + } + + return result; +} + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + // priority may be *dynamic* or *static* or a combination or *both* + // It calls the priority evaluation function passed into the Dynamic + // Message Queue constructor to update the priorities of all enqueued + // messages. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + + int result = 0; + + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh dynamic priorities of messages in the queue + result = this->refresh_priorities (current_time); + if (result < 0) + { + return result; + } + + // reorganize the queue according to the new priorities + result = this->refresh_queue (current_time); + if (result < 0) + { + return result; + } + + // if there is only one message in the pending list, + // the pending list will be empty after a *successful* + // dequeue operation + int empty_pending = (head_ == pending_list_tail_) ? 1 : 0; + + // invoke the base class method + result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item); + + // null out the pending list tail pointer if + // the pending list is now empty + if ((empty_pending) && (result > 0)) + { + pending_list_tail_ = 0; + } + + return result; +} + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv) +{ + int result = 0; + + // apply the priority update function to all enqueued + // messages, starting at the head of the queue + ACE_Message_Block *temp = head_; + while (temp) + { + result = (*priority_eval_func_ptr_) (*temp, tv); + if (result < 0) + { + break; + } + + temp = temp->next (); + } + + return result; +} + // refresh the priorities in the queue according + // to a specific priority assignment function + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv) +{ + // first, drop any messages from the queue and delete them: + // reference counting at the data block level means that the + // underlying data block will not be deleted if another + // message block is still pointing to it. + ACE_Message_Block *temp = (pending_list_tail_) + ? pending_list_tail_->next (); + : head_; + + while (temp) + { + // messages that have overflowed the given time bounds must be removed + if (message_strategy_.is_beyond_late (*temp, tv)) + { + // find the end of the chain of overflowed messages + ACE_Message_Block *remove_tail = temp; + while ((remove_tail) && (remove_tail->next ()) && + message_strategy_.is_beyond_late (*(remove_tail->next ()), tv)) + { + remove_tail = remove_tail->next (); + } + + temp = remove_tail->next (); + if (remove_temp->next ()) + { + remove_temp->next ()->prev (0); + } + else if (remove_temp->prev ()) + { + remove_temp->prev ()->next (0); + } + else + { + head_ = 0; + tail_ = 0; + } + remove_temp->prev (0); + remove_temp->next (0); + + temp = remove_temp->next (); + + } + else + { + temp = temp->next (); + } + } +} + // refresh the order of messages in the queue + // after refreshing their priorities + +///////////////////////////////////// +// class ACE_Message_Queue_Factory // +///////////////////////////////////// + +template <ACE_SYNCH_DECL> +ACE_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +{ + return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns); +} + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns, + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) +{ + ACE_Deadline_Message_Strategy *adms; + + ACE_NEW_RETURN (adms, + ACE_Deadline_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset), + 0); + + return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns); +} + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns, + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) +{ + ACE_Laxity_Message_Strategy *alms; + + ACE_NEW_RETURN (alms, + ACE_Laxity_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset), + 0); + + + return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns); +} + + #endif /* ACE_MESSAGE_QUEUE_C */ |