summaryrefslogtreecommitdiff
path: root/TAO/tao/Incoming_Message_Queue.cpp
diff options
context:
space:
mode:
authorbala <balanatarajan@users.noreply.github.com>2003-12-14 16:03:49 +0000
committerbala <balanatarajan@users.noreply.github.com>2003-12-14 16:03:49 +0000
commita7e10f0b431dc0e2006196751782cd8fa40b16c7 (patch)
tree7422441092fdcc31025f06c8f064c7209747eb21 /TAO/tao/Incoming_Message_Queue.cpp
parent20ded8e045eb9993cfc20bf788d70329c11feb62 (diff)
downloadATCD-a7e10f0b431dc0e2006196751782cd8fa40b16c7.tar.gz
ChangeLogTag:Sun Dec 14 09:56:37 2003 Balachandran Natarajan <bala@dre.vanderbilt.edu>
Diffstat (limited to 'TAO/tao/Incoming_Message_Queue.cpp')
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp459
1 files changed, 56 insertions, 403 deletions
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
index a510be13f4b..14970106b50 100644
--- a/TAO/tao/Incoming_Message_Queue.cpp
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -1,8 +1,7 @@
#include "Incoming_Message_Queue.h"
-#include "Pluggable_Messaging.h"
#include "debug.h"
-#include "ace/Malloc_T.h"
-#include "ace/Message_Block.h"
+
+#include "ace/Log_Msg.h"
#if !defined (__ACE_INLINE__)
# include "Incoming_Message_Queue.inl"
@@ -13,7 +12,7 @@ ACE_RCSID (tao,
"$Id$")
TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
- : last_added_ (0),
+ : queued_data_ (0),
size_ (0),
orb_core_ (orb_core)
{
@@ -43,21 +42,21 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
{
// Check to see if the length of the incoming block is less than
// that of the <missing_data_> of the tail.
- if (block.length () <= this->last_added_->missing_data_bytes_)
+ if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_)
{
n = block.length ();
}
else
{
- n = this->last_added_->missing_data_bytes_;
+ n = this->queued_data_->missing_data_;
}
// Do the copy
- this->last_added_->msg_block_->copy (block.rd_ptr (),
+ this->queued_data_->msg_block_->copy (block.rd_ptr (),
n);
// Decerement the missing data
- this->last_added_->missing_data_bytes_ -= n;
+ this->queued_data_->missing_data_ -= n;
}
return n;
@@ -66,20 +65,17 @@ TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
TAO_Queued_Data *
TAO_Incoming_Message_Queue::dequeue_head (void)
{
- if (this->size_ == 0)
- return 0;
-
// Get the node on the head of the queue...
- TAO_Queued_Data *head = this->last_added_->next_;
+ TAO_Queued_Data *tmp =
+ this->queued_data_->next_;
// Reset the head node..
- this->last_added_->next_ = head->next_;
-
- // Decrease the size and reset last_added_ if empty
- if (--this->size_ == 0)
- this->last_added_ = 0;
+ this->queued_data_->next_ = tmp->next_;
+
+ // Decrease the size
+ --this->size_;
- return head;
+ return tmp;
}
TAO_Queued_Data *
@@ -90,412 +86,95 @@ TAO_Incoming_Message_Queue::dequeue_tail (void)
return 0;
// Get the node on the head of the queue...
- TAO_Queued_Data *head =
- this->last_added_->next_;
+ TAO_Queued_Data *tmp =
+ this->queued_data_->next_;
- while (head->next_ != this->last_added_)
+ while (tmp->next_ != this->queued_data_)
{
- head = head->next_;
+ tmp = tmp->next_;
}
// Put the head in tmp.
- head->next_ = this->last_added_->next_;
+ tmp->next_ = this->queued_data_->next_;
- TAO_Queued_Data *ret_qd = this->last_added_;
+ TAO_Queued_Data *ret_qd = this->queued_data_;
- this->last_added_ = head;
+ this->queued_data_ = tmp;
// Decrease the size
- if (--this->size_ == 0)
- this->last_added_ = 0;
+ --this->size_;
return ret_qd;
}
+
int
TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
{
if (this->size_ == 0)
{
- this->last_added_ = nd;
- this->last_added_->next_ = this->last_added_;
+ this->queued_data_ = nd;
+ this->queued_data_->next_ = this->queued_data_;
}
else
{
- nd->next_ = this->last_added_->next_;
- this->last_added_->next_ = nd;
- this->last_added_ = nd;
+ nd->next_ = this->queued_data_->next_;
+ this->queued_data_->next_ = nd;
+ this->queued_data_ = nd;
}
++ this->size_;
return 0;
}
-TAO_Queued_Data *
-TAO_Incoming_Message_Queue::find_fragment (CORBA::Octet major,
- CORBA::Octet minor) const
-{
- TAO_Queued_Data *found = 0;
- if (this->last_added_ != 0)
- {
- TAO_Queued_Data *qd = this->last_added_->next_;
-
- do {
- if (qd->more_fragments_ &&
- qd->major_version_ == major && qd->minor_version_ == minor)
- {
- found = qd;
- }
- else
- {
- qd = qd->next_;
- }
- } while (found == 0 && qd != this->last_added_->next_);
- }
-
- return found;
-}
-
-TAO_Queued_Data *
-TAO_Incoming_Message_Queue::find_fragment (CORBA::ULong request_id) const
-{
- TAO_Queued_Data *found = 0;
- if (this->last_added_ != 0)
- {
- TAO_Queued_Data *qd = this->last_added_->next_;
-
- do {
- if (qd->more_fragments_ && qd->request_id_ == request_id)
- {
- found = qd;
- }
- else
- {
- qd = qd->next_;
- }
- } while (found == 0 && qd != this->last_added_->next_);
- }
-
- return found;
-}
-
/************************************************************************/
// Methods for TAO_Queued_Data
/************************************************************************/
TAO_Queued_Data::TAO_Queued_Data (ACE_Allocator *alloc)
- : msg_block_ (0)
- , current_state_ (INVALID)
- , missing_data_bytes_ (0)
- , byte_order_ (0)
- , major_version_ (0)
- , minor_version_ (0)
- , more_fragments_ (0)
- , request_id_ (0)
- , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- , next_ (0)
- , allocator_ (alloc)
+ : msg_block_ (0),
+ missing_data_ (0),
+ byte_order_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb,
ACE_Allocator *alloc)
- : msg_block_ (mb)
- , current_state_ (INVALID)
- , missing_data_bytes_ (0)
- , byte_order_ (0)
- , major_version_ (0)
- , minor_version_ (0)
- , more_fragments_ (0)
- , request_id_ (0)
- , msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- , next_ (0)
- , allocator_ (alloc)
+ : msg_block_ (mb),
+ missing_data_ (0),
+ byte_order_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ more_fragments_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0),
+ allocator_ (alloc)
{
}
TAO_Queued_Data::TAO_Queued_Data (const TAO_Queued_Data &qd)
- : msg_block_ (qd.msg_block_->duplicate ())
- , current_state_ (qd.current_state_)
- , missing_data_bytes_ (qd.missing_data_bytes_)
- , byte_order_ (qd.byte_order_)
- , major_version_ (qd.major_version_)
- , minor_version_ (qd.minor_version_)
- , more_fragments_ (qd.more_fragments_)
- , request_id_ (qd.request_id_)
- , msg_type_ (qd.msg_type_)
- , next_ (0)
- , allocator_ (qd.allocator_)
+ : msg_block_ (qd.msg_block_->duplicate ()),
+ missing_data_ (qd.missing_data_),
+ byte_order_ (qd.byte_order_),
+ major_version_ (qd.major_version_),
+ minor_version_ (qd.minor_version_),
+ more_fragments_ (qd.more_fragments_),
+ msg_type_ (qd.msg_type_),
+ next_ (0),
+ allocator_ (qd.allocator_)
{
}
-
-/*!
- \brief Allocate and return a new empty message block of size \a new_size mimicking parameters of \a mb.
-
- This function allocates a new aligned message block using the same
- allocators and flags as found in \a mb. The size of the new message
- block is at least \a new_size; the size may be adjusted up in order
- to accomodate alignment requirements and still fit \a new_size bytes
- into the aligned buffer.
-
- \param mb message block whose parameters should be mimicked
- \param new_size size of the new message block (will be adjusted for proper alignment)
- \return an aligned message block with rd_ptr sitting at correct alignment spot, 0 on failure
-
- \author Thanks to Rich Seibel for helping implement with the public API for ACE_Message_Block!
- */
-static ACE_Message_Block*
-clone_mb_nocopy_size (ACE_Message_Block *mb, size_t span_size)
-{
- // Calculate the required size of the cloned block with alignment
- size_t aligned_size = ACE_CDR::first_size (span_size + ACE_CDR::MAX_ALIGNMENT);
-
- // Get the allocators
- ACE_Allocator *data_allocator;
- ACE_Allocator *data_block_allocator;
- ACE_Allocator *message_block_allocator;
- mb->access_allocators (data_allocator,
- data_block_allocator,
- message_block_allocator);
-
- // Create a new Message Block
- ACE_Message_Block *nb;
- ACE_NEW_MALLOC_RETURN (nb,
- ACE_static_cast(ACE_Message_Block*,
- message_block_allocator->malloc (
- sizeof (ACE_Message_Block))),
- ACE_Message_Block(aligned_size,
- mb->msg_type(),
- mb->cont(),
- 0, //we want the data block created
- data_allocator,
- mb->locking_strategy(),
- mb->msg_priority(),
- mb->msg_execution_time (),
- mb->msg_deadline_time (),
- data_block_allocator,
- message_block_allocator),
- 0);
-
- ACE_CDR::mb_align (nb);
-
- // Copy the flags over, but be SURE to clear the DONT_DELETE flag, since
- // we just dynamically allocated the two things.
- nb->set_flags (mb->flags());
- nb->clr_flags (ACE_Message_Block::DONT_DELETE);
-
- return nb;
-}
-
-/*!
- \brief Copy data from \a src->rd_ptr to \a dst->wr_ptr, of at most \a span_size bytes.
-
- (This is similar to memcpy, although with message blocks we can be a
- little smarter.) This function assumes that \a dst has enough space
- for \a span_size bytes, and that \a src has at least \a span_size
- bytes available to copy. When everything is copied \a dst->wr_ptr
- gets updated accordingly, but \a src->rd_ptr is left to the caller
- to update.
-
- \param dst the destination message block
- \param src the source message block
- \param span_size size of the maximum span of bytes to be copied
- \return 0 on failure, otherwise \a dst
- */
-static ACE_Message_Block*
-copy_mb_span (ACE_Message_Block *dst, ACE_Message_Block *src, size_t span_size)
-{
- // @todo check for enough space in dst, and src contains at least span_size
-
- if (src == 0 || dst == 0)
- return 0;
-
- if (span_size == 0)
- return dst;
-
- dst->copy (src->rd_ptr (), span_size);
- return dst;
-}
-
/*static*/
TAO_Queued_Data *
-TAO_Queued_Data::make_uncompleted_message (ACE_Message_Block *mb,
- TAO_Pluggable_Messaging &msging_obj,
- ACE_Allocator *alloc)
-{
- register TAO_Queued_Data *new_qd = 0;
- register const size_t HDR_LEN = msging_obj.header_length (); /* COMPUTE ONCE! */
- register const size_t MB_LEN = mb->length (); /* COMPUTE ONCE! */
-
- // Validate arguments.
- if (mb == 0)
- goto failure;
-
- new_qd = make_queued_data (alloc);
- if (new_qd == 0)
- goto failure;
-
- // do we have enough bytes to make a complete header?
- if (MB_LEN >= HDR_LEN)
- {
- // Since we have enough bytes to make a complete header,
- // the header needs to be valid. Check that now, and punt
- // if it's not valid.
- if (! msging_obj.check_for_valid_header (*mb))
- {
- goto failure;
- }
- else
- {
- new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
- msging_obj.set_queued_data_from_message_header (new_qd, *mb);
- if (new_qd->current_state_ == INVALID)
- goto failure;
-
- // missing_data_bytes_ now has the full GIOP message size, so we allocate
- // a new message block of that size, plus the header.
- new_qd->msg_block_ = clone_mb_nocopy_size (mb,
- new_qd->missing_data_bytes_ +
- HDR_LEN);
- // Of course, we don't have the whole message (if we did, we
- // wouldn't be here!), so we copy only what we've got, i.e., whatever's
- // in the message block.
- if (copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
- goto failure;
-
- // missing_data_bytes_ now has the full GIOP message size, but
- // there might still be stuff in mb. Therefore, we have to adjust
- // missing_data_bytes_, i.e., decrease it by the number of "actual
- // payload bytes" in mb.
- //
- // "actual payload bytes" :== length of mb (which included the header) - header length
- new_qd->missing_data_bytes_ -= (MB_LEN - HDR_LEN);
- mb->rd_ptr (MB_LEN);
- }
- }
- else
- {
- new_qd->current_state_ = WAITING_TO_COMPLETE_HEADER;
- new_qd->msg_block_ = clone_mb_nocopy_size (mb, HDR_LEN);
- if (new_qd->msg_block_ == 0 ||
- copy_mb_span (new_qd->msg_block_, mb, MB_LEN) == 0)
- goto failure;
- new_qd->missing_data_bytes_ = HDR_LEN - MB_LEN;
- mb->rd_ptr (MB_LEN);
- }
-
- ACE_ASSERT (new_qd->current_state_ != INVALID);
- if (TAO_debug_level > 7)
- {
- const char* s = "?unk?";
- switch (new_qd->current_state_)
- {
- case WAITING_TO_COMPLETE_HEADER: s = "WAITING_TO_COMPLETE_HEADER"; break;
- case WAITING_TO_COMPLETE_PAYLOAD: s = "WAITING_TO_COMPLETE_PAYLOAD"; break;
- case INVALID: s = "INVALID"; break;
- case COMPLETED: s = "COMPLETED"; break;
- }
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
- ACE_TEXT ("made uncompleted message from %u bytes into qd=%-08x:")
- ACE_TEXT ("state=%s,missing_data_bytes=%u\n"),
- new_qd->msg_block_->length(), new_qd, s, new_qd->missing_data_bytes_));
- }
- return new_qd;
-
-failure:
- if (TAO_debug_level > 7)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_uncompleted_message: ")
- ACE_TEXT ("failed to make uncompleted message: mb=%-08x, qd=%-08x\n"),
- mb, new_qd));
- }
- TAO_Queued_Data::release (new_qd);
- return 0;
-}
-
-
-/*static*/
-TAO_Queued_Data *
-TAO_Queued_Data::make_completed_message (ACE_Message_Block &mb,
- TAO_Pluggable_Messaging &msging_obj,
- ACE_Allocator *alloc)
-{
- register const size_t HDR_LEN = msging_obj.header_length ();
- register const size_t MB_LEN = mb.length ();
-
- // Validate arguments.
- if (MB_LEN < HDR_LEN)
- return 0;
-
- size_t total_msg_len = 0;
- register TAO_Queued_Data *new_qd = make_queued_data (alloc);
- if (new_qd == 0)
- goto failure;
-
- // We can assume that there are enough bytes for a header, so
- // extract the header data. Don't assume that there's enough for
- // the payload just yet.
- new_qd->current_state_ = WAITING_TO_COMPLETE_PAYLOAD;
- msging_obj.set_queued_data_from_message_header (new_qd, mb);
- if (new_qd->current_state_ == INVALID)
- goto failure;
-
- // new_qd_->missing_data_bytes_ + protocol header length should be
- // *at least* the length of the message. Verify that we have that
- // many bytes in the message block and, if we don't, release the new
- // qd and fail.
- total_msg_len = new_qd->missing_data_bytes_ + HDR_LEN;
- if (total_msg_len > MB_LEN)
- goto failure;
-
- // Make a copy of the relevant portion of mb and hang on to it
- if ((new_qd->msg_block_ = clone_mb_nocopy_size (&mb, total_msg_len)) == 0)
- goto failure;
-
- if (copy_mb_span (new_qd->msg_block_, &mb, total_msg_len) == 0)
- goto failure;
-
- // Update missing data and the current state
- new_qd->missing_data_bytes_ = 0;
- new_qd->current_state_ = COMPLETED;
-
- // Advance the rd_ptr on the message block
- mb.rd_ptr (total_msg_len);
-
- if (TAO_debug_level > 7)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
- ACE_TEXT ("extracted complete message (%u bytes incl hdr) from mblk=%-08x into qd=%-08x\n"),
- total_msg_len, &mb, new_qd));
- }
-
- return new_qd;
-
-failure:
- if (TAO_debug_level > 7)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Queued_Data::make_complete_message: ")
- ACE_TEXT ("failed to find complete message in mblk=%-08x; leaving %u bytes in block\n"),
- &mb, MB_LEN));
- if (TAO_debug_level >= 10)
- ACE_HEX_DUMP ((LM_DEBUG,
- mb.rd_ptr (), MB_LEN,
- ACE_TEXT (" residual bytes in buffer")));
-
- }
- TAO_Queued_Data::release (new_qd);
- return 0;
-}
-
-/*static*/
-TAO_Queued_Data *
-TAO_Queued_Data::make_queued_data (ACE_Allocator *alloc)
+TAO_Queued_Data::get_queued_data (ACE_Allocator *alloc)
{
TAO_Queued_Data *qd = 0;
@@ -602,29 +281,3 @@ TAO_Queued_Data::duplicate (TAO_Queued_Data &sqd)
return qd;
}
-
-void
-TAO_Queued_Data::consolidate (void)
-{
- // Is this a chain of fragments?
- if (this->more_fragments_ && this->msg_block_->cont () != 0)
- {
- // Create a message block big enough to hold the entire chain
- ACE_Message_Block *dest = clone_mb_nocopy_size (
- this->msg_block_,
- this->msg_block_->total_length ());
- // Reset the cont() parameter
- dest->cont (0);
-
- // Use ACE_CDR to consolidate the chain for us
- ACE_CDR::consolidate (dest, this->msg_block_);
-
- // free the original message block chain
- this->msg_block_->release ();
-
- // Set the message block to the new consolidated message block
- this->msg_block_ = dest;
- this->more_fragments_ = 0;
- }
-}
-