summaryrefslogtreecommitdiff
path: root/TAO/tao
diff options
context:
space:
mode:
authorbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-07-06 04:41:00 +0000
committerbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2001-07-06 04:41:00 +0000
commitd8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8 (patch)
tree3874c3e46e81a1a1c5a6c459720e1c17cab62da2 /TAO/tao
parent08c2939a52133c144b5f17b7f8556b5dc046c0b0 (diff)
downloadATCD-d8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8.tar.gz
ChangeLogTag: Thu Jul 5 23:30:07 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'TAO/tao')
-rw-r--r--TAO/tao/Acceptor_Filter.cpp3
-rw-r--r--TAO/tao/Adapter.cpp1
-rw-r--r--TAO/tao/Any.cpp2
-rw-r--r--TAO/tao/Asynch_Reply_Dispatcher.cpp26
-rw-r--r--TAO/tao/Asynch_Reply_Dispatcher.h8
-rw-r--r--TAO/tao/CDR.cpp2
-rw-r--r--TAO/tao/CORBALOC_Parser.cpp1
-rw-r--r--TAO/tao/ClientRequestInfo.cpp1
-rw-r--r--TAO/tao/Connection_Handler.cpp8
-rw-r--r--TAO/tao/Connection_Handler.h10
-rw-r--r--TAO/tao/DomainC.cpp1
-rw-r--r--TAO/tao/DomainC.i1
-rw-r--r--TAO/tao/DynamicC.cpp1
-rw-r--r--TAO/tao/DynamicC.i153
-rw-r--r--TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp21
-rw-r--r--TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h13
-rw-r--r--TAO/tao/Exception.cpp2
-rw-r--r--TAO/tao/Exclusive_TMS.cpp30
-rw-r--r--TAO/tao/Exclusive_TMS.h4
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp496
-rw-r--r--TAO/tao/GIOP_Message_Base.h89
-rw-r--r--TAO/tao/GIOP_Message_Base.i38
-rw-r--r--TAO/tao/GIOP_Message_Reactive_Handler.cpp574
-rw-r--r--TAO/tao/GIOP_Message_Reactive_Handler.h31
-rw-r--r--TAO/tao/GIOP_Message_Reactive_Handler.inl4
-rw-r--r--TAO/tao/GIOP_Message_State.cpp297
-rw-r--r--TAO/tao/GIOP_Message_State.h116
-rw-r--r--TAO/tao/GIOP_Message_State.i10
-rw-r--r--TAO/tao/GIOP_Message_State.inl58
-rw-r--r--TAO/tao/IIOP_Acceptor.cpp1
-rw-r--r--TAO/tao/IIOP_Connection_Handler.cpp83
-rw-r--r--TAO/tao/IIOP_Connection_Handler.h6
-rw-r--r--TAO/tao/IIOP_Connector.cpp1
-rw-r--r--TAO/tao/IIOP_Transport.cpp186
-rw-r--r--TAO/tao/IIOP_Transport.h6
-rw-r--r--TAO/tao/IORInfo.cpp22
-rw-r--r--TAO/tao/Incoming_Message_Queue.cpp151
-rw-r--r--TAO/tao/Incoming_Message_Queue.h161
-rw-r--r--TAO/tao/Incoming_Message_Queue.inl78
-rw-r--r--TAO/tao/Invocation.cpp3
-rw-r--r--TAO/tao/Invocation_Endpoint_Selectors.cpp1
-rw-r--r--TAO/tao/LocalObject.cpp2
-rw-r--r--TAO/tao/Makefile6
-rw-r--r--TAO/tao/MessagingC.h1
-rw-r--r--TAO/tao/Muxed_TMS.cpp24
-rw-r--r--TAO/tao/Muxed_TMS.h4
-rw-r--r--TAO/tao/ORB.cpp10
-rw-r--r--TAO/tao/Object_Ref_Table.cpp2
-rw-r--r--TAO/tao/Pluggable_Messaging.cpp6
-rw-r--r--TAO/tao/Pluggable_Messaging.h39
-rw-r--r--TAO/tao/Pluggable_Messaging_Utils.cpp6
-rw-r--r--TAO/tao/Pluggable_Messaging_Utils.h6
-rw-r--r--TAO/tao/PolicyC.cpp1
-rw-r--r--TAO/tao/PolicyC.i1
-rw-r--r--TAO/tao/PolicyFactory_Registry.cpp1
-rw-r--r--TAO/tao/PortableInterceptor.pidl1
-rw-r--r--TAO/tao/Profile.cpp20
-rw-r--r--TAO/tao/Resume_Handle.cpp24
-rw-r--r--TAO/tao/Resume_Handle.h90
-rw-r--r--TAO/tao/Resume_Handle.inl38
-rw-r--r--TAO/tao/Strategies/DIOP_Connection_Handler.cpp32
-rw-r--r--TAO/tao/Strategies/DIOP_Connection_Handler.h3
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp254
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.h12
-rw-r--r--TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp28
-rw-r--r--TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h23
-rw-r--r--TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h3
-rw-r--r--TAO/tao/Strategies/Makefile3
-rw-r--r--TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp49
-rw-r--r--TAO/tao/Strategies/SHMIOP_Connection_Handler.h5
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.cpp239
-rw-r--r--TAO/tao/Strategies/SHMIOP_Transport.h13
-rw-r--r--TAO/tao/Strategies/TAO_Strategies.dsp92
-rw-r--r--TAO/tao/Strategies/UIOP_Connection_Handler.cpp42
-rw-r--r--TAO/tao/Strategies/UIOP_Connection_Handler.h9
-rw-r--r--TAO/tao/Strategies/UIOP_Transport.cpp179
-rw-r--r--TAO/tao/Strategies/UIOP_Transport.h11
-rw-r--r--TAO/tao/Synch_Reply_Dispatcher.cpp36
-rw-r--r--TAO/tao/Synch_Reply_Dispatcher.h21
-rw-r--r--TAO/tao/TAO.dsp50
-rw-r--r--TAO/tao/TAO_Server_Request.cpp1
-rw-r--r--TAO/tao/Transport.cpp674
-rw-r--r--TAO/tao/Transport.h176
-rw-r--r--TAO/tao/Transport_Mux_Strategy.h10
-rw-r--r--TAO/tao/Wait_On_Read.cpp6
85 files changed, 2935 insertions, 2018 deletions
diff --git a/TAO/tao/Acceptor_Filter.cpp b/TAO/tao/Acceptor_Filter.cpp
index 8ba9afd45ee..98a3efb01d2 100644
--- a/TAO/tao/Acceptor_Filter.cpp
+++ b/TAO/tao/Acceptor_Filter.cpp
@@ -1,13 +1,16 @@
// $Id$
+
#include "tao/Acceptor_Filter.h"
#if !defined (__ACE_INLINE__)
# include "Acceptor_Filter.i"
#endif /* __ACE_INLINE__ */
+
ACE_RCSID(tao, Acceptor_Filter, "$Id$")
+
TAO_Acceptor_Filter::~TAO_Acceptor_Filter (void)
{
}
diff --git a/TAO/tao/Adapter.cpp b/TAO/tao/Adapter.cpp
index 41c9e3a8bd9..1f363431999 100644
--- a/TAO/tao/Adapter.cpp
+++ b/TAO/tao/Adapter.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "tao/Adapter.h"
#include "tao/Object.h"
#include "tao/Object_KeyC.h"
diff --git a/TAO/tao/Any.cpp b/TAO/tao/Any.cpp
index 4a5c307d38e..35aadd83e34 100644
--- a/TAO/tao/Any.cpp
+++ b/TAO/tao/Any.cpp
@@ -1,5 +1,6 @@
// $Id$
+
// Portions of this file are:
// Copyright 1994-1995 by Sun Microsystems Inc.
// All Rights Reserved
@@ -18,6 +19,7 @@
ACE_RCSID(tao, Any, "$Id$")
+
CORBA::TypeCode_ptr
CORBA_Any::type (void) const
{
diff --git a/TAO/tao/Asynch_Reply_Dispatcher.cpp b/TAO/tao/Asynch_Reply_Dispatcher.cpp
index b4a20a82253..9f2bbc0a5eb 100644
--- a/TAO/tao/Asynch_Reply_Dispatcher.cpp
+++ b/TAO/tao/Asynch_Reply_Dispatcher.cpp
@@ -1,6 +1,5 @@
// $Id$
-
#include "tao/Asynch_Reply_Dispatcher.h"
#include "tao/Pluggable_Messaging_Utils.h"
@@ -19,8 +18,16 @@ ACE_RCSID(tao, Asynch_Reply_Dispatcher, "$Id$")
// Constructor.
TAO_Asynch_Reply_Dispatcher_Base::TAO_Asynch_Reply_Dispatcher_Base (TAO_ORB_Core *orb_core)
- : reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE),
- 0,
+ : buf_ (),
+ db_ (sizeof buf_,
+ ACE_Message_Block::MB_DATA,
+ this->buf_,
+ orb_core->message_block_buffer_allocator (),
+ orb_core->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ orb_core->message_block_dblock_allocator ()),
+ reply_cdr_ (&db_,
+ ACE_Message_Block::MB_DATA,
TAO_ENCAP_BYTE_ORDER,
TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR,
@@ -48,11 +55,6 @@ TAO_Asynch_Reply_Dispatcher_Base::dispatch_reply (
return 0;
}
-/*TAO_GIOP_Message_State *
-TAO_Asynch_Reply_Dispatcher_Base::message_state (void)
-{
- return this->message_state_;
-} */
void
TAO_Asynch_Reply_Dispatcher_Base::dispatcher_bound (TAO_Transport *)
@@ -113,11 +115,11 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply (
this->reply_status_ = params.reply_status_;
- // this->message_state_ = message_state;
-
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.exchange_data_blocks (params.input_cdr_);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
+ ACE_UNUSED_ARG (db);
// Steal the buffer, that way we don't do any unnecesary copies of
// this data.
CORBA::ULong max = params.svc_ctx_.maximum ();
diff --git a/TAO/tao/Asynch_Reply_Dispatcher.h b/TAO/tao/Asynch_Reply_Dispatcher.h
index b8b1c978451..f81c68a142b 100644
--- a/TAO/tao/Asynch_Reply_Dispatcher.h
+++ b/TAO/tao/Asynch_Reply_Dispatcher.h
@@ -71,8 +71,12 @@ protected:
*/
IOP::ServiceContextList reply_service_info_;
- // TAO_GIOP_Message_State *message_state_;
- // CDR stream for reading the input.
+ /// The buffer that is used to initialise the data block
+ char buf_[ACE_CDR::DEFAULT_BUFSIZE];
+
+ /// datablock that is created on teh stack to initialise the CDR
+ /// stream underneath.
+ ACE_Data_Block db_;
/// CDR stream which has the reply information that needs to be
/// demarshalled by the stubs
diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp
index 07eb03a7cf2..a08222e531f 100644
--- a/TAO/tao/CDR.cpp
+++ b/TAO/tao/CDR.cpp
@@ -41,9 +41,11 @@
# include "tao/CDR.i"
#endif /* ! __ACE_INLINE__ */
+
ACE_RCSID(tao, CDR, "$Id$")
+
#if defined (ACE_ENABLE_TIMEPROBES)
static const char *TAO_CDR_Timeprobe_Description[] =
diff --git a/TAO/tao/CORBALOC_Parser.cpp b/TAO/tao/CORBALOC_Parser.cpp
index 13f09350804..ab4b228771a 100644
--- a/TAO/tao/CORBALOC_Parser.cpp
+++ b/TAO/tao/CORBALOC_Parser.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "CORBALOC_Parser.h"
#include "ORB_Core.h"
#include "Stub.h"
diff --git a/TAO/tao/ClientRequestInfo.cpp b/TAO/tao/ClientRequestInfo.cpp
index 8dab949ae98..281ca1ed6a7 100644
--- a/TAO/tao/ClientRequestInfo.cpp
+++ b/TAO/tao/ClientRequestInfo.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "ClientRequestInfo.h"
#include "Invocation.h"
#include "Stub.h"
diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp
index 985dbbf44c8..049b1d15a49 100644
--- a/TAO/tao/Connection_Handler.cpp
+++ b/TAO/tao/Connection_Handler.cpp
@@ -6,6 +6,7 @@
#include "tao/debug.h"
#include "tao/Object.h"
#include "tao/Messaging_Policy_i.h"
+#include "Resume_Handle.h"
#if !defined (__ACE_INLINE__)
#include "tao/Connection_Handler.inl"
@@ -90,10 +91,15 @@ TAO_Connection_Handler::svc_i (void)
max_wait_time = &current_timeout;
}
+ TAO_Resume_Handle rh (this->orb_core_,
+ ACE_INVALID_HANDLE);
+
while (!this->orb_core_->has_shutdown ()
&& result >= 0)
{
- result = this->handle_input_i (ACE_INVALID_HANDLE, max_wait_time);
+ result =
+ this->transport ()->handle_input_i (rh,
+ max_wait_time);
if (result == -1 && errno == ETIME)
{
diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h
index 7caeccecd4b..b0ac01d0077 100644
--- a/TAO/tao/Connection_Handler.h
+++ b/TAO/tao/Connection_Handler.h
@@ -101,9 +101,13 @@ protected:
/// Object.
int svc_i (void);
- /// Need to be implemented by the underlying protocol objects
- virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Time_Value *max_wait_time = 0) = 0;
+#if !defined (TAO_CONNECTION_HANDLER_STACK_BUF_SIZE)
+# define TAO_CONNECTION_HANDLER_STACK_BUF_SIZE 1024
+#endif /*TAO_CONNECTION_HANDLER_STACK_BUF_SIZE */
+
+#if !defined (TAO_RESUMES_CONNECTION_HANDLER)
+# define TAO_RESUMES_CONNECTION_HANDLER 1
+#endif /*TAO_RESUMES_CONNECTION_HANDLER*/
private:
diff --git a/TAO/tao/DomainC.cpp b/TAO/tao/DomainC.cpp
index 7dfa513c009..f2e9509a4f6 100644
--- a/TAO/tao/DomainC.cpp
+++ b/TAO/tao/DomainC.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/DomainC.i b/TAO/tao/DomainC.i
index 08c89453f73..b86dfb17929 100644
--- a/TAO/tao/DomainC.i
+++ b/TAO/tao/DomainC.i
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/DynamicC.cpp b/TAO/tao/DynamicC.cpp
index 310487d401c..3ba3d039091 100644
--- a/TAO/tao/DynamicC.cpp
+++ b/TAO/tao/DynamicC.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/DynamicC.i b/TAO/tao/DynamicC.i
index 09fd7a3ff51..b45699ab5bb 100644
--- a/TAO/tao/DynamicC.i
+++ b/TAO/tao/DynamicC.i
@@ -70,7 +70,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p)
{
Parameter *deep_copy =
new Parameter (*p.ptr_);
-
+
if (deep_copy != 0)
{
Parameter *tmp = deep_copy;
@@ -80,7 +80,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p)
}
}
}
-
+
return *this;
}
@@ -103,20 +103,20 @@ Dynamic::Parameter_var::operator const ::Dynamic::Parameter &() const // cast
}
ACE_INLINE
-Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast
+Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast
{
return *this->ptr_;
}
ACE_INLINE
-Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast
+Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast
{
return *this->ptr_;
}
// variable-size types only
ACE_INLINE
-Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast
+Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast
{
return this->ptr_;
}
@@ -133,7 +133,7 @@ Dynamic::Parameter_var::inout (void)
return *this->ptr_;
}
-// mapping for variable size
+// mapping for variable size
ACE_INLINE ::Dynamic::Parameter *&
Dynamic::Parameter_var::out (void)
{
@@ -194,7 +194,7 @@ Dynamic::Parameter_out::operator= (Parameter *p)
return *this;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::Parameter_out::operator ::Dynamic::Parameter *&() // cast
{
return this->ptr_;
@@ -214,7 +214,7 @@ Dynamic::Parameter_out::operator-> (void)
#if !defined (TAO_USE_SEQUENCE_TEMPLATES)
-
+
#if !defined (__TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_)
#define __TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_
@@ -227,24 +227,24 @@ Dynamic::Parameter_out::operator-> (void)
ACE_NEW_RETURN (retval, Dynamic::Parameter[size], 0);
return retval;
}
-
+
ACE_INLINE void Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::freebuf (Dynamic::Parameter *buffer)
// Free the sequence.
{
delete [] buffer;
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (void) // Default constructor.
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum) // Constructor using a maximum length value.
: TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (maximum))
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum,
CORBA::ULong length,
@@ -253,7 +253,7 @@ Dynamic::Parameter_out::operator-> (void)
: TAO_Unbounded_Base_Sequence (maximum, length, data, release)
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs)
// Copy constructor.
@@ -263,10 +263,10 @@ Dynamic::Parameter_out::operator-> (void)
{
Dynamic::Parameter *tmp1 = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (this->maximum_);
Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < this->length_; ++i)
tmp1[i] = tmp2[i];
-
+
this->buffer_ = tmp1;
}
else
@@ -274,14 +274,14 @@ Dynamic::Parameter_out::operator-> (void)
this->buffer_ = 0;
}
}
-
+
ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList &
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator= (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs)
// Assignment operator.
{
if (this == &rhs)
return *this;
-
+
if (this->release_)
{
if (this->maximum_ < rhs.maximum_)
@@ -294,18 +294,18 @@ Dynamic::Parameter_out::operator-> (void)
}
else
this->buffer_ = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (rhs.maximum_);
-
+
TAO_Unbounded_Base_Sequence::operator= (rhs);
-
+
Dynamic::Parameter *tmp1 = ACE_reinterpret_cast (Dynamic::Parameter *, this->buffer_);
Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < this->length_; ++i)
tmp1[i] = tmp2[i];
-
+
return *this;
}
-
+
// = Accessors.
ACE_INLINE Dynamic::Parameter &
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i)
@@ -315,7 +315,7 @@ Dynamic::Parameter_out::operator-> (void)
Dynamic::Parameter* tmp = ACE_reinterpret_cast(Dynamic::Parameter*,this->buffer_);
return tmp[i];
}
-
+
ACE_INLINE const Dynamic::Parameter &
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i) const
// operator []
@@ -324,9 +324,9 @@ Dynamic::Parameter_out::operator-> (void)
Dynamic::Parameter * const tmp = ACE_reinterpret_cast (Dynamic::Parameter* ACE_CAST_CONST, this->buffer_);
return tmp[i];
}
-
+
// Implement the TAO_Base_Sequence methods (see Sequence.h)
-
+
ACE_INLINE Dynamic::Parameter *
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (CORBA::Boolean orphan)
{
@@ -360,13 +360,13 @@ Dynamic::Parameter_out::operator-> (void)
}
return result;
}
-
+
ACE_INLINE const Dynamic::Parameter *
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (void) const
{
return ACE_reinterpret_cast(const Dynamic::Parameter * ACE_CAST_CONST, this->buffer_);
}
-
+
ACE_INLINE void
Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::replace (CORBA::ULong max,
CORBA::ULong length,
@@ -383,11 +383,11 @@ Dynamic::Parameter_out::operator-> (void)
this->buffer_ = data;
this->release_ = release;
}
-
+
#endif /* end #if !defined */
-#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
+#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
#if !defined (_DYNAMIC_PARAMETERLIST_CI_)
#define _DYNAMIC_PARAMETERLIST_CI_
@@ -443,7 +443,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p)
{
ParameterList *deep_copy =
new ParameterList (*p.ptr_);
-
+
if (deep_copy != 0)
{
ParameterList *tmp = deep_copy;
@@ -453,7 +453,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p)
}
}
}
-
+
return *this;
}
@@ -469,27 +469,27 @@ Dynamic::ParameterList_var::operator-> (void)
return this->ptr_;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ParameterList_var::operator const ::Dynamic::ParameterList &() const // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast
+ACE_INLINE
+Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast
+ACE_INLINE
+Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast
{
return *this->ptr_;
}
// variable-size types only
ACE_INLINE
-Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast
+Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast
{
return this->ptr_;
}
@@ -518,7 +518,7 @@ Dynamic::ParameterList_var::inout (void)
return *this->ptr_;
}
-// mapping for variable size
+// mapping for variable size
ACE_INLINE ::Dynamic::ParameterList *&
Dynamic::ParameterList_var::out (void)
{
@@ -579,7 +579,7 @@ Dynamic::ParameterList_out::operator= (ParameterList *p)
return *this;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ParameterList_out::operator ::Dynamic::ParameterList *&() // cast
{
return this->ptr_;
@@ -608,7 +608,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
#if !defined (TAO_USE_SEQUENCE_TEMPLATES)
-
+
#if !defined (__TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_)
#define __TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_
@@ -616,36 +616,36 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (CORBA::ULong nelems)
{
CORBA::TypeCode **buf = 0;
-
+
ACE_NEW_RETURN (buf, CORBA::TypeCode*[nelems], 0);
-
+
for (CORBA::ULong i = 0; i < nelems; i++)
{
buf[i] = CORBA::TypeCode::_nil ();
}
-
+
return buf;
}
-
- ACE_INLINE void
+
+ ACE_INLINE void
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::freebuf (CORBA::TypeCode **buffer)
{
if (buffer == 0)
return;
delete[] buffer;
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (void)
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum)
: TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (maximum))
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum,
CORBA::ULong length,
@@ -654,7 +654,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
: TAO_Unbounded_Base_Sequence (maximum, length, value, release)
{
}
-
+
ACE_INLINE
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList(const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs)
: TAO_Unbounded_Base_Sequence (rhs)
@@ -663,12 +663,12 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
{
CORBA::TypeCode **tmp1 = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (this->maximum_);
CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < rhs.length_; ++i)
{
tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]);
}
-
+
this->buffer_ = tmp1;
}
else
@@ -676,17 +676,17 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
this->buffer_ = 0;
}
}
-
+
ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator= (const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs)
{
if (this == &rhs)
return *this;
-
+
if (this->release_)
{
CORBA::TypeCode **tmp = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_);
-
+
for (CORBA::ULong i = 0; i < this->length_; ++i)
{
CORBA::release (tmp[i]);
@@ -700,20 +700,20 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
}
else
this->buffer_ = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (rhs.maximum_);
-
+
TAO_Unbounded_Base_Sequence::operator= (rhs);
-
+
CORBA::TypeCode **tmp1 = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_);
CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_);
-
+
for (CORBA::ULong i = 0; i < rhs.length_; ++i)
{
tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]);
}
-
+
return *this;
}
-
+
ACE_INLINE TAO_Pseudo_Object_Manager<Dynamic::TypeCode,Dynamic::TypeCode_var>
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator[] (CORBA::ULong index) const
// read-write accessor
@@ -722,7 +722,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
CORBA::TypeCode ** const tmp = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_);
return TAO_Pseudo_Object_Manager<Dynamic::TypeCode,Dynamic::TypeCode_var> (tmp + index, this->release_);
}
-
+
ACE_INLINE CORBA::TypeCode* *
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (CORBA::Boolean orphan)
{
@@ -756,18 +756,18 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index)
}
return result;
}
-
+
ACE_INLINE const CORBA::TypeCode* *
Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (void) const
{
return ACE_reinterpret_cast(const CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_);
}
-
-
+
+
#endif /* end #if !defined */
-#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
+#endif /* !TAO_USE_SEQUENCE_TEMPLATES */
#if !defined (_DYNAMIC_EXCEPTIONLIST_CI_)
#define _DYNAMIC_EXCEPTIONLIST_CI_
@@ -823,7 +823,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p)
{
ExceptionList *deep_copy =
new ExceptionList (*p.ptr_);
-
+
if (deep_copy != 0)
{
ExceptionList *tmp = deep_copy;
@@ -833,7 +833,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p)
}
}
}
-
+
return *this;
}
@@ -849,27 +849,27 @@ Dynamic::ExceptionList_var::operator-> (void)
return this->ptr_;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ExceptionList_var::operator const ::Dynamic::ExceptionList &() const // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast
+ACE_INLINE
+Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast
{
return *this->ptr_;
}
-ACE_INLINE
-Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast
+ACE_INLINE
+Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast
{
return *this->ptr_;
}
// variable-size types only
ACE_INLINE
-Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast
+Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast
{
return this->ptr_;
}
@@ -892,7 +892,7 @@ Dynamic::ExceptionList_var::inout (void)
return *this->ptr_;
}
-// mapping for variable size
+// mapping for variable size
ACE_INLINE ::Dynamic::ExceptionList *&
Dynamic::ExceptionList_var::out (void)
{
@@ -953,7 +953,7 @@ Dynamic::ExceptionList_out::operator= (ExceptionList *p)
return *this;
}
-ACE_INLINE
+ACE_INLINE
Dynamic::ExceptionList_out::operator ::Dynamic::ExceptionList *&() // cast
{
return this->ptr_;
@@ -989,7 +989,7 @@ ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &strm, const Dynamic::Parame
return 1;
else
return 0;
-
+
}
ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_tao_aggregate)
@@ -1001,7 +1001,7 @@ ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_t
return 1;
else
return 0;
-
+
}
@@ -1033,4 +1033,3 @@ CORBA::Boolean TAO_Export operator>> (
);
#endif /* _TAO_CDR_OP_Dynamic_ExceptionList_I_ */
-
diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
index 5471cd4d1f3..75e6530afba 100644
--- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
+++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp
@@ -2,11 +2,14 @@
+
#include "DII_Reply_Dispatcher.h"
+
ACE_RCSID(DynamicInterface, DII_Reply_Dispatcher, "$Id$")
+
#include "Request.h"
#include "tao/Pluggable.h"
#include "tao/Environment.h"
@@ -20,8 +23,15 @@ TAO_DII_Deferred_Reply_Dispatcher::TAO_DII_Deferred_Reply_Dispatcher (
const CORBA::Request_ptr req,
TAO_ORB_Core *orb_core)
: TAO_Asynch_Reply_Dispatcher_Base (orb_core),
- reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE),
- 0,
+ db_ (sizeof buf_,
+ ACE_Message_Block::MB_DATA,
+ this->buf_,
+ orb_core->message_block_buffer_allocator (),
+ orb_core->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ orb_core->message_block_dblock_allocator ()),
+ reply_cdr_ (&db_,
+ ACE_Message_Block::DONT_DELETE,
TAO_ENCAP_BYTE_ORDER,
TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR,
@@ -44,8 +54,11 @@ TAO_DII_Deferred_Reply_Dispatcher::dispatch_reply (
{
this->reply_status_ = params.reply_status_;
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.steal_from (params.input_cdr_);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
+
+ ACE_UNUSED_ARG (db);
// Steal the buffer, that way we don't do any unnecesary copies of
// this data.
diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h
index 2e011729046..4bc0226620a 100644
--- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h
+++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h
@@ -53,13 +53,18 @@ public:
private:
+ /// The buffer that is used to initialise the data block
+ char buf_[ACE_CDR::DEFAULT_BUFSIZE];
+
+ /// datablock that is created on teh stack to initialise the CDR
+ /// stream underneath.
+ ACE_Data_Block db_;
+
+ /// CDR stream for reading the input.
TAO_InputCDR reply_cdr_;
- // CDR stream for reading the input.
- // @@ Carlos : message_state should go away. All we need is the reply
- // cdr. Is that right? (Alex).
+ /// Where the reply needs to go.
const CORBA::Request_ptr req_;
- // Where the reply needs to go.
};
#include "ace/post.h"
diff --git a/TAO/tao/Exception.cpp b/TAO/tao/Exception.cpp
index 9d9f12fe3e9..d33b1ebc4b8 100644
--- a/TAO/tao/Exception.cpp
+++ b/TAO/tao/Exception.cpp
@@ -1,5 +1,6 @@
// $Id$
+
// THREADING NOTE: calling thread handles mutual exclusion policy
// on all of these data structures.
@@ -26,6 +27,7 @@ ACE_RCSID (TAO,
Exception,
"$Id$")
+
// Static initializers.
ACE_Allocator *TAO_Exceptions::global_allocator_;
diff --git a/TAO/tao/Exclusive_TMS.cpp b/TAO/tao/Exclusive_TMS.cpp
index 3645be52122..c0c1e3d8755 100644
--- a/TAO/tao/Exclusive_TMS.cpp
+++ b/TAO/tao/Exclusive_TMS.cpp
@@ -43,10 +43,6 @@ TAO_Exclusive_TMS::bind_dispatcher (CORBA::ULong request_id,
this->request_id_ = request_id;
this->rd_ = rd;
- // If there was a previous reply, cleanup its state first.
- // if (this->message_state_.message_size != 0)
- // this->message_state_.reset (0);
-
return TAO_Transport_Mux_Strategy::bind_dispatcher (request_id,
rd);
}
@@ -87,32 +83,6 @@ TAO_Exclusive_TMS::dispatch_reply (TAO_Pluggable_Reply_Params &params)
return rd->dispatch_reply (params);
}
-/*TAO_GIOP_Message_State *
-TAO_Exclusive_TMS::get_message_state (void)
-{
- if (this->rd_ != 0)
- {
- TAO_GIOP_Message_State* rd_message_state = this->rd_->message_state ();
- if (rd_message_state == 0)
- {
- // The Reply Dispatcher does not have one (the Asynch guys
- // don't) so go ahead and pass yours.
- return &this->message_state_;
- }
- // @@ TODO: it would seem like the "Right Thing"[tm] to do here
- // is to return rd_message_state, but when Michael changed this
- // stuff originally he left that out. I hesitate to make the
- // change on this revision, too many changes at the same time.
- }
- return &this->message_state_;
-}
-
-void
-TAO_Exclusive_TMS::destroy_message_state (TAO_GIOP_Message_State *)
-{
-}
-*/
-
int
TAO_Exclusive_TMS::idle_after_send (void)
{
diff --git a/TAO/tao/Exclusive_TMS.h b/TAO/tao/Exclusive_TMS.h
index ef22858e9b2..6b5114adcf2 100644
--- a/TAO/tao/Exclusive_TMS.h
+++ b/TAO/tao/Exclusive_TMS.h
@@ -53,10 +53,6 @@ public:
virtual int dispatch_reply (TAO_Pluggable_Reply_Params &params);
- // @@ Commented for the time being, let the commented line stay for
- // sometime - Bala
- // virtual TAO_GIOP_Message_State *get_message_state (void);
- // virtual void destroy_message_state (TAO_GIOP_Message_State *);
virtual int idle_after_send (void);
virtual int idle_after_reply (void);
virtual void connection_closed (void);
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index 9ffb54cc572..10417e4d38b 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -17,37 +17,21 @@
ACE_RCSID (tao, GIOP_Message_Base, "$Id$")
+
TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
- size_t input_cdr_size)
- : message_handler_ (orb_core,
- this,
- input_cdr_size),
- output_ (0),
+ size_t /*input_cdr_size*/)
+ : orb_core_ (orb_core),
+ message_state_ (orb_core,
+ this),
generator_parser_ (0)
{
-#if defined(ACE_HAS_PURIFY)
- (void) ACE_OS::memset (this->repbuf_,
- '\0',
- sizeof this->repbuf_);
-#endif /* ACE_HAS_PURIFY */
- ACE_NEW (this->output_,
- TAO_OutputCDR (this->repbuf_,
- sizeof this->repbuf_,
- TAO_ENCAP_BYTE_ORDER,
- orb_core->message_block_buffer_allocator (),
- orb_core->message_block_dblock_allocator (),
- orb_core->message_block_msgblock_allocator (),
- orb_core->orb_params ()->cdr_memcpy_tradeoff (),
- TAO_DEF_GIOP_MAJOR,
- TAO_DEF_GIOP_MINOR,
- orb_core->to_iso8859 (),
- orb_core->to_unicode ()));
+
}
TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void)
{
- delete this->output_;
+
}
@@ -62,10 +46,9 @@ TAO_GIOP_Message_Base::init (CORBA::Octet major,
void
-TAO_GIOP_Message_Base::reset (int reset_flag)
+TAO_GIOP_Message_Base::reset (void)
{
- // Reset the message state
- this->message_handler_.reset (reset_flag);
+ // no-op
}
int
@@ -184,44 +167,11 @@ TAO_GIOP_Message_Base::generate_reply_header (
int
-TAO_GIOP_Message_Base::read_message (TAO_Transport *transport,
+TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/,
int /*block */,
ACE_Time_Value * /*max_wait_time*/)
{
- // Call the handler to read and do a simple parse of the header of
- // the message.
- int retval =
- this->message_handler_.read_messages (transport);
-
- if (retval < 1)
- return retval;
-
- retval = this->message_handler_.parse_message_header ();
-
-
- // Error in the message that was received
- if (retval == -1)
- return -1;
- // If -2, we want the reactor to call us back, so return 1
- else if (retval == -2)
- return 1;
-
- if (retval != 0)
- {
- // Get the message state
- TAO_GIOP_Message_State &state =
- this->message_handler_.message_state ();
-
- // Set the state internally for parsing and generating messages
- this->set_state (state.giop_version.major,
- state.giop_version.minor);
- }
-
- // We return 2, it is ugly. But the reactor semantics has made us to
- // limp :(
- return 2;
-
-
+ return 0;
}
int
@@ -284,9 +234,12 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
}
TAO_Pluggable_Message_Type
-TAO_GIOP_Message_Base::message_type (void)
+TAO_GIOP_Message_Base::message_type (
+ TAO_GIOP_Message_State &msg_state)
{
- switch (this->message_handler_.message_state ().message_type)
+ // Convert to the right type of Pluggable Messaging message type.
+
+ switch (msg_state.message_type_)
{
case TAO_GIOP_REQUEST:
case TAO_GIOP_LOCATEREQUEST:
@@ -313,43 +266,264 @@ TAO_GIOP_Message_Base::message_type (void)
}
int
+TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
+{
+
+ if (this->message_state_.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
+ssize_t
+TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
+{
+ // Actual message size including the header..
+ CORBA::ULong msg_size =
+ this->message_state_.message_size ();
+
+ size_t len = incoming.length ();
+
+ if (len > msg_size)
+ {
+ return -1;
+ }
+ else if (len == msg_size)
+ return 0;
+
+ return msg_size - len;
+}
+
+
+int
+TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd)
+{
+ TAO_GIOP_Message_State state (this->orb_core_,
+ this);
+
+ if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ if (incoming.length () > 0)
+ {
+ // Make a node which has a message block of the size of
+ // MESSAGE_HEADER_LEN.
+ qd =
+ this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ incoming.length ());
+ qd->missing_data_ = -1;
+ }
+ return 0;
+ }
+
+ if (state.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ size_t copying_len = state.message_size ();
+
+ qd = this->make_queued_data (copying_len);
+
+ if (copying_len > incoming.length ())
+ {
+ qd->missing_data_ =
+ copying_len - incoming.length ();
+
+ copying_len = incoming.length ();
+ }
+
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copying_len);
+
+ incoming.rd_ptr (copying_len);
+ qd->byte_order_ = state.byte_order_;
+ qd->major_version_ = state.giop_version_.major;
+ qd->minor_version_ = state.giop_version_.minor;
+ qd->msg_type_ = this->message_type (state);
+ return 1;
+}
+
+int
+TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming)
+{
+ // Look to see whether we had atleast parsed the GIOP header ...
+ if (qd->missing_data_ == -1)
+ {
+ // The data length that has been stuck in there during the last
+ // read ....
+ size_t len =
+ qd->msg_block_->length ();
+
+ // We know that we would have space for
+ // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
+ // from the <incoming> into the message block in <qd>
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ TAO_GIOP_MESSAGE_HEADER_LEN - len);
+
+ // Move the rd_ptr () in the incoming message block..
+ incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
+
+ TAO_GIOP_Message_State state (this->orb_core_,
+ this);
+
+ // Parse the message header now...
+ if (state.parse_message_header (*qd->msg_block_) == -1)
+ return -1;
+
+ // Now grow the message block so that we can copy the rest of
+ // the data...
+ ACE_CDR::grow (qd->msg_block_,
+ state.message_size ());
+
+ // Copy the pay load..
+
+ // Calculate the bytes that needs to be copied in the queue...
+ size_t copy_len =
+ state.message_size () - TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ // If teh data that needs to be copied is more than that is
+ // available to us ..
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+ else
+ {
+ qd->missing_data_ = 0;
+ }
+
+ // ..now we are set to copy the right amount of data to the
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ incoming.rd_ptr (copy_len);
+
+ // Get the other details...
+ qd->byte_order_ = state.byte_order_;
+ qd->major_version_ = state.giop_version_.major;
+ qd->minor_version_ = state.giop_version_.minor;
+ qd->msg_type_ = this->message_type (state);
+ }
+ else
+ {
+ // @@todo: Need to abstract this out to a seperate method...
+ size_t copy_len = qd->missing_data_;
+
+ if (copy_len > incoming.length ())
+ {
+ // Calculate the missing data..
+ qd->missing_data_ =
+ copy_len - incoming.length ();
+
+ // Set the actual possible copy_len that is available...
+ copy_len = incoming.length ();
+ }
+
+ // Copy the right amount of data in to the node...
+ // node..
+ qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len);
+
+ // Set the <rd_ptr> of the <incoming>..
+ qd->msg_block_->rd_ptr (copy_len);
+
+ }
+
+
+ return 0;
+}
+
+
+void
+TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
+{
+ // Get the message information
+ qd->byte_order_ =
+ this->message_state_.byte_order_;
+ qd->major_version_ =
+ this->message_state_.giop_version_.major;
+ qd->minor_version_ =
+ this->message_state_.giop_version_.minor;
+
+ qd->msg_type_=
+ this->message_type (this->message_state_);
+
+ // Reset the message_state
+ this->message_state_.reset ();
+}
+
+int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core)
+ TAO_Queued_Data *qd)
+
{
// Set the upcall thread
+ this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
+
+ // Set the state internally for parsing and generating messages
+ this->set_state (qd->major_version_,
+ qd->minor_version_);
+
+ // A buffer that we will use to initialise the CDR stream
+ char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
- orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ());
+#if defined(ACE_HAS_PURIFY)
+ (void) ACE_OS::memset (repbuf,
+ '\0',
+ sizeof repbuf);
+#endif /* ACE_HAS_PURIFY */
- // Reset the output CDR stream.
- // @@@@Is it necessary here?
- this->output_->reset ();
+ // Initialze an output CDR on the stack
+ TAO_OutputCDR output (repbuf,
+ sizeof repbuf,
+ TAO_ENCAP_BYTE_ORDER,
+ this->orb_core_->output_cdr_buffer_allocator (),
+ this->orb_core_->output_cdr_dblock_allocator (),
+ this->orb_core_->output_cdr_msgblock_allocator (),
+ this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
+ qd->major_version_,
+ qd->minor_version_,
+ this->orb_core_->to_iso8859 (),
+ this->orb_core_->to_unicode ());
// Get the read and write positions before we steal data.
- size_t rd_pos = this->message_handler_.rd_pos ();
- size_t wr_pos = this->message_handler_.wr_pos ();
+ size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
+ size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ qd->msg_block_->rd_ptr ()),
+ qd->msg_block_->length ());
- TAO_GIOP_Message_State &state =
- this->message_handler_.message_state ();
// Create a input CDR stream.
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
// copies at all here. The same is also done in the higher layers.
- TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (),
- 0,
+
+ TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
+ ACE_Message_Block::DONT_DELETE,
rd_pos,
wr_pos,
- this->message_handler_.message_state ().byte_order,
- state.giop_version.major,
- state.giop_version.minor,
- orb_core);
+ qd->byte_order_,
+ qd->major_version_,
+ qd->minor_version_,
+ this->orb_core_);
- // Set giop version info for the outstream so that server replies
- // in correct GIOP version
- output_->set_version (state.giop_version.major, state.giop_version.minor);
-
- // Reset the message handler to receive upcalls if any
- this->message_handler_.reset (0);
// We know we have some request message. Check whether it is a
// GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
@@ -358,20 +532,20 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
// the stream and never touch that again for anything. We basically
// loose ownership of the data_block.
- switch (this->message_handler_.message_state ().message_type)
+ switch (qd->msg_type_)
{
case TAO_GIOP_REQUEST:
// Should be taken care by the state specific invocations. They
// could raise an exception or write things in the output CDR
// stream
return this->process_request (transport,
- orb_core,
- input_cdr);
+ input_cdr,
+ output);
case TAO_GIOP_LOCATEREQUEST:
return this->process_locate_request (transport,
- orb_core,
- input_cdr);
+ input_cdr,
+ output);
default:
return -1;
}
@@ -379,31 +553,40 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
int
TAO_GIOP_Message_Base::process_reply_message (
- TAO_Pluggable_Reply_Params &params
- )
+ TAO_Pluggable_Reply_Params &params,
+ TAO_Queued_Data *qd)
{
+ // Set the state internally for parsing and generating messages
+ this->set_state (qd->major_version_,
+ qd->minor_version_);
+
// Get the read and write positions before we steal data.
- size_t rd_pos = this->message_handler_.rd_pos ();
- size_t wr_pos = this->message_handler_.wr_pos ();
+ size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
+ size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
- TAO_GIOP_Message_State &state =
- this->message_handler_.message_state ();
+ this->dump_msg ("recv",
+ ACE_reinterpret_cast (u_char *,
+ qd->msg_block_->rd_ptr ()),
+ qd->msg_block_->length ());
- // Create a input CDR stream.
+
+ // Create a empty buffer on stack
// NOTE: We use the same data block in which we read the message and
// we pass it on to the higher layers of the ORB. So we dont to any
// copies at all here. The same is alos done in the higher layers.
- TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (),
- 0,
+ TAO_InputCDR input_cdr (qd->msg_block_->data_block (),
+ ACE_Message_Block::DONT_DELETE,
rd_pos,
wr_pos,
- this->message_handler_.message_state ().byte_order,
- state.giop_version.major,
- state.giop_version.minor);
+ qd->byte_order_,
+ qd->major_version_,
+ qd->minor_version_,
+ this->orb_core_);
// Reset the message state. Now, we are ready for the next nested
// upcall if any.
- this->message_handler_.reset (0);
+ // this->message_handler_.reset (0);
// We know we have some reply message. Check whether it is a
// GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
@@ -412,7 +595,7 @@ TAO_GIOP_Message_Base::process_reply_message (
// the stream and never touch that again for anything. We basically
// loose ownership of the data_block.
- switch (this->message_handler_.message_state ().message_type)
+ switch (qd->msg_type_)
{
case TAO_GIOP_REPLY:
// Should be taken care by the state specific parsing
@@ -500,16 +683,16 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type t,
int
TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
- TAO_ORB_Core *orb_core,
- TAO_InputCDR &cdr)
+ TAO_InputCDR &cdr,
+ TAO_OutputCDR &output)
{
// This will extract the request header, set <response_required>
// and <sync_with_server> as appropriate.
TAO_ServerRequest request (this,
cdr,
- *this->output_,
+ output,
transport,
- orb_core);
+ this->orb_core_);
CORBA::ULong request_id = 0;
CORBA::Boolean response_required = 0;
@@ -533,10 +716,11 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
CORBA::Object_var forward_to;
// Do this before the reply is sent.
- orb_core->adapter_registry ()->dispatch (request.object_key (),
- request,
- forward_to,
- ACE_TRY_ENV);
+ this->orb_core_->adapter_registry ()->dispatch (
+ request.object_key (),
+ request,
+ forward_to,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
if (!CORBA::is_nil (forward_to.in ()))
@@ -551,12 +735,12 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
reply_params.service_context_notowned (&request.reply_service_info ());
// Make the GIOP header and Reply header
- this->generate_reply_header (*this->output_,
+ this->generate_reply_header (output,
reply_params);
- *this->output_ << forward_to.in ();
+ output << forward_to.in ();
- int result = transport->send_message (*this->output_);
+ int result = transport->send_message (output);
if (result == -1)
{
if (TAO_debug_level > 0)
@@ -580,7 +764,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
if (response_required)
{
result = this->send_reply_exception (transport,
- orb_core,
+ this->orb_core_,
request_id,
&request.reply_service_info (),
&ACE_ANY_EXCEPTION);
@@ -638,7 +822,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
CORBA::COMPLETED_MAYBE);
result = this->send_reply_exception (transport,
- orb_core,
+ this->orb_core_,
request_id,
&request.reply_service_info (),
&exception);
@@ -679,13 +863,13 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
int
TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
- TAO_ORB_Core* orb_core,
- TAO_InputCDR &input)
+ TAO_InputCDR &input,
+ TAO_OutputCDR &output)
{
// This will extract the request header, set <response_required> as
// appropriate.
TAO_GIOP_Locate_Request_Header locate_request (input,
- orb_core);
+ this->orb_core_);
TAO_GIOP_Locate_Status_Msg status_info;
@@ -706,13 +890,6 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
CORBA::COMPLETED_NO));
}
- // Execute a fake request to find out if the object is there or
- // if the POA can activate it on demand...
- char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
- TAO_OutputCDR dummy_output (repbuf,
- sizeof repbuf);
- // This output CDR is not used!
-
TAO_ObjectKey tmp_key (locate_request.object_key ().length (),
locate_request.object_key ().length (),
locate_request.object_key ().get_buffer (),
@@ -731,9 +908,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
deferred_reply,
tmp_key,
"_non_existent",
- dummy_output,
+ output,
transport,
- orb_core,
+ this->orb_core_,
parse_error);
if (parse_error != 0)
@@ -744,10 +921,11 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
CORBA::Object_var forward_to;
- orb_core->adapter_registry ()->dispatch (server_request.object_key (),
- server_request,
- forward_to,
- ACE_TRY_ENV);
+ this->orb_core_->adapter_registry ()->dispatch (
+ server_request.object_key (),
+ server_request,
+ forward_to,
+ ACE_TRY_ENV);
ACE_TRY_CHECK;
if (!CORBA::is_nil (forward_to.in ()))
@@ -809,27 +987,29 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
return this->make_send_locate_reply (transport,
locate_request,
- status_info);
+ status_info,
+ output);
}
int
TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
TAO_GIOP_Locate_Request_Header &request,
- TAO_GIOP_Locate_Status_Msg &status_info)
+ TAO_GIOP_Locate_Status_Msg &status_info,
+ TAO_OutputCDR &output)
{
// Note here we are making the Locate reply header which is *QUITE*
// different from the reply header made by the make_reply () call..
// Make the GIOP message header
this->write_protocol_header (TAO_GIOP_LOCATEREPLY,
- *this->output_);
+ output);
// This writes the header & body
- this->generator_parser_->write_locate_reply_mesg (*this->output_,
+ this->generator_parser_->write_locate_reply_mesg (output,
request.request_id (),
status_info);
// Send the message
- int result = transport->send_message (*this->output_);
+ int result = transport->send_message (output);
// Print out message if there is an error
if (result == -1)
@@ -883,7 +1063,8 @@ TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
0,
ACE_Message_Block::DONT_DELETE,
0);
- ACE_Message_Block message_block(&data_block);
+ ACE_Message_Block message_block(&data_block,
+ ACE_Message_Block::DONT_DELETE);
message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
size_t bt;
@@ -1179,23 +1360,36 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (void)
return this->generator_parser_->is_ready_for_bidirectional ();
}
-int
-TAO_GIOP_Message_Base::more_messages (void)
+
+TAO_Queued_Data *
+TAO_GIOP_Message_Base::make_queued_data (size_t sz)
{
- int retval =
- this->message_handler_.is_message_ready ();
+ // Get a node for the queue..
+ TAO_Queued_Data *qd =
+ TAO_Queued_Data::get_queued_data ();
- if (retval <= 0)
- return retval;
+ // Make a datablock for the size requested + something. The
+ // "something" is required because we are going to align the data
+ // block in the message block. During alignment we could loose some
+ // bytes. As we may not know how many bytes will be lost, we will
+ // allocate ACE_CDR::MAX_ALIGNMENT extra.
+ ACE_Data_Block *db =
+ this->orb_core_->data_block_for_message_block (sz +
+ ACE_CDR::MAX_ALIGNMENT);
+ ACE_Allocator *alloc =
+ this->orb_core_->message_block_msgblock_allocator ();
- // Get the message state
- TAO_GIOP_Message_State &state =
- this->message_handler_.message_state ();
+ ACE_Message_Block mb (db,
+ 0,
+ alloc);
+
+ ACE_Message_Block *new_mb = mb.duplicate ();
+
+ ACE_CDR::mb_align (new_mb);
+
+ qd->msg_block_ = new_mb;
- // Set the state internally for parsing and generating messages
- this->set_state (state.giop_version.major,
- state.giop_version.minor);
- return retval;
+ return qd;
}
diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h
index ecb194b1148..c30113ec963 100644
--- a/TAO/tao/GIOP_Message_Base.h
+++ b/TAO/tao/GIOP_Message_Base.h
@@ -22,10 +22,12 @@
#endif /* ACE_LACKS_PRAGMA_ONCE */
#include "tao/GIOP_Message_Generator_Parser_Impl.h"
-#include "tao/GIOP_Message_Reactive_Handler.h"
#include "tao/GIOP_Utils.h"
+#include "tao/GIOP_Message_State.h"
+
class TAO_Pluggable_Reply_Params;
+class TAO_Queued_Data;
/**
* @class TAO_GIOP_Message_Base
@@ -41,7 +43,7 @@ class TAO_Pluggable_Reply_Params;
class TAO_Export TAO_GIOP_Message_Base : public TAO_Pluggable_Messaging
{
public:
- friend class TAO_GIOP_Message_Reactive_Handler;
+ // friend class TAO_GIOP_Message_Reactive_Handler;
/// Constructor
TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
@@ -56,7 +58,7 @@ public:
CORBA::Octet minor);
/// Reset the messaging the object
- virtual void reset (int reset_flag = 1);
+ virtual void reset (void);
/// Write the RequestHeader in to the <cdr> stream. The underlying
/// implementation of the mesaging should do the right thing.
@@ -91,27 +93,40 @@ public:
/// the message.
virtual int format_message (TAO_OutputCDR &cdr);
+ /// Parse the incoming messages..
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block);
- /// Get the message type. The return value would be one of the
- /// following:
- /// TAO_PLUGGABLE_MESSAGE_REQUEST,
- /// TAO_PLUGGABLE_MESSAGE_REPLY,
- /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
- /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
- virtual TAO_Pluggable_Message_Type message_type (void);
+ /// Calculate the amount of data that is missing in the <incoming>
+ /// message block.
+ virtual ssize_t missing_data (ACE_Message_Block &message_block);
+ /* Extract the details of the next message from the <incoming>
+ * through <qd>. Returns 1 if there are more messages and returns a
+ * 0 if there are no more messages in <incoming>.
+ */
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd);
+ /// Check whether the node <qd> needs consolidation from <incoming>
+ virtual int consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming);
+
+ /// Get the details of the message parsed through the <qd>.
+ virtual void get_message_data (TAO_Queued_Data *qd);
/// Process the request message that we have received on the
/// connection
virtual int process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core);
+ TAO_Queued_Data *qd);
+
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
virtual int process_reply_message (
- TAO_Pluggable_Reply_Params &reply_info
- );
+ TAO_Pluggable_Reply_Params &reply_info,
+ TAO_Queued_Data *qd);
+
+
/// Generate a reply message with the exception <ex>.
virtual int generate_exception_reply (
@@ -124,13 +139,13 @@ protected:
/// Processes the <GIOP_REQUEST> messages
int process_request (TAO_Transport *transport,
- TAO_ORB_Core *orb_core,
- TAO_InputCDR &input);
+ TAO_InputCDR &input,
+ TAO_OutputCDR &output);
/// Processes the <GIOP_LOCATE_REQUEST> messages
int process_locate_request (TAO_Transport *transport,
- TAO_ORB_Core *orb_core,
- TAO_InputCDR &input);
+ TAO_InputCDR &input,
+ TAO_OutputCDR &output);
/// Set the state
void set_state (CORBA::Octet major,
@@ -141,6 +156,14 @@ protected:
const u_char *ptr,
size_t len);
+ /// Get the message type. The return value would be one of the
+ /// following:
+ /// TAO_PLUGGABLE_MESSAGE_REQUEST,
+ /// TAO_PLUGGABLE_MESSAGE_REPLY,
+ /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION,
+ /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR.
+ TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state);
+
private:
/// Writes the GIOP header in to <msg>
@@ -156,7 +179,8 @@ private:
/// unmanageable difference comes let them be implemented here.
int make_send_locate_reply (TAO_Transport *transport,
TAO_GIOP_Locate_Request_Header &request,
- TAO_GIOP_Locate_Status_Msg &status);
+ TAO_GIOP_Locate_Status_Msg &status,
+ TAO_OutputCDR &output);
/// Send error messages
int send_error (TAO_Transport *transport);
@@ -184,39 +208,26 @@ private:
/// request/response?
virtual int is_ready_for_bidirectional (void);
- /// Are there any more messages that needs processing
- virtual int more_messages (void);
+ /// Creates a new node for the queue with a message block in the
+ /// node of size <sz>..
+ TAO_Queued_Data *make_queued_data (size_t sz);
private:
+ /// Cached ORB_Core pointer...
+ TAO_ORB_Core *orb_core_;
+
/// Thr message handler object that does reading and parsing of the
/// incoming messages
- TAO_GIOP_Message_Reactive_Handler message_handler_;
-
- /// Output CDR
- TAO_OutputCDR *output_;
-
- /// Allocators for the output CDR that we hold. As we cannot rely on
- /// the resources from ORB Core we reserve our own resources. The
- /// reason that we cannot believe the ORB core is that, for a
- /// multi-threaded servers it dishes out resources cached in
- /// TSS. This would be dangerous as TSS gets destroyed before we
- /// would. So we have our own memory that we can rely on.
- /// Implementations of GIOP that we have
- ACE_Allocator *cdr_buffer_alloc_;
- ACE_Allocator *cdr_dblock_alloc_;
- ACE_Allocator *cdr_msgblock_alloc_;
-
- /// A buffer that we will use to initialise the CDR stream
- char repbuf_[ACE_CDR::DEFAULT_BUFSIZE];
+ TAO_GIOP_Message_State message_state_;
/// All the implementations of GIOP message generator and parsers
TAO_GIOP_Message_Generator_Parser_Impl tao_giop_impl_;
protected:
-
/// The generator and parser state.
TAO_GIOP_Message_Generator_Parser *generator_parser_;
+
};
diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i
index 45b18282a10..a589447a413 100644
--- a/TAO/tao/GIOP_Message_Base.i
+++ b/TAO/tao/GIOP_Message_Base.i
@@ -4,41 +4,3 @@
//
// GIOP_Message_Base
//
-# if 0
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::header_len (void)
-{
- return TAO_GIOP_MESSAGE_HEADER_LEN;
-}
-
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::message_size_offset (void)
-{
- return TAO_GIOP_MESSAGE_SIZE_OFFSET;
-}
-
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::major_version_offset (void)
-{
- return TAO_GIOP_VERSION_MAJOR_OFFSET;
-}
-
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::minor_version_offset (void)
-{
- return TAO_GIOP_VERSION_MINOR_OFFSET;
-}
-
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::flags_offset (void)
-{
- return TAO_GIOP_MESSAGE_FLAGS_OFFSET;
-}
-
-ACE_INLINE const size_t
-TAO_GIOP_Message_Base::message_type_offset (void)
-{
- return TAO_GIOP_MESSAGE_TYPE_OFFSET;
-}
-
-#endif /*if 0*/
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.cpp b/TAO/tao/GIOP_Message_Reactive_Handler.cpp
index a8af560081a..da22b2e93ab 100644
--- a/TAO/tao/GIOP_Message_Reactive_Handler.cpp
+++ b/TAO/tao/GIOP_Message_Reactive_Handler.cpp
@@ -8,393 +8,95 @@
#include "tao/GIOP_Message_Base.h"
#include "Transport.h"
+#if 0
#if !defined (__ACE_INLINE__)
# include "tao/GIOP_Message_Reactive_Handler.inl"
#endif /* __ACE_INLINE__ */
-ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$")
+#endif /*if 0*/
-TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (TAO_ORB_Core * orb_core,
- TAO_GIOP_Message_Base *base,
- size_t input_cdr_size)
- : message_state_ (orb_core),
- mesg_base_ (base),
- message_status_ (TAO_GIOP_WAITING_FOR_HEADER),
- message_size_ (input_cdr_size),
- current_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)),
- supp_buffer_ (orb_core->data_block_for_message_block (input_cdr_size))
+ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$")
+TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (
+ TAO_ORB_Core * /*orb_core*/,
+ TAO_GIOP_Message_Base * /*base*/)
{
- // NOTE: The message blocks here use a locked allocator which is not
- // from the TSS even if there is one. We are getting the allocators
- // from the global memory. We shouldn't be using the TSS stuff for
- // the following reason
- // (a) The connection handlers are per-connection and not
- // per-thread.
- // (b) The order of cleaning is important if we use allocators from
- // TSS. The TSS goes away when the threads go away. But the
- // connection handlers go away only when the ORB decides to shut
- // it down.
- ACE_CDR::mb_align (&this->current_buffer_);
-
- // Calculate the effective message after alignment
- this->message_size_ -= this->rd_pos ();
-}
-
-
-
-int
-TAO_GIOP_Message_Reactive_Handler::read_messages (TAO_Transport *transport)
-{
- // Read the message from the transport. The size of the message read
- // is the maximum size of the buffer that we have less the amount of
- // data that has already been read in to the buffer.
- ssize_t n = transport->recv (this->current_buffer_.wr_ptr (),
- this->current_buffer_.space ());
-
- if (n == -1)
- {
- if (errno == EWOULDBLOCK)
- return 0;
-
- return -1;
- }
- // @@ What are the other error handling here??
- else if (n == 0)
- {
- return -1;
- }
-
- if (TAO_debug_level == 5)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - GIOP_Message_Reactive_Handler::read_messages"
- " received %d bytes\n",
- n));
-
- size_t len;
- for (size_t offset = 0; offset < size_t(n); offset += len)
- {
- len = n - offset;
- if (len > 512)
- len = 512;
- ACE_HEX_DUMP ((LM_DEBUG,
- this->current_buffer_.wr_ptr () + offset,
- len,
- "TAO (%P|%t) - read_messages "));
- }
- ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n));
- }
-
-
- // Now we have a succesful read. First adjust the write pointer
- this->current_buffer_.wr_ptr (n);
-
- // Success
- return 1;
}
-int
-TAO_GIOP_Message_Reactive_Handler::parse_message_header (void)
-{
- // Check what message are we waiting for and take suitable action
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER)
- {
- size_t len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- if (len > TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // Parse the GIOP header
- if (this->parse_message_header_i (buf) == -1)
-
- return -1;
-
- int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN,
- len);
-
- // Set the pointer read pointer position in the
- // <current_buffer_>
- size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN;
-
- if (retval)
- {
- // We had a fragment header, so the position should be
- // beyond that
- pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
- }
-
- this->current_buffer_.rd_ptr (pos);
- buf = this->current_buffer_.rd_ptr ();
-
- // The GIOP header has been parsed. Set the status to wait for
- // payload
- this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
-
- return 1;
- }
-
- // We dont have sufficient information to decipher the GIOP
- // header. Make sure that the reactor calls us back.
- return -2;
- }
-
- // The last read just "read" left-over messages
- return 0;
-}
int
-TAO_GIOP_Message_Reactive_Handler::is_message_ready (void)
+TAO_GIOP_Message_Reactive_Handler::parse_message_header (ACE_Message_Block &incoming,
+ TAO_GIOP_Message_State &state)
{
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
- {
- size_t len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- // Set the buf pointer to the start of the GIOP header
- buf -= TAO_GIOP_MESSAGE_HEADER_LEN;
-
- // Dump the incoming message . It will be dumped only if the
- // debug level is greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len + TAO_GIOP_MESSAGE_HEADER_LEN);
- if (len == this->message_state_.message_size)
- {
- // If the buffer length is equal to the size of the payload we
- // have exactly one message.
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
-
- // Check whether we have received only the first part of the
- // fragment.
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else if (len > this->message_state_.message_size)
- {
- // If the length is greater we have received some X messages
- // and a part of X + 1 messages (probably) with X varying
- // from 1 to N.
- this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES;
-
- // Clone the data that we read in.
- this->supp_buffer_.data_block (
- this->current_buffer_.data_block ()->clone ());
-
- // Set the read and write pointer for the supplementary
- // buffer.
- size_t rd_pos = this->rd_pos ();
- this->supp_buffer_.rd_ptr (rd_pos +
- this->message_state_.message_size);
- this->supp_buffer_.wr_ptr (this->wr_pos ());
-
- // Reset the current buffer
- this->current_buffer_.reset ();
-
- // Set the read and write pointers again for the current
- // buffer. We change the write pointer settings as we would
- // like to process a single message.
- this->current_buffer_.rd_ptr (rd_pos);
- this->current_buffer_.wr_ptr (rd_pos +
- this->message_state_.message_size);
-
- return this->message_state_.is_complete (this->current_buffer_);
- }
- }
- else if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES)
+ // @@Bala: Need to make a check whether we are waiting for the
+ // header...
+ if (incoming.length () > TAO_GIOP_MESSAGE_HEADER_LEN)
{
- size_t len = this->supp_buffer_.length ();
-
- if (len > TAO_GIOP_MESSAGE_HEADER_LEN)
- {
- // @@ What about fragment headers???
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- len = this->current_buffer_.length ();
- char *buf = this->current_buffer_.rd_ptr ();
-
- if (this->parse_message_header_i (buf) == -1)
-
- return -1;
-
- // Set the pointer read pointer position in the
- // <current_buffer_>
- this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
-
- // The GIOP header has been parsed. Set the status to wait for
- // payload
- this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
-
- return this->get_message ();
- }
- else
- {
- // We have smaller than the header size left here. We
- // just copy the rest of the stuff and reset things so that
- // we can read the rest of the stuff from the socket.
- this->current_buffer_.copy (
- this->supp_buffer_.rd_ptr (),
- len);
-
- // Reset the supp buffer now
- this->supp_buffer_.reset ();
-
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
- }
-
+ // Parse the GIOP header
+ if (this->parse_message_header_i (incoming, state) == -1)
+ return -1;
}
- // Just send us back to the reactor so that we can for more data to
- // come in .
return 0;
}
-ACE_Data_Block *
-TAO_GIOP_Message_Reactive_Handler::steal_data_block (void)
-{
- ACE_Data_Block *db =
- this->current_buffer_.data_block ()->clone_nocopy ();
-
- ACE_Data_Block *old_db =
- this->current_buffer_.replace_data_block (db);
-
- ACE_CDR::mb_align (&this->current_buffer_);
-
- return old_db;
-}
-
-
-void
-TAO_GIOP_Message_Reactive_Handler::reset (int reset_flag)
-{
- // Reset the contents of the message state
- this->message_state_.reset (reset_flag);
-
- // Reset the current buffer
- this->current_buffer_.reset ();
-
- ACE_CDR::mb_align (&this->current_buffer_);
-
- if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES)
- {
- this->supp_buffer_.reset ();
- ACE_CDR::mb_align (&this->supp_buffer_);
- }
-
-}
-
-
int
-TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (char *buf)
+TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (ACE_Message_Block &incoming,
+ TAO_GIOP_Message_State &state)
{
if (TAO_debug_level > 8)
{
ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n"));
}
- // Check whether we have a GIOP Message in the first place
- if (this->parse_magic_bytes (buf) == -1)
- return -1;
+ // Grab the rd_ptr_ from the message block..
+ char *buf = incoming.rd_ptr ();
- // Let us be specific that this is for 1.0
- if (this->message_state_.giop_version.minor == 0 &&
- this->message_state_.giop_version.major == 1)
+ // Parse the magic bytes first
+ if (this->parse_magic_bytes (buf) == -1)
{
- this->message_state_.byte_order =
- buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
-
- if (this->message_state_.byte_order != 0 &&
- this->message_state_.byte_order != 1)
- {
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
- ACE_TEXT (" for version <1.0>\n"),
- this->message_state_.byte_order));
- return -1;
- }
+ return -1;
}
- else
- {
- // Read the byte ORDER
- this->message_state_.byte_order =
- (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
- // Read the fragment bit
- this->message_state_.more_fragments =
- (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
+ // Get the version information
+ if (this->get_version_info (buf, state) == -1)
+ return -1;
- if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
- {
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
- ACE_TEXT (" for version <%d %d> \n"),
- buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
- this->message_state_.giop_version.major,
- this->message_state_.giop_version.minor));
- return -1;
- }
- }
+ // Get the byte order information...
+ if (this->get_byte_order_info (buf, state) == -1)
+ return -1;
// Get the message type
- this->message_state_.message_type =
- buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
-
- // Get the payload size. If the payload size is greater than the
- // length then set the length of the message block to that
- // size. Move the rd_ptr to the end of the GIOP header
- this->message_state_.message_size = this->get_payload_size (buf);
+ state.message_type = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
- // If the message_size or the payload_size is zero then something
- // is fishy. So return an error.
- if (this->message_state_.message_size == 0)
- return -1;
+ // Get the payload size
+ this->get_payload_size (buf, state);
- if (TAO_debug_level > 2)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"),
- this->message_state_.giop_version.major,
- this->message_state_.giop_version.minor,
- this->message_state_.byte_order,
- this->message_state_.message_type,
- this->message_state_.message_size));
- }
+ // Parse the
+ int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN,
+ len);
- return 1;
-}
+ // Set the pointer read pointer position in the
+ // <current_buffer_>
+ size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN;
+ if (retval)
+ {
+ // We had a fragment header, so the position should be
+ // beyond that
+ pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+ }
-int
-TAO_GIOP_Message_Reactive_Handler::parse_fragment_header (char *buf,
- size_t length)
-{
- size_t len =
- TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN;
-
- // By this point we are doubly sure that we have a more or less
- // valid GIOP message with a valid major revision number.
- if (this->message_state_.more_fragments &&
- this->message_state_.giop_version.minor == 2 &&
- length > len)
- {
- // Fragmented message in GIOP 1.2 should have a fragment header
- // following the GIOP header. Grab the rd_ptr to get that
- // info.
- this->message_state_.request_id = this->read_ulong (buf);
+ this->current_buffer_.rd_ptr (pos);
+ buf = this->current_buffer_.rd_ptr ();
- // As we parsed the header
- return 1;
+ // The GIOP header has been parsed. Set the status to wait for
+ // payload
+ this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD;
}
+ return 1;
- return 0;
-}
int
@@ -406,7 +108,7 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
&& buf [2] == 0x4f // 'O'
&& buf [3] == 0x50)) // 'P'
{
- // For the present...
+ // For the present...
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) bad header, "
@@ -418,12 +120,25 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
return -1;
}
+ return 0;
+}
+
+int
+TAO_GIOP_Message_Reactive_Handler::get_version_info (char *buf,
+ TAO_GIOP_Message_State &state)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n"));
+ }
+
// We have a GIOP message on hand. Get its revision numbers
CORBA::Octet incoming_major =
buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
CORBA::Octet incoming_minor =
buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+ // Check the revision information
if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
incoming_major,
incoming_minor) == 0)
@@ -439,55 +154,81 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf)
}
// Set the version
- this->message_state_.giop_version.minor = incoming_minor;
- this->message_state_.giop_version.major = incoming_major;
+ state.giop_version.minor = incoming_minor;
+ state.giop_version.major = incoming_major;
return 0;
}
+int
+TAO_GIOP_Message_Reactive_Handler::get_byte_order_info (char *buf,
+ TAO_GIOP_Message_State &message_state)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n"));
+ }
+
+ // Let us be specific that this is for 1.0
+ if (message_state.giop_version.minor == 0 &&
+ message_state.giop_version.major == 1)
+ {
+ message_state_.byte_order =
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
+
+ if (message_state.byte_order != 0 &&
+ message_state.byte_order != 1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
+ ACE_TEXT (" for version <1.0>\n"),
+ message_state.byte_order));
+ return -1;
+ }
+ }
+ else
+ {
+ // Read the byte ORDER
+ message_state.byte_order =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
+
+ // Read the fragment bit
+ message_state.more_fragments =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
+
+ if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
+ ACE_TEXT (" for version <%d %d> \n"),
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
+ message_state_.giop_version.major,
+ message_state_.giop_version.minor));
+ return -1;
+ }
+ }
+
+ return 0;
+}
CORBA::ULong
-TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr)
+TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr,
+ TAO_GIOP_Message_State &message_state)
{
// Move the read pointer
rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
- CORBA::ULong x = this->read_ulong (rd_ptr);
-
- if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_)
- {
- if (ACE_CDR::grow (&this->current_buffer_,
- x + TAO_GIOP_MESSAGE_HEADER_LEN) == -1)
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P | %t) Unable to increase the size \n")
- ACE_TEXT ("of the buffer \n")));
- return 0;
- }
-
- // New message size is the size of the now larger buffer.
- this->message_size_ = x +
- TAO_GIOP_MESSAGE_HEADER_LEN +
- ACE_CDR::MAX_ALIGNMENT;
- }
-
- // Set the read pointer to the end of the GIOP message
- // this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
- return x;
-}
+ CORBA::ULong x = 0;
-CORBA::ULong
-TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr)
-{
size_t msg_size = 4;
- char *buf = ACE_ptr_align_binary (ptr,
+ char *buf = ACE_ptr_align_binary (rd_ptr,
msg_size);
- CORBA::ULong x;
#if !defined (ACE_DISABLE_SWAP_ON_READ)
- if (!(this->message_state_.byte_order != ACE_CDR_BYTE_ORDER))
+ if (!(state.byte_order != ACE_CDR_BYTE_ORDER))
{
x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
}
@@ -499,76 +240,5 @@ TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr)
x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf);
#endif /* ACE_DISABLE_SWAP_ON_READ */
- return x;
-}
-
-
-
-
-int
-TAO_GIOP_Message_Reactive_Handler::get_message (void)
-{
- if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD)
- {
- size_t len = this->supp_buffer_.length ();
- char * buf =
- this->current_buffer_.rd_ptr ();
-
- buf -= TAO_GIOP_MESSAGE_HEADER_LEN;
-
- if (len == this->message_state_.message_size)
- {
- // If the buffer length is equal to the size of the payload we
- // have exactly one message. Check whether we have received
- // only the first part of the fragment.
- this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER;
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->message_state_.message_size);
-
- // The message will be dumped only if the debug level is
- // greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len +
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (this->message_state_.message_size);
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else if (len > this->message_state_.message_size)
- {
- // If the length is greater we have received some X messages
- // and a part of X + 1 messages (probably) with X varying
- // from 1 to N.
- this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES;
-
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->message_state_.message_size);
-
- // The message will be dumped only if the debug level is
- // greater than 5 anyway.
- this->mesg_base_->dump_msg (
- "Recv msg",
- ACE_reinterpret_cast (u_char *,
- buf),
- len +
- TAO_GIOP_MESSAGE_HEADER_LEN);
-
- this->supp_buffer_.rd_ptr (this->message_state_.message_size);
- return this->message_state_.is_complete (this->current_buffer_);
- }
- else
- {
- // The remaining message in the supp buffer
- this->current_buffer_.copy (this->supp_buffer_.rd_ptr (),
- this->supp_buffer_.length ());
-
- // Reset the supp buffer now
- this->supp_buffer_.reset ();
- }
- }
-
- return 0;
+ message_state.message_size = x;
}
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.h b/TAO/tao/GIOP_Message_Reactive_Handler.h
index b38a74c27c5..16c0503851e 100644
--- a/TAO/tao/GIOP_Message_Reactive_Handler.h
+++ b/TAO/tao/GIOP_Message_Reactive_Handler.h
@@ -13,29 +13,19 @@
#ifndef TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H
#define TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H
#include "ace/pre.h"
-#include "ace/Message_Block.h"
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "tao/GIOP_Message_State.h"
-class TAO_Transport;
+
class TAO_ORB_Core;
class TAO_GIOP_Message_Base;
+class TAO_GIOP_Message_State;
+class ACE_Message_Block;
-enum TAO_GIOP_Message_Status
-{
- /// The buffer is waiting for the header of the message yet
- TAO_GIOP_WAITING_FOR_HEADER = 0,
-
- /// The buffer is waiting for the payload to appear on the socket
- TAO_GIOP_WAITING_FOR_PAYLOAD,
-
- /// The buffer has got multiple messages
- TAO_GIOP_MULTIPLE_MESSAGES
-};
/**
* @class TAO_GIOP_Message_Reactive_Handler
@@ -54,6 +44,9 @@ enum TAO_GIOP_Message_Status
* reading the header and the payload seperately.
*/
+
+
+#if 0
class TAO_Export TAO_GIOP_Message_Reactive_Handler
{
public:
@@ -79,7 +72,7 @@ public:
/// - We have sufficient info for processing the header and we
/// processed it succesfully. (return 1);
/// - Any errors in processing will return a -1.
- int parse_message_header (void);
+ int parse_message_header (ACE_Message_Block &message_block);
/// Check whether we have atleast one complete message ready for
/// processing.
@@ -173,7 +166,11 @@ private:
/// is then sent to the higher layers of the ORB.
ACE_Message_Block supp_buffer_;
};
+#if defined (__ACE_INLINE__)
+# include "tao/GIOP_Message_Reactive_Handler.inl"
+#endif /* __ACE_INLINE__ */
+#endif
const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12;
const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8;
@@ -183,9 +180,7 @@ const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5;
const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4;
const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4;
-#if defined (__ACE_INLINE__)
-# include "tao/GIOP_Message_Reactive_Handler.inl"
-#endif /* __ACE_INLINE__ */
+
#include "ace/post.h"
#endif /*TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H*/
diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.inl b/TAO/tao/GIOP_Message_Reactive_Handler.inl
index 91211b34464..66b856377dc 100644
--- a/TAO/tao/GIOP_Message_Reactive_Handler.inl
+++ b/TAO/tao/GIOP_Message_Reactive_Handler.inl
@@ -3,6 +3,8 @@
+
+#if 0
ACE_INLINE TAO_GIOP_Message_State &
TAO_GIOP_Message_Reactive_Handler::message_state (void)
{
@@ -28,3 +30,5 @@ TAO_GIOP_Message_Reactive_Handler::wr_pos (void) const
return
this->current_buffer_.wr_ptr () - this->current_buffer_.base ();
}
+
+#endif
diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp
index c71c5a638e5..36ebdcd41ae 100644
--- a/TAO/tao/GIOP_Message_State.cpp
+++ b/TAO/tao/GIOP_Message_State.cpp
@@ -1,115 +1,270 @@
-// -*- C++ -*-
-
-//$Id$
+// $Id$
#include "tao/GIOP_Message_State.h"
-#include "tao/GIOP_Utils.h"
+#include "tao/GIOP_Message_Generator_Parser_Impl.h"
#include "tao/ORB_Core.h"
+#include "tao/Pluggable.h"
+#include "tao/debug.h"
+#include "tao/GIOP_Message_Base.h"
+//#include "Transport.h"
#if !defined (__ACE_INLINE__)
-# include "tao/GIOP_Message_State.i"
+# include "tao/GIOP_Message_State.inl"
#endif /* __ACE_INLINE__ */
+
ACE_RCSID(tao, GIOP_Message_State, "$Id$")
- TAO_GIOP_Message_State::TAO_GIOP_Message_State (TAO_ORB_Core* /*orb_core*/)
- : byte_order (TAO_ENCAP_BYTE_ORDER),
- message_type (TAO_GIOP_MESSAGERROR),
- message_size (0),
- request_id (0),
- // Problem similar to GIOP_Message_handler.cpp - Bala
- fragmented_messages (ACE_CDR::DEFAULT_BUFSIZE),
- more_fragments (0)
+TAO_GIOP_Message_State::TAO_GIOP_Message_State (
+ TAO_ORB_Core * /*orb_core*/,
+ TAO_GIOP_Message_Base * /*base*/)
+ : giop_version_ (TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR),
+ byte_order_ (0),
+ message_type_ (0),
+ message_size_ (0),
+ request_id_ (0),
+ more_fragments_ (0),
+ missing_data_ (0)
{
- //giop_version.major = TAO_DEF_GIOP_MAJOR;
- //giop_version.minor = TAO_DEF_GIOP_MINOR;
}
-TAO_GIOP_Message_State::~TAO_GIOP_Message_State (void)
+
+int
+TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming)
{
- // @@ Bala: this is not a very useful comment, is it?
- //no-op
+ if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ // Parse the GIOP header
+ if (this->parse_message_header_i (incoming) == -1)
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming)
+{
+ if (TAO_debug_level > 8)
+ {
+ ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n"));
+ }
+
+ // Grab the rd_ptr_ from the message block..
+ char *buf = incoming.rd_ptr ();
+
+ // Parse the magic bytes first
+ if (this->parse_magic_bytes (buf) == -1)
+ {
+ return -1;
+ }
+
+ // Get the version information
+ if (this->get_version_info (buf) == -1)
+ return -1;
+
+ // Get the byte order information...
+ if (this->get_byte_order_info (buf) == -1)
+ return -1;
+
+ // Get the message type
+ this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET];
+
+ // Get the size of the message..
+ this->get_payload_size (buf);
+
+ if (this->message_size_ == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Message with size 0 recd.. \n")));
+ return -1;
+ }
+
+ if (this->more_fragments_)
+ {
+ // Parse the
+ /*int retval = */
+ this->parse_fragment_header (buf,
+ incoming.length ());
+ }
+
+ return 0;
+}
+
+
+
+
+int
+TAO_GIOP_Message_State::parse_magic_bytes (char *buf)
+{
+ // The values are hard-coded to support non-ASCII platforms.
+ if (!(buf [0] == 0x47 // 'G'
+ && buf [1] == 0x49 // 'I'
+ && buf [2] == 0x4f // 'O'
+ && buf [3] == 0x50)) // 'P'
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) bad header, "
+ "magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"),
+ buf[0],
+ buf[1],
+ buf[2],
+ buf[3]));
+ return -1;
+ }
+
+ return 0;
}
int
-TAO_GIOP_Message_State::is_complete (ACE_Message_Block &current_buf)
+TAO_GIOP_Message_State::get_version_info (char *buf)
{
- if (this->more_fragments)
+ if (TAO_debug_level > 8)
{
- if (this->fragmented_messages.length () == 0)
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n"));
+ }
+
+ // We have a GIOP message on hand. Get its revision numbers
+ CORBA::Octet incoming_major =
+ buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet incoming_minor =
+ buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+
+ // Check the revision information
+ if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision (
+ incoming_major,
+ incoming_minor) == 0)
+ {
+ if (TAO_debug_level > 0)
{
- this->first_fragment_byte_order = this->byte_order;
- this->first_fragment_giop_version = this->giop_version;
- this->first_fragment_message_type = this->message_type;
- // this->fragments_end = this->fragments_begin = current;
- this->fragmented_messages.copy (current_buf.rd_ptr (),
- current_buf.length ());
-
- // Reset the buffer
- current_buf.reset ();
-
- // Reset our state
- this->reset ();
- return 0;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t|%N|%l) bad version <%d.%d>\n"),
+ incoming_major, incoming_minor));
}
- return this->append_fragment (current_buf);
+ return -1;
}
- if (this->fragmented_messages.length () != 0)
+ // Set the version
+ this->giop_version_.minor = incoming_minor;
+ this->giop_version_.major = incoming_major;
+
+ return 0;
+}
+
+int
+TAO_GIOP_Message_State::get_byte_order_info (char *buf)
+{
+ if (TAO_debug_level > 8)
{
- // This is the last message, but we must defragment before
- // sending
- if (this->append_fragment (current_buf) == -1)
- return -1;
+ ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n"));
+ }
- // Copy the entire message block into <current_buf>
- current_buf.data_block (this->fragmented_messages.data_block ()->clone ());
+ // Let us be specific that this is for 1.0
+ if (this->giop_version_.minor == 0 &&
+ this->giop_version_.major == 1)
+ {
+ this->byte_order_ =
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
- this->fragmented_messages.reset ();
+ if (this->byte_order_ != 0 &&
+ this->byte_order_ != 1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>")
+ ACE_TEXT (" for version <1.0>\n"),
+ this->byte_order_));
+ return -1;
+ }
+ }
+ else
+ {
+ // Read the byte ORDER
+ this->byte_order_ =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01);
- this->byte_order = this->first_fragment_byte_order;
- this->giop_version = this->first_fragment_giop_version;
- this->message_type = this->first_fragment_message_type;
+ // Read the fragment bit
+ this->more_fragments_ =
+ (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02);
- // This message has no more fragments, and there where no fragments
- // before it, just return. Notice that current_buf has the
- // *right* contents
+ if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>")
+ ACE_TEXT (" for version <%d %d> \n"),
+ buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET],
+ this->giop_version_.major,
+ this->giop_version_.minor));
+ return -1;
+ }
}
+ return 0;
+}
+
+void
+TAO_GIOP_Message_State::get_payload_size (char *rd_ptr)
+{
+ // Move the read pointer
+ rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET;
- return 1;
+ this->message_size_ = this->read_ulong (rd_ptr);
}
+
+
int
-TAO_GIOP_Message_State::append_fragment (ACE_Message_Block& current)
+TAO_GIOP_Message_State::parse_fragment_header (char *buf,
+ size_t length)
{
- if (this->first_fragment_byte_order != this->byte_order
- || this->first_fragment_giop_version.major != this->giop_version.major
- || this->first_fragment_giop_version.minor != this->giop_version.minor)
+ size_t len =
+ TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ buf += TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ // By this point we are doubly sure that we have a more or less
+ // valid GIOP message with a valid major revision number.
+ if (this->giop_version_.minor == 2 && length > len)
{
- // Yes, print it out in all debug levels!. This is an error by
- // CORBA 2.4 spec
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) incompatible fragments:\n")
- ACE_TEXT (" Different GIOP versions or byte order\n")));
- this->reset ();
- return -1;
+ // Fragmented message in GIOP 1.2 should have a fragment header
+ // following the GIOP header. Grab the rd_ptr to get that
+ // info.
+ this->request_id_ = this->read_ulong (buf);
+
+ // As we parsed the header
+ return 1;
}
- size_t req_size =
- this->fragmented_messages.size () + current.length ();
+ return 0;
+}
- this->fragmented_messages.size (req_size);
+CORBA::ULong
+TAO_GIOP_Message_State::read_ulong (char *rd_ptr)
+{
+ CORBA::ULong x = 0;
- // Copy the message
- this->fragmented_messages.copy (current.rd_ptr (),
- current.length ());
+ size_t msg_size = 4;
- current.reset ();
+ char *buf = ACE_ptr_align_binary (rd_ptr,
+ msg_size);
- // Reset our state
- this->reset ();
+#if !defined (ACE_DISABLE_SWAP_ON_READ)
+ if (!(this->byte_order_ != ACE_CDR_BYTE_ORDER))
+ {
+ x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf);
+ }
+ else
+ {
+ ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x));
+ }
+#else
+ x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf);
+#endif /* ACE_DISABLE_SWAP_ON_READ */
- return 0;
+ return x;
}
diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h
index a71d53869ac..3f262373f54 100644
--- a/TAO/tao/GIOP_Message_State.h
+++ b/TAO/tao/GIOP_Message_State.h
@@ -11,25 +11,22 @@
*
* @author Chris Cleeland <cleeland@cs.wustl.edu>
* @author Carlos O' Ryan <coryan@uci.edu>
+ * @author modified by Balachandran Natarajan <bala@cs.wustl.edu>
*/
//=============================================================================
-
-
#ifndef TAO_GIOP_MESSAGE_STATE_H
#define TAO_GIOP_MESSAGE_STATE_H
#include "ace/pre.h"
-#include "tao/TAO_Export.h"
+#include "tao/GIOP_Message_Version.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
-#include "tao/corbafwd.h"
-#include "tao/CDR.h"
-#include "tao/GIOP_Message_Version.h"
class TAO_ORB_Core;
+class TAO_GIOP_Message_Base;
/**
@@ -37,49 +34,81 @@ class TAO_ORB_Core;
*
* @brief Generic definitions for Message States.
*
- * This represents the state of the incoming GIOP message
- * As the ORB processes incoming messages it needs to keep track of
- * how much of the message has been read, if there are any
- * fragments following this message etc.
+ * This helps to establish the state of the incoming messages.
*/
+
class TAO_Export TAO_GIOP_Message_State
{
-
public:
+
/// Ctor
- TAO_GIOP_Message_State (TAO_ORB_Core *orb_core);
+ TAO_GIOP_Message_State (TAO_ORB_Core *orb_core,
+ TAO_GIOP_Message_Base *base);
+
+ /// Parse the message header.
+ int parse_message_header (ACE_Message_Block &incoming);
+
+ /// Return the message size
+ CORBA::ULong message_size (void) const;
+
+ /// Return the message size
+ CORBA::ULong payload_size (void) const;
+
+ /// Return the byte order information
+ CORBA::Octet byte_order (void) const;
- /// Dtor
- ~TAO_GIOP_Message_State (void);
+ /// Reset the state..
+ void reset (void);
- ///Reset the message header state and prepare it to receive the next
- /// event.
- void reset (int reset_contents = 1);
+private:
+
+ friend class TAO_GIOP_Message_Base;
+
+ /// Parse the message header.
+ int parse_message_header_i (ACE_Message_Block &incoming);
- /// Has the header been received?
- CORBA::Boolean header_received (void) const;
+ /// Checks for the magic word 'GIOP' in the start of the incoing
+ /// stream
+ int parse_magic_bytes (char *buf);
- /// Check if the current message is complete, adjusting the fragments
- /// if required...
- int is_complete (ACE_Message_Block &current_buf);
+ /// Extracts the version information from the incoming
+ /// stream. Performs a check for whether the version information is
+ /// right and sets the information in the <state>
+ int get_version_info (char *buf);
- /// Did we get fragmented messages?
- int message_fragmented (void);
+ /// Extracts the byte order information from the incoming
+ /// stream. Performs a check for whether the byte order information
+ /// right and sets the information in the <state>
+ int get_byte_order_info (char *buf);
- /// Version info
- TAO_GIOP_Message_Version giop_version;
+ /// Gets the size of the payload and set the size in the <state>
+ void get_payload_size (char *buf);
+
+ /// Parses the GIOP FRAGMENT_HEADER information from the incoming
+ /// stream.
+ int parse_fragment_header (char *buf,
+ size_t length);
+
+ /// Read the unsigned long from the buffer. The <buf> should just
+ /// point to the next 4 bytes data that represent the ULong
+ CORBA::ULong read_ulong (char *buf);
+
+private:
+
+ // GIOP version information..
+ TAO_GIOP_Message_Version giop_version_;
/// 0 = big, 1 = little
- CORBA::Octet byte_order;
+ CORBA::Octet byte_order_;
/// MsgType above
- CORBA::Octet message_type;
+ CORBA::Octet message_type_;
/// in byte_order!
- CORBA::ULong message_size;
+ CORBA::ULong message_size_;
/// Request Id from the Fragment header
- CORBA::ULong request_id;
+ CORBA::ULong request_id_;
/**
* The fragments are collected in a chain of message blocks (using
@@ -87,7 +116,7 @@ public:
* chain is reassembled into the main message block that is sent
* along
*/
- ACE_Message_Block fragmented_messages;
+ // ACE_Message_Block fragmented_messages;
/**
@@ -101,11 +130,11 @@ public:
* 3) Even if we allowed that at this layer the CDR classes are
* not prepared to handle that.
*/
- CORBA::Octet first_fragment_byte_order;
+ // CORBA::Octet first_fragment_byte_order;
/// The GIOP version for the first fragment
/// @@ Same as above, all GIOP versions must match.
- TAO_GIOP_Message_Version first_fragment_giop_version;
+ // TAO_GIOP_Message_Version first_fragment_giop_version;
/**
* If the messages are chained this represents the message type for
@@ -113,23 +142,26 @@ public:
* fragment and the upper level needs to know if it is a request,
* locate request or what).
*/
- CORBA::Octet first_fragment_message_type;
+ // CORBA::Octet first_fragment_message_type;
/// (Requests and Replys)
- CORBA::Octet more_fragments;
-
-private:
- /// Append <current> to the list of fragments
- /// Also resets the state, because the current message was consumed.
- int append_fragment (ACE_Message_Block &current);
-
+ CORBA::Octet more_fragments_;
+ /// Missing data
+ CORBA::ULong missing_data_;
};
+const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12;
+const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8;
+const size_t TAO_GIOP_MESSAGE_FLAGS_OFFSET = 6;
+const size_t TAO_GIOP_MESSAGE_TYPE_OFFSET = 7;
+const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5;
+const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4;
+const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4;
#if defined (__ACE_INLINE__)
-# include "tao/GIOP_Message_State.i"
+# include "tao/GIOP_Message_State.inl"
#endif /* __ACE_INLINE__ */
#include "ace/post.h"
diff --git a/TAO/tao/GIOP_Message_State.i b/TAO/tao/GIOP_Message_State.i
index 653bf47d6a8..773faefe69b 100644
--- a/TAO/tao/GIOP_Message_State.i
+++ b/TAO/tao/GIOP_Message_State.i
@@ -1,8 +1,6 @@
// -*- C++ -*-
//$Id$
-// ****************************************************************
-// @@ Bala: we use the stars to separate classes in ACE+TAO
//
// Inlined methods for TAO_GIOP_Message_State
@@ -10,7 +8,7 @@
ACE_INLINE int
TAO_GIOP_Message_State::message_fragmented (void)
{
- if (this->more_fragments)
+ if (this->more_fragments_)
return 1;
return 0;
@@ -19,12 +17,12 @@ TAO_GIOP_Message_State::message_fragmented (void)
ACE_INLINE void
TAO_GIOP_Message_State::reset (int /*reset_contents*/)
{
- this->message_size = 0;
- this->more_fragments = 0;
+ this->message_size_ = 0;
+ this->more_fragments_ = 0;
}
ACE_INLINE CORBA::Boolean
TAO_GIOP_Message_State::header_received (void) const
{
- return this->message_size != 0;
+ return this->message_size_ != 0;
}
diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl
new file mode 100644
index 00000000000..b6d2989eaa9
--- /dev/null
+++ b/TAO/tao/GIOP_Message_State.inl
@@ -0,0 +1,58 @@
+// -*- C++ -*-
+
+//$Id$
+
+ACE_INLINE CORBA::ULong
+TAO_GIOP_Message_State::message_size (void) const
+{
+ CORBA::ULong len =
+ this->message_size_ + TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ if (this->more_fragments_ &&
+ this->giop_version_.minor > 1)
+ len += TAO_GIOP_MESSAGE_FRAGMENT_HEADER;
+
+ return len;
+}
+
+ACE_INLINE CORBA::ULong
+TAO_GIOP_Message_State::payload_size (void) const
+{
+ return this->message_size_;
+}
+
+ACE_INLINE CORBA::Octet
+TAO_GIOP_Message_State::byte_order (void) const
+{
+ return this->byte_order_;
+}
+
+ACE_INLINE void
+TAO_GIOP_Message_State::reset (void)
+{
+ this->message_type_ = 0;
+ this->message_size_ = 0;
+ this->more_fragments_ = 0;
+ this->request_id_ = 0;
+ this->missing_data_ = 0;
+}
+
+#if 0
+ACE_INLINE int
+TAO_GIOP_Message_State::message_fragmented (void)
+{
+ if (this->more_fragments)
+ return 1;
+
+ return 0;
+}
+
+
+
+ACE_INLINE CORBA::Boolean
+TAO_GIOP_Message_State::header_received (void) const
+{
+ return this->message_size != 0;
+}
+
+#endif
diff --git a/TAO/tao/IIOP_Acceptor.cpp b/TAO/tao/IIOP_Acceptor.cpp
index 1a0e99030e2..4cfe224bae8 100644
--- a/TAO/tao/IIOP_Acceptor.cpp
+++ b/TAO/tao/IIOP_Acceptor.cpp
@@ -2,6 +2,7 @@
// $Id$
+
#include "tao/IIOP_Acceptor.h"
#include "tao/IIOP_Profile.h"
#include "tao/MProfile.h"
diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp
index c05a558efd3..f897a5e0e11 100644
--- a/TAO/tao/IIOP_Connection_Handler.cpp
+++ b/TAO/tao/IIOP_Connection_Handler.cpp
@@ -1,17 +1,18 @@
// $Id$
-#include "tao/IIOP_Connection_Handler.h"
-#include "tao/Timeprobe.h"
-#include "tao/debug.h"
-#include "tao/ORB_Core.h"
-#include "tao/ORB.h"
-#include "tao/CDR.h"
-#include "tao/Messaging_Policy_i.h"
-#include "tao/Server_Strategy_Factory.h"
-#include "tao/IIOP_Transport.h"
-#include "tao/IIOP_Endpoint.h"
-#include "tao/Transport_Cache_Manager.h"
-#include "tao/Base_Transport_Property.h"
+#include "IIOP_Connection_Handler.h"
+#include "Timeprobe.h"
+#include "debug.h"
+#include "ORB_Core.h"
+#include "ORB.h"
+#include "CDR.h"
+#include "Messaging_Policy_i.h"
+#include "Server_Strategy_Factory.h"
+#include "IIOP_Transport.h"
+#include "IIOP_Endpoint.h"
+#include "Transport_Cache_Manager.h"
+#include "Base_Transport_Property.h"
+#include "Resume_Handle.h"
#if !defined (__ACE_INLINE__)
# include "tao/IIOP_Connection_Handler.i"
@@ -45,7 +46,7 @@ TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (TAO_ORB_Core *orb_core
{
TAO_IIOP_Transport* specific_transport = 0;
ACE_NEW(specific_transport,
- TAO_IIOP_Transport(this, orb_core, 0));
+ TAO_IIOP_Transport (this, orb_core, 0));
// store this pointer (indirectly increment ref count)
this->transport(specific_transport);
@@ -196,12 +197,12 @@ TAO_IIOP_Connection_Handler::handle_close (ACE_HANDLE handle,
// in turn take appropiate action (such as sending exceptions to
// all waiting reply handlers).
if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) ")
- ACE_TEXT ("IIOP_Connection_Handler::handle_close ")
- ACE_TEXT ("(%d, %d)\n"),
- handle,
- rm));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) ")
+ ACE_TEXT ("IIOP_Connection_Handler::handle_close ")
+ ACE_TEXT ("(%d, %d)\n"),
+ handle,
+ rm));
--this->pending_upcalls_;
if (this->pending_upcalls_ <= 0)
@@ -244,8 +245,18 @@ TAO_IIOP_Connection_Handler::fetch_handle (void)
}
int
+TAO_IIOP_Connection_Handler::resume_handler (void)
+{
+ return TAO_RESUMES_CONNECTION_HANDLER;
+}
+
+int
TAO_IIOP_Connection_Handler::handle_output (ACE_HANDLE)
{
+ // Instantiate the resume handle here.. This will automatically
+ // resume the handle once data is written..
+ TAO_Resume_Handle resume_handle (this->orb_core (),
+ this->fetch_handle ());
return this->transport ()->handle_output ();
}
@@ -310,48 +321,28 @@ TAO_IIOP_Connection_Handler::process_listen_point_list (
int
-TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE h)
-
-{
- return this->handle_input_i (h);
-}
-
+TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE)
-int
-TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE,
- ACE_Time_Value *max_wait_time)
{
+ // Increase the reference count on the upcall that have passed us.
this->pending_upcalls_++;
- // Call the transport read the message
- int result = this->transport ()->read_process_message (max_wait_time);
+ TAO_Resume_Handle resume_handle (this->orb_core (),
+ this->fetch_handle ());
- // Now the message has been read
- if (result == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("IIOP_Connection_Handler::read_message \n")));
-
- }
+ int retval = this->transport ()->handle_input_i (resume_handle);
// The upcall is done. Bump down the reference count
if (--this->pending_upcalls_ <= 0)
- result = -1;
+ retval = -1;
- if (result == -1 ||
- result == 1)
- return result;
-
- return 0;
+ return retval;
}
-
-
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/tao/IIOP_Connection_Handler.h b/TAO/tao/IIOP_Connection_Handler.h
index fe220557d87..40a53456a4f 100644
--- a/TAO/tao/IIOP_Connection_Handler.h
+++ b/TAO/tao/IIOP_Connection_Handler.h
@@ -111,6 +111,10 @@ public:
/// Return the underlying handle
virtual ACE_HANDLE fetch_handle (void);
+ /// Send a TRUE value to the reactor, so that the reactor does not
+ /// resume the handler
+ virtual int resume_handler (void);
+
/// Use peer() to drain the outgoing message queue
virtual int handle_output (ACE_HANDLE);
@@ -131,8 +135,6 @@ protected:
/// ensure that server threads eventually exit.
virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Time_Value *max_wait_time = 0);
private:
diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp
index 0561c25189a..d3e5eb60938 100644
--- a/TAO/tao/IIOP_Connector.cpp
+++ b/TAO/tao/IIOP_Connector.cpp
@@ -17,7 +17,6 @@ ACE_RCSID (TAO,
IIOP_Connector,
"$Id$")
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Node<ACE_INET_Addr>;
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
index a680226fd1e..d3c664db9af 100644
--- a/TAO/tao/IIOP_Transport.cpp
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -16,7 +16,7 @@
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
+//#include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "tao/IIOP_Transport.i"
@@ -26,12 +26,13 @@ ACE_RCSID (tao, IIOP_Transport, "$Id$")
TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean flag)
+ CORBA::Boolean /*flag*/)
: TAO_Transport (TAO_TAG_IIOP_PROFILE,
orb_core)
, connection_handler_ (handler)
, messaging_object_ (0)
{
+#if 0
if (flag)
{
// Use the lite version of the protocol
@@ -39,6 +40,7 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
TAO_GIOP_Message_Lite (orb_core));
}
else
+#endif
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
@@ -81,52 +83,37 @@ TAO_IIOP_Transport::recv_i (char *buf,
size_t len,
const ACE_Time_Value *max_wait_time)
{
- return this->connection_handler_->peer ().recv (buf,
- len,
- max_wait_time);
-}
-
-
-int
-TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
- int block)
-{
- // Read the message of the socket
- int result = this->messaging_object_->read_message (this,
- block,
+ ssize_t n = this->connection_handler_->peer ().recv (buf,
+ len,
max_wait_time);
- if (result == -1)
+ // Most of the errors handling is common for
+ // Now the message has been read
+ if (n == -1 && TAO_debug_level > 4)
{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("IIOP_Transport::read_message, failure ")
- ACE_TEXT ("in read_message ()")));
-
- this->tms_->connection_closed ();
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure ")
+ ACE_TEXT ("recv_i () \n")));
}
- if (result < 2)
- return result;
-
- // Now we know that we have been able to read the complete message
- // here.. We loop here to see whether we have read more than one
- // message in our read.
- // Set the result state
- result = 1;
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
- // See we use the reactor semantics again
- while (result > 0)
+ return -1;
+ }
+ // @@ What are the other error handling here??
+ else if (n == 0)
{
- result = this->process_message ();
+ return -1;
}
- return result;
+ return n;
}
-
int
TAO_IIOP_Transport::register_handler_i (void)
{
@@ -261,131 +248,6 @@ TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
return this->connection_handler_->process_listen_point_list (listen_list);
}
-int
-TAO_IIOP_Transport::process_message (void)
-{
- // Check whether we have messages for processing
- int retval =
- this->messaging_object_->more_messages ();
-
- if (retval <= 0)
- return retval;
-
- // Get the <message_type> that we have received
- TAO_Pluggable_Message_Type t =
- this->messaging_object_->message_type ();
-
-
- int result = 0;
- if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("Close Connection Message recd \n")));
-
- this->tms_->connection_closed ();
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
- {
- if (this->messaging_object_->process_request_message (
- this,
- this->orb_core ()) == -1)
- return -1;
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
- {
- TAO_Pluggable_Reply_Params params (this->orb_core ());
-
- if (this->messaging_object_->process_reply_message (params) == -1)
- {
-
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("IIOP_Transport::process_message, ")
- ACE_TEXT ("process_reply_message ()")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
- result = this->tms_->dispatch_reply (params);
-
- // @@ Somehow it seems dangerous to reset the state *after*
- // dispatching the request, what if another threads receives
- // another reply in the same connection?
- // My guess is that it works as follows:
- // - For the exclusive case there can be no such thread.
- // - The the muxed case each thread has its own message_state.
- // I'm pretty sure this comment is right. Could somebody else
- // please look at it and confirm my guess?
-
- // @@ The above comment was found in the older versions of the
- // code. The code was also written in such a way that, when
- // the client thread on a call from handle_input () from the
- // reactor a call would be made on the handle_client_input
- // (). The implementation of handle_client_input () looked so
- // flaky. It used to create a message state upon entry in to
- // the function using the TMS and destroy that on exit. All
- // this was fine _theoretically_ for multiple threads. But
- // the flakiness was originating in the implementation of
- // get_message_state () where we were creating message state
- // only once and dishing it out for every thread till one of
- // them destroy's it. So, it looked broken. That has been
- // changed. Why?. To my knowledge, the reactor does not call
- // handle_input () on two threads at the same time. So, IMHO
- // that defeats the purpose of creating a message state for
- // every thread. This is just my guess. If we run in to
- // problems this place needs to be revisited. If someone else
- // is going to take a look please contact bala@cs.wustl.edu
- // for details on this-- Bala
-
- if (result == -1)
- {
- // Something really critical happened, we will forget about
- // every reply on this connection.
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::")
- ACE_TEXT ("process_message - ")
- ACE_TEXT ("dispatch reply failed\n")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
- if (result == 0)
- {
- this->messaging_object_->reset ();
-
- // The reply dispatcher was no longer registered.
- // This can happened when the request/reply
- // times out.
- // To throw away all registered reply handlers is
- // not the right thing, as there might be just one
- // old reply coming in and several valid new ones
- // pending. If we would invoke <connection_closed>
- // we would throw away also the valid ones.
- //return 0;
- }
-
-
- // This is a NOOP for the Exclusive request case, but it actually
- // destroys the stream in the muxed case.
- //this->tms_->destroy_message_state (message_state);
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- {
- return -1;
- }
-
- return 1;
-}
-
-
void
TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
{
diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h
index 940ab085b2e..6492dba9e8c 100644
--- a/TAO/tao/IIOP_Transport.h
+++ b/TAO/tao/IIOP_Transport.h
@@ -82,9 +82,6 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
- virtual int read_process_message (ACE_Time_Value *max_time_value = 0,
- int block =0);
-
virtual int register_handler_i (void);
/// Method to do whatever it needs to do when the connection
@@ -118,9 +115,6 @@ public:
private:
- /// Process the message that we have read
- int process_message (void);
-
/// Set the Bidirectional context info in the service context list
void set_bidir_context_info (TAO_Operation_Details &opdetails);
diff --git a/TAO/tao/IORInfo.cpp b/TAO/tao/IORInfo.cpp
index ab7edb16ccb..0e3e2071078 100644
--- a/TAO/tao/IORInfo.cpp
+++ b/TAO/tao/IORInfo.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "IORInfo.h"
#include "PolicyC.h"
#include "IOPC.h"
@@ -9,10 +10,9 @@
ACE_RCSID (tao, IORInfo, "$Id$")
-
TAO_IORInfo::TAO_IORInfo (TAO_ORB_Core *orb_core,
- TAO_MProfile &mp,
- CORBA::PolicyList *policy_list)
+ TAO_MProfile &mp,
+ CORBA::PolicyList *policy_list)
: orb_core_ (orb_core),
mp_ (mp),
policy_list_ (policy_list)
@@ -25,7 +25,7 @@ TAO_IORInfo::~TAO_IORInfo (void)
CORBA::Policy_ptr
TAO_IORInfo::get_effective_policy (CORBA::PolicyType type,
- CORBA::Environment &ACE_TRY_ENV)
+ CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// Check the policy list supplied by the POA.
@@ -34,12 +34,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type,
for (CORBA::ULong i = 0; i < policy_count; ++i)
{
CORBA::PolicyType pt =
- (*(this->policy_list_))[i]->policy_type (
+ (*(this->policy_list_))[i]->policy_type (
TAO_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (CORBA::Policy::_nil ());
if (pt == type)
- return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]);
+ return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]);
}
// TODO: Now check the global ORB policies.
@@ -47,12 +47,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type,
ACE_THROW_RETURN (CORBA::INV_POLICY (TAO_OMG_VMCID | 2,
CORBA::COMPLETED_NO),
- CORBA::Policy::_nil ());
+ CORBA::Policy::_nil ());
}
void
TAO_IORInfo::add_ior_component (const IOP::TaggedComponent &component,
- CORBA::Environment &ACE_TRY_ENV)
+ CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((CORBA::SystemException))
{
// Add the given tagged component to all profiles.
@@ -85,12 +85,12 @@ TAO_IORInfo::add_ior_component_to_profile (
TAO_Profile *profile = this->mp_.get_profile (i);
if (profile->tag () == profile_id)
- {
- profile->add_tagged_component (component, ACE_TRY_ENV);
+ {
+ profile->add_tagged_component (component, ACE_TRY_ENV);
ACE_CHECK;
found_profile = 1;
- }
+ }
}
// According to the Portable Interceptor specification, we're
diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp
new file mode 100644
index 00000000000..17f8b31d6b9
--- /dev/null
+++ b/TAO/tao/Incoming_Message_Queue.cpp
@@ -0,0 +1,151 @@
+#include "Incoming_Message_Queue.h"
+#include "ORB_Core.h"
+#include "debug.h"
+
+
+#if !defined (__ACE_INLINE__)
+# include "Incoming_Message_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID (tao, Incoming_Message_Queue, "$Id$")
+
+
+TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core)
+ : queued_data_ (0),
+ size_ (0),
+ orb_core_ (orb_core)
+{
+}
+
+TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void)
+{
+ // Delete the QD
+ if (this->size_)
+ {
+ TAO_Queued_Data *qd = this->dequeue_head ();
+ TAO_Queued_Data::release (qd);
+ }
+}
+
+size_t
+TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block)
+{
+ // The size of message that is copied
+ size_t n = 0;
+
+ if (this->size_ > 0)
+ {
+ // Check to see if the length of the incoming block is less than
+ // that of the <missing_data_> of the tail.
+ if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_)
+ {
+ n = block.length ();
+ }
+ else
+ {
+ n = this->queued_data_->missing_data_;
+ }
+
+ // Do the copy
+ this->queued_data_->msg_block_->copy (block.rd_ptr (),
+ n);
+
+ // Decerement the missing data
+ this->queued_data_->missing_data_ -= n;
+ }
+
+ return n;
+}
+
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::dequeue_head (void)
+{
+ // Get the node on the head of the queue...
+ TAO_Queued_Data *tmp =
+ this->queued_data_->next_;
+
+ // Reset the head node..
+ this->queued_data_->next_ = tmp->next_;
+
+ // Decrease the size
+ --this->size_;
+
+ return tmp;
+}
+
+TAO_Queued_Data *
+TAO_Incoming_Message_Queue::dequeue_tail (void)
+{
+ // This is a bit painful stuff...
+ if (this->size_ == 0)
+ return 0;
+
+ // Get the node on the head of the queue...
+ TAO_Queued_Data *tmp =
+ this->queued_data_->next_;
+
+ while (tmp->next_ != this->queued_data_)
+ {
+ tmp = tmp->next_;
+ }
+
+ // Put the head in tmp.
+ tmp->next_ = this->queued_data_->next_;
+
+ TAO_Queued_Data *ret_qd = this->queued_data_;
+
+ this->queued_data_ = tmp;
+
+ // Decrease the size
+ --this->size_;
+
+ return ret_qd;
+}
+
+
+int
+TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd)
+{
+ if (this->size_ == 0)
+ {
+ this->queued_data_ = nd;
+ this->queued_data_->next_ = this->queued_data_;
+ }
+ else
+ {
+ nd->next_ = this->queued_data_->next_;
+ this->queued_data_->next_ = nd;
+ this->queued_data_ = nd;
+ }
+
+ ++ this->size_;
+ return 0;
+}
+
+
+/************************************************************************/
+// Methods for TAO_Queued_Data
+/************************************************************************/
+
+
+TAO_Queued_Data::TAO_Queued_Data (void)
+ : msg_block_ (0),
+ missing_data_ (0),
+ byte_order_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0)
+{
+}
+
+TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb)
+ : msg_block_ (mb),
+ missing_data_ (0),
+ byte_order_ (0),
+ major_version_ (0),
+ minor_version_ (0),
+ msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR),
+ next_ (0)
+{
+}
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
new file mode 100644
index 00000000000..cfbe613166e
--- /dev/null
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -0,0 +1,161 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Incoming_Queued_Message.h
+ *
+ * $Id$
+ *
+ * @author Balachandran Natarajan <bala@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_INCOMING_MESSAGE_QUEUE_H
+#define TAO_INCOMING_MESSAGE_QUEUE_H
+#include "ace/pre.h"
+
+#include "Pluggable_Messaging_Utils.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+/// Forward declarations
+class ACE_Data_Block;
+class TAO_ORB_Core;
+class TAO_Queued_Data;
+class TAO_Transport;
+
+/**
+ * @class TAO_Incoming_Message_Queue
+ *
+ * @brief A queue of the messages in the incoming data path.
+ *
+ * Please read the documentation in the TAO_Transport class to find
+ * out more about the design of the incoming data path.
+ *
+ * Under certain conditions TAO may have to maintain a queue
+ * per-connection. This queue is drained by the pluggable
+ * protocols framework, normally under control of the ACE_Reactor, but
+ * other configurations are conceivable.
+ *
+ * The memory that is allocated for holding the messages comes from
+ * the global pool for the following reasons
+ *
+ * - the thread that reads a part of the message would not be the same
+ * thread that reads and fills the rest of the message
+ * - the thread that actually processes the message can be totally
+ * different.
+ *
+ */
+
+class TAO_Export TAO_Incoming_Message_Queue
+{
+public:
+ /// Contructor.
+ TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core);
+
+ /// Destructor.
+ ~TAO_Incoming_Message_Queue (void);
+
+ /// Adding and deleting a node from the queue.
+ TAO_Queued_Data *dequeue_head (void);
+ TAO_Queued_Data *dequeue_tail (void);
+ int enqueue_tail (TAO_Queued_Data *nd);
+
+ /// Copy message from <block> to the tail of the queue. The size
+ /// of message that is copied to the tail node is returned. The
+ /// number of bytes copied depends on the amount of bytes needed to
+ /// make the tail node consistent.
+ size_t copy_tail (ACE_Message_Block &block);
+
+ /// Return the length of the queue..
+ CORBA::ULong queue_length (void);
+
+ /// Methods for sanity check. Checks to see whether the node on the
+ /// head or tail is complete or not and ready for further
+ /// processing.
+ int is_tail_complete (void);
+ int is_head_complete (void);
+
+ /// Return the size of data that is missing in tail of the queue.
+ size_t missing_data_tail (void) const;
+ /// void missing_data (size_t data);
+
+private:
+
+ friend class TAO_Transport;
+
+ /// Make a node for the queue.
+ TAO_Queued_Data *get_node (void);
+
+private:
+
+ /// A circular linked listof messages that await processing
+ TAO_Queued_Data *queued_data_;
+
+ /// The size of the queue
+ CORBA::ULong size_;
+
+ /// Copy of our ORB Core
+ TAO_ORB_Core *orb_core_;
+};
+
+/************************************************************************/
+
+/**
+ * @class TAO_Queued_Data
+ *
+ * @brief Represents a node in the queue of incoming messages.
+ *
+ * This class contains necessary information about a message that is
+ * stored in the queue. Such a node can be used by the incoming thread
+ * from the reactor to dequeue and process the message by sending it
+ * to the higher layers of the ORB.
+ */
+
+class TAO_Export TAO_Queued_Data
+{
+public:
+ /// Default Constructor
+ TAO_Queued_Data (void);
+
+ /// Constructor.
+ TAO_Queued_Data (ACE_Message_Block *mb);
+
+ /// Creation and deletion of a node in the queue.
+ static TAO_Queued_Data* get_queued_data (void);
+
+ static void release (TAO_Queued_Data *qd);
+
+ /// The message block that contains the message.
+ ACE_Message_Block *msg_block_;
+
+ /// Data missing in the above message that hasn't been read or
+ /// processed yet.
+ CORBA::Long missing_data_;
+
+ /// The byte order of the message that is stored in the node..
+ CORBA::Octet byte_order_;
+
+ /// Many protocols like GIOP have a major and minor version
+ /// information that would be needed to read and decipher the
+ /// message.
+ CORBA::Octet major_version_;
+
+ CORBA::Octet minor_version_;
+
+ /// The message type of the message
+ TAO_Pluggable_Message_Type msg_type_;
+
+ /// Pounter to the next element in the queue.
+ TAO_Queued_Data *next_;
+};
+
+
+#if defined (__ACE_INLINE__)
+# include "Incoming_Message_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /*TAO_INCOMING_MESSAGE_QUEUE_H*/
diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl
new file mode 100644
index 00000000000..8d270b9c479
--- /dev/null
+++ b/TAO/tao/Incoming_Message_Queue.inl
@@ -0,0 +1,78 @@
+// -*- C++ -*-
+//$Id$
+ACE_INLINE CORBA::ULong
+TAO_Incoming_Message_Queue::queue_length (void)
+{
+ return this->size_;
+}
+
+ACE_INLINE int
+TAO_Incoming_Message_Queue::is_tail_complete (void)
+{
+ // If the size is 0 return -1
+ if (this->size_ == 0)
+ return -1;
+
+ if (this->size_ &&
+ this->queued_data_->missing_data_ == 0)
+ return 1;
+
+ return 0;
+}
+
+ACE_INLINE int
+TAO_Incoming_Message_Queue::is_head_complete (void)
+{
+ if (this->size_ == 0)
+ return -1;
+
+ if (this->size_ &&
+ this->queued_data_->next_->missing_data_ == 0)
+ return 1;
+
+ return 0;
+}
+
+ACE_INLINE size_t
+TAO_Incoming_Message_Queue::missing_data_tail (void) const
+{
+ if (this->size_ != 0)
+ return this->queued_data_->missing_data_;
+
+ return 0;
+}
+
+
+
+ACE_INLINE TAO_Queued_Data *
+TAO_Incoming_Message_Queue::get_node (void)
+{
+ return TAO_Queued_Data::get_queued_data ();
+}
+
+
+/************************************************************************/
+// Methods for TAO_Queued_Data
+/************************************************************************/
+/*static*/
+ACE_INLINE TAO_Queued_Data *
+TAO_Queued_Data::get_queued_data (void)
+{
+ // @@TODO: Use the global pool for allocationg...
+ TAO_Queued_Data *qd = 0;
+ ACE_NEW_RETURN (qd,
+ TAO_Queued_Data,
+ 0);
+
+ return qd;
+}
+
+/*static*/
+ACE_INLINE void
+TAO_Queued_Data::release (TAO_Queued_Data *qd)
+{
+ ACE_Message_Block::release (qd->msg_block_);
+
+ // @@TODO: Use the global pool for releasing..
+ delete qd;
+}
diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp
index 5f975f1e7b5..c50a3166851 100644
--- a/TAO/tao/Invocation.cpp
+++ b/TAO/tao/Invocation.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "Invocation.h"
#include "Principal.h"
#include "Stub.h"
@@ -29,8 +30,10 @@
# include "Invocation.i"
#endif /* ! __ACE_INLINE__ */
+
ACE_RCSID(tao, Invocation, "$Id$")
+
#if defined (ACE_ENABLE_TIMEPROBES)
static const char *TAO_Invocation_Timeprobe_Description[] =
diff --git a/TAO/tao/Invocation_Endpoint_Selectors.cpp b/TAO/tao/Invocation_Endpoint_Selectors.cpp
index fafe05a84aa..1e9a83da328 100644
--- a/TAO/tao/Invocation_Endpoint_Selectors.cpp
+++ b/TAO/tao/Invocation_Endpoint_Selectors.cpp
@@ -14,6 +14,7 @@
ACE_RCSID(tao, Invocation_Endpoint_Selectors, "$Id$")
+
TAO_Invocation_Endpoint_Selector::~TAO_Invocation_Endpoint_Selector (void)
{
}
diff --git a/TAO/tao/LocalObject.cpp b/TAO/tao/LocalObject.cpp
index e8952ac76b2..2070ba6e8d1 100644
--- a/TAO/tao/LocalObject.cpp
+++ b/TAO/tao/LocalObject.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "LocalObject.h"
#if !defined (__ACE_INLINE__)
@@ -14,6 +15,7 @@ ACE_RCSID (tao,
LocalObject,
"$Id$")
+
CORBA_LocalObject::~CORBA_LocalObject (void)
{
}
diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile
index 980e6c4d87e..e3362cfd566 100644
--- a/TAO/tao/Makefile
+++ b/TAO/tao/Makefile
@@ -1,6 +1,6 @@
#----------------------------------------------------------------------------
-
+#
# $Id$
#
# Makefile for TAO
@@ -84,6 +84,8 @@ PUB_HDRS = \
PLUGGABLE_PROTOCOLS_FILES = \
Pluggable \
Transport \
+ Incoming_Message_Queue \
+ Resume_Handle \
Profile \
Endpoint \
Connector_Registry \
@@ -106,8 +108,6 @@ PLUGGABLE_MESSAGING_FILES = \
Pluggable_Messaging \
Pluggable_Messaging_Utils \
GIOP_Message_Base \
- GIOP_Message_Lite \
- GIOP_Message_Reactive_Handler \
GIOP_Message_Generator_Parser \
GIOP_Message_Generator_Parser_10 \
GIOP_Message_Generator_Parser_11 \
diff --git a/TAO/tao/MessagingC.h b/TAO/tao/MessagingC.h
index 8795ab51435..7b35f808d59 100644
--- a/TAO/tao/MessagingC.h
+++ b/TAO/tao/MessagingC.h
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/Muxed_TMS.cpp b/TAO/tao/Muxed_TMS.cpp
index ee5f9647379..1bd476a279e 100644
--- a/TAO/tao/Muxed_TMS.cpp
+++ b/TAO/tao/Muxed_TMS.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "tao/Muxed_TMS.h"
#include "tao/Reply_Dispatcher.h"
#include "tao/GIOP_Message_Version.h"
@@ -9,6 +10,7 @@
ACE_RCSID(tao, Muxed_TMS, "$Id$")
+
TAO_Muxed_TMS::TAO_Muxed_TMS (TAO_Transport *transport)
: TAO_Transport_Mux_Strategy (transport),
request_id_generator_ (0),
@@ -104,28 +106,6 @@ TAO_Muxed_TMS::dispatch_reply (TAO_Pluggable_Reply_Params &params)
// sending the request.
}
-/*TAO_GIOP_Message_State *
-TAO_Muxed_TMS::get_message_state (void)
-{
- if (this->message_state_ == 0)
- {
- // Create the next message state.
- ACE_NEW_RETURN (this->message_state_,
- TAO_GIOP_Message_State
- (this->transport_->orb_core ()),
- 0);
- }
-
- return this->message_state_;
-}
-
-void
-TAO_Muxed_TMS::destroy_message_state (TAO_GIOP_Message_State *)
-{
- delete this->message_state_;
- this->message_state_ = 0;
-}*/
-
int
TAO_Muxed_TMS::idle_after_send (void)
{
diff --git a/TAO/tao/Muxed_TMS.h b/TAO/tao/Muxed_TMS.h
index 0fdf9fd71f8..0824dc1d72f 100644
--- a/TAO/tao/Muxed_TMS.h
+++ b/TAO/tao/Muxed_TMS.h
@@ -59,10 +59,6 @@ public:
virtual int dispatch_reply (TAO_Pluggable_Reply_Params &params);
- // @@ Commented for the time being, let the commented line stay for
- // sometime - Bala
- // virtual TAO_GIOP_Message_State *get_message_state (void);
- // virtual void destroy_message_state (TAO_GIOP_Message_State *);
virtual int idle_after_send (void);
virtual int idle_after_reply (void);
virtual void connection_closed (void);
diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp
index f54ba3a1d1f..0c0efbaf6db 100644
--- a/TAO/tao/ORB.cpp
+++ b/TAO/tao/ORB.cpp
@@ -1,6 +1,7 @@
// $Id$
+
#include "ORB.h"
#include "ORB_Table.h"
#include "Connector_Registry.h"
@@ -70,6 +71,7 @@ using std::set_unexpected;
ACE_RCSID(tao, ORB, "$Id$")
+
static const char ior_prefix [] = "IOR:";
// = Static initialization.
@@ -1755,6 +1757,7 @@ CORBA_ORB::object_to_string (CORBA::Object_ptr obj,
CORBA::COMPLETED_NO),
0);
+
// Application writer controls what kind of objref strings they get,
// maybe along with other things, by how they initialize the ORB.
@@ -1981,8 +1984,11 @@ CORBA_ORB::ior_string_to_object (const char *str,
int byte_order = *(mb.rd_ptr ());
mb.rd_ptr (1);
mb.wr_ptr (len);
- TAO_InputCDR stream (&mb, byte_order, TAO_DEF_GIOP_MAJOR,
- TAO_DEF_GIOP_MINOR, this->orb_core_);
+ TAO_InputCDR stream (&mb,
+ byte_order,
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR,
+ this->orb_core_);
CORBA::Object_ptr objref = CORBA::Object::_nil ();
stream >> objref;
diff --git a/TAO/tao/Object_Ref_Table.cpp b/TAO/tao/Object_Ref_Table.cpp
index c883296595d..f5892459696 100644
--- a/TAO/tao/Object_Ref_Table.cpp
+++ b/TAO/tao/Object_Ref_Table.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "Object_Ref_Table.h"
#include "Object.h"
#include "Exception.h"
@@ -13,6 +14,7 @@ ACE_RCSID (tao,
Object_Ref_Table,
"$Id$")
+
// ****************************************************************
TAO_Object_Ref_Table::TAO_Object_Ref_Table (void)
diff --git a/TAO/tao/Pluggable_Messaging.cpp b/TAO/tao/Pluggable_Messaging.cpp
index c7ec77f596d..65c2c20a01c 100644
--- a/TAO/tao/Pluggable_Messaging.cpp
+++ b/TAO/tao/Pluggable_Messaging.cpp
@@ -14,9 +14,3 @@ TAO_Pluggable_Messaging::~TAO_Pluggable_Messaging (void)
{
}
-
-int
-TAO_Pluggable_Messaging::more_messages (void)
-{
- ACE_NOTSUP_RETURN (-1);
-}
diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h
index eb8fda573b0..15be5628acf 100644
--- a/TAO/tao/Pluggable_Messaging.h
+++ b/TAO/tao/Pluggable_Messaging.h
@@ -31,6 +31,7 @@ class TAO_Transport;
class TAO_Operation_Details;
class TAO_Target_Specification;
class TAO_OutputCDR;
+class TAO_Queued_Data;
// @@ The more I think I about this class, I feel that this class need
// not be a ABC as it is now. Instead we have these options
@@ -108,27 +109,43 @@ public:
/// general.
virtual int format_message (TAO_OutputCDR &cdr) = 0;
- /// Get the message type that was received.
- virtual TAO_Pluggable_Message_Type message_type (void) = 0;
-
-
/// Do any initialisations that may be needed.
virtual void init (CORBA::Octet major,
CORBA::Octet minor) = 0;
- /// Reset teh messaging object
- virtual void reset (int reset_flag = 1) = 0;
- // Reset the messaging object
+ /// Parse the incoming messages..
+ virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0;
+
+ /// Calculate the amount of data that is missing in the <incoming>
+ /// message block.
+ virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0;
+
+ /// Get the details of the message parsed through the <qd>.
+ virtual void get_message_data (TAO_Queued_Data *qd) = 0;
+
+ /* Extract the details of the next message from the <incoming>
+ * through <qd>. Returns 1 if there are more messages and returns a
+ * 0 if there are no more messages in <incoming>.
+ */
+ virtual int extract_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data *&qd) = 0;
+
+ /// Check whether the node <qd> needs consolidation from <incoming>
+ virtual int consolidate_node (TAO_Queued_Data *qd,
+ ACE_Message_Block &incoming) = 0;
/// Parse the request message, make an upcall and send the reply back
/// to the "request initiator"
virtual int process_request_message (TAO_Transport *transport,
- TAO_ORB_Core *orb_core) = 0;
+ TAO_Queued_Data *qd) = 0;
+
/// Parse the reply message that we received and return the reply
/// information though <reply_info>
virtual int process_reply_message (
- TAO_Pluggable_Reply_Params &reply_info) = 0;
+ TAO_Pluggable_Reply_Params &reply_info,
+ TAO_Queued_Data *qd) = 0;
+
/// Generate a reply message with the exception <ex>.
virtual int generate_exception_reply (
@@ -140,8 +157,8 @@ public:
/// request/response?
virtual int is_ready_for_bidirectional (void) = 0;
- /// Are there any more messages that needs processing?
- virtual int more_messages (void);
+ /// Reset the messaging the object
+ virtual void reset (void) = 0;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Pluggable_Messaging_Utils.cpp b/TAO/tao/Pluggable_Messaging_Utils.cpp
index 8c6420c3694..5b0ea8553b2 100644
--- a/TAO/tao/Pluggable_Messaging_Utils.cpp
+++ b/TAO/tao/Pluggable_Messaging_Utils.cpp
@@ -1,4 +1,5 @@
//$Id$
+
#include "tao/Pluggable_Messaging_Utils.h"
#include "tao/ORB_Core.h"
@@ -8,12 +9,11 @@
ACE_RCSID(tao, Pluggable_Messaging_Utils, "$Id$")
+
TAO_Pluggable_Reply_Params::TAO_Pluggable_Reply_Params (
TAO_ORB_Core *orb_core
)
- : input_cdr_ (orb_core->create_input_cdr_data_block (
- ACE_CDR::DEFAULT_BUFSIZE
- ),
+ : input_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE),
0,
TAO_ENCAP_BYTE_ORDER,
TAO_DEF_GIOP_MAJOR,
diff --git a/TAO/tao/Pluggable_Messaging_Utils.h b/TAO/tao/Pluggable_Messaging_Utils.h
index 63f0d756e57..c5a2afae2f3 100644
--- a/TAO/tao/Pluggable_Messaging_Utils.h
+++ b/TAO/tao/Pluggable_Messaging_Utils.h
@@ -92,13 +92,17 @@ protected:
* This represents a set of data that would be received by the
* connector from the acceptor.
*/
-class TAO_Export TAO_Pluggable_Reply_Params
+class TAO_Export TAO_Pluggable_Reply_Params
: public TAO_Pluggable_Reply_Params_Base
{
public:
/// Constructor.
TAO_Pluggable_Reply_Params (TAO_ORB_Core *orb_core);
+ /* @todo: There is a way out clear this off from stack. Need to look
+ into that after 1.2
+ */
+
/// The stream with the non-demarshaled reply. This stream will be
/// passed up to the stubs to demarshal the parameter values.
TAO_InputCDR input_cdr_;
diff --git a/TAO/tao/PolicyC.cpp b/TAO/tao/PolicyC.cpp
index 6eb368913ae..84cec53c08d 100644
--- a/TAO/tao/PolicyC.cpp
+++ b/TAO/tao/PolicyC.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/PolicyC.i b/TAO/tao/PolicyC.i
index ceb80909d8b..1b2b0c267c7 100644
--- a/TAO/tao/PolicyC.i
+++ b/TAO/tao/PolicyC.i
@@ -2,6 +2,7 @@
//
// $Id$
+
// **** Code generated by the The ACE ORB (TAO) IDL Compiler ****
// TAO and the TAO IDL Compiler have been developed by:
// Center for Distributed Object Computing
diff --git a/TAO/tao/PolicyFactory_Registry.cpp b/TAO/tao/PolicyFactory_Registry.cpp
index 29910a7f3c7..814f6e0d1c4 100644
--- a/TAO/tao/PolicyFactory_Registry.cpp
+++ b/TAO/tao/PolicyFactory_Registry.cpp
@@ -2,6 +2,7 @@
//
// $Id$
+
#include "PolicyFactory_Registry.h"
ACE_RCSID(tao, PolicyFactory_Registry, "$Id$")
diff --git a/TAO/tao/PortableInterceptor.pidl b/TAO/tao/PortableInterceptor.pidl
index e8421db4f8e..b2e4e8980a4 100644
--- a/TAO/tao/PortableInterceptor.pidl
+++ b/TAO/tao/PortableInterceptor.pidl
@@ -2,6 +2,7 @@
//
// $Id$
+
// ================================================================
//
// This file was used to generate the code in PortableInterceptorC.*
diff --git a/TAO/tao/Profile.cpp b/TAO/tao/Profile.cpp
index b7b4f9b17a2..a6b816f6855 100644
--- a/TAO/tao/Profile.cpp
+++ b/TAO/tao/Profile.cpp
@@ -1,5 +1,6 @@
// $Id$
+
#include "Profile.h"
#include "Object_KeyC.h"
@@ -13,6 +14,7 @@
ACE_RCSID(tao, Profile, "$Id$")
+
// ****************************************************************
TAO_Profile::~TAO_Profile (void)
@@ -261,9 +263,9 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV)
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Cannot add ")
- ACE_TEXT ("IOP::TaggedComponent to profile.\n")
- ACE_TEXT ("(%P|%t) Standard profile components ")
- ACE_TEXT ("have been disabled or URL style IORs\n")
+ ACE_TEXT ("IOP::TaggedComponent to profile.\n")
+ ACE_TEXT ("(%P|%t) Standard profile components ")
+ ACE_TEXT ("have been disabled or URL style IORs\n")
ACE_TEXT ("(%P|%t) are in use. Try ")
ACE_TEXT ("\"-ORBStdProfileComponents 1\" and/or\n")
ACE_TEXT ("(%P|%t) \"-ORBObjRefStyle IOR\".\n")));
@@ -276,8 +278,8 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW (CORBA::BAD_PARAM (
CORBA_SystemException::_tao_minor_code (
TAO_DEFAULT_MINOR_CODE,
- EINVAL),
- CORBA::COMPLETED_NO));
+ EINVAL),
+ CORBA::COMPLETED_NO));
}
}
@@ -292,8 +294,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV)
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Cannot add ")
- ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0")
- ACE_TEXT ("IOR profile.\n")
+ ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0")
+ ACE_TEXT ("IOR profile.\n")
ACE_TEXT ("(%P|%t) Try using a GIOP 1.1 or ")
ACE_TEXT ("greater endpoint.\n")));
@@ -305,8 +307,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV)
ACE_THROW (CORBA::BAD_PARAM (
CORBA_SystemException::_tao_minor_code (
TAO_DEFAULT_MINOR_CODE,
- EINVAL),
- CORBA::COMPLETED_NO));
+ EINVAL),
+ CORBA::COMPLETED_NO));
}
}
diff --git a/TAO/tao/Resume_Handle.cpp b/TAO/tao/Resume_Handle.cpp
new file mode 100644
index 00000000000..af94eefa37e
--- /dev/null
+++ b/TAO/tao/Resume_Handle.cpp
@@ -0,0 +1,24 @@
+#include "Resume_Handle.h"
+#include "ORB_Core.h"
+
+
+#if !defined (__ACE_INLINE__)
+# include "Resume_Handle.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(tao, Resume_Handle, "$Id$")
+
+void
+TAO_Resume_Handle::resume_handle (void)
+{
+ // If we have a complete message, just resume the handler
+ // Resume the handler.
+ if (this->orb_core_ &&
+ this->orb_core_->reactor ()->resumable_handler () &&
+ this->flag_ == TAO_HANDLE_RESUMABLE &&
+ this->handle_ != ACE_INVALID_HANDLE)
+ this->orb_core_->reactor ()->resume_handler (this->handle_);
+
+ // Set the flag, so that we dont resume again..
+ this->flag_ = TAO_HANDLE_ALREADY_RESUMED;
+}
diff --git a/TAO/tao/Resume_Handle.h b/TAO/tao/Resume_Handle.h
new file mode 100644
index 00000000000..36449fad030
--- /dev/null
+++ b/TAO/tao/Resume_Handle.h
@@ -0,0 +1,90 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Resume_Handle.h
+ *
+ * $Id$
+ *
+ * @author Balachandran Natarajan <bala@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef TAO_RESUME_HANDLE_H
+#define TAO_RESUME_HANDLE_H
+#include "ace/pre.h"
+
+#include "TAO_Export.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+class TAO_ORB_Core;
+
+
+/**
+ * @class TAO_Resume_Handle
+ *
+ * @brief A utility class that helps in resuming handlers if TAO uses
+ * a TP Reactor from ACE.
+ *
+ * Please read the documentation in the bugzilla #575 in the bugzilla
+ * database what we mean by handler resumption.
+ *
+ * When TAO uses a TP reactor, it takes care of resuming the handler
+ * once it makes sure that it has read the whole message out of the
+ * socket. During the process of reading the transport object would
+ * have to deal with errors in 'read' from the socket, or errors in
+ * the messages that has been received. Instead of calling
+ * resume_handler () on the reactor at every point in the code, we
+ * use this utility class to take care of the resumption.
+ */
+
+class TAO_Export TAO_Resume_Handle
+{
+
+public:
+
+ /// Ctor.
+ TAO_Resume_Handle (TAO_ORB_Core *orb_core = 0,
+ ACE_HANDLE h = ACE_INVALID_HANDLE);
+ /// Dtor
+ ~TAO_Resume_Handle (void);
+
+ enum TAO_Handle_Resume_Flag
+ {
+ TAO_HANDLE_RESUMABLE = 0,
+ TAO_HANDLE_ALREADY_RESUMED,
+ TAO_HANDLE_LEAVE_SUSPENDED
+ };
+
+ /// Allow the users of this class to change the underlying flag.
+ void set_flag (TAO_Handle_Resume_Flag fl);
+
+ /// Equal to operator..
+ TAO_Resume_Handle &operator= (const TAO_Resume_Handle &rhs);
+
+ /// Resume the handle in the reactor only if the ORB uses a TP
+ /// reactor. Else we dont resume the handle.
+ void resume_handle (void);
+
+private:
+
+ /// Our ORB Core.
+ TAO_ORB_Core *orb_core_;
+
+ /// The actual handle that needs resumption..
+ ACE_HANDLE handle_;
+
+ /// Th flag for indicating whether the handle has been resumed or
+ /// not. A value of '0' indicates that the handle needs resumption.
+ TAO_Handle_Resume_Flag flag_;
+};
+
+#if defined (__ACE_INLINE__)
+# include "Resume_Handle.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/pre.h"
+#endif /*TAO_RESUME_HANDLE*/
diff --git a/TAO/tao/Resume_Handle.inl b/TAO/tao/Resume_Handle.inl
new file mode 100644
index 00000000000..38aae6740f1
--- /dev/null
+++ b/TAO/tao/Resume_Handle.inl
@@ -0,0 +1,38 @@
+// -*- C++ -*-
+//$Id$
+
+ACE_INLINE
+TAO_Resume_Handle::TAO_Resume_Handle (TAO_ORB_Core *orb_core,
+ ACE_HANDLE h)
+ : orb_core_ (orb_core),
+ handle_ (h),
+ flag_ (TAO_HANDLE_RESUMABLE)
+{
+}
+
+ACE_INLINE
+TAO_Resume_Handle::~TAO_Resume_Handle (void)
+{
+ if (this->flag_ == TAO_HANDLE_RESUMABLE)
+ this->resume_handle ();
+
+ this->orb_core_ = 0;
+ this->handle_ = ACE_INVALID_HANDLE;
+}
+
+
+ACE_INLINE void
+TAO_Resume_Handle::set_flag (TAO_Handle_Resume_Flag fl)
+{
+ this->flag_ = fl;
+}
+
+ACE_INLINE TAO_Resume_Handle &
+TAO_Resume_Handle::operator= (const TAO_Resume_Handle &rhs)
+{
+ this->orb_core_ = rhs.orb_core_;
+ this->handle_ = rhs.handle_;
+ this->flag_ = rhs.flag_;
+
+ return *this;
+}
diff --git a/TAO/tao/Strategies/DIOP_Connection_Handler.cpp b/TAO/tao/Strategies/DIOP_Connection_Handler.cpp
index 9b95deeea56..beb08a7679b 100644
--- a/TAO/tao/Strategies/DIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/DIOP_Connection_Handler.cpp
@@ -14,6 +14,7 @@
#include "tao/Server_Strategy_Factory.h"
#include "tao/Transport_Cache_Manager.h"
#include "tao/Base_Transport_Property.h"
+#include "tao/Resume_Handle.h"
#include "DIOP_Transport.h"
#include "DIOP_Endpoint.h"
@@ -259,6 +260,12 @@ TAO_DIOP_Connection_Handler::fetch_handle (void)
return this->get_handle ();
}
+int
+TAO_DIOP_Connection_Handler::resume_handler (void)
+{
+ return TAO_RESUMES_CONNECTION_HANDLER;
+}
+
int
TAO_DIOP_Connection_Handler::add_transport_to_cache (void)
@@ -322,33 +329,28 @@ TAO_DIOP_Connection_Handler::process_listen_point_list (
*/
int
-TAO_DIOP_Connection_Handler::handle_input (ACE_HANDLE h)
-{
- return this->handle_input_i (h);
-}
-
-
-int
-TAO_DIOP_Connection_Handler::handle_input_i (ACE_HANDLE,
- ACE_Time_Value *max_wait_time)
+TAO_DIOP_Connection_Handler::handle_input (ACE_HANDLE)
{
+ // Increase the reference count on the upcall that have passed us.
this->pending_upcalls_++;
- // Call the transport read the message
- int result = this->transport ()->read_process_message (max_wait_time);
+ TAO_Resume_Handle resume_handle (this->orb_core (),
+ this->fetch_handle ());
+
+ int retval = this->transport ()->handle_input_i (resume_handle);
// Now the message has been read
- if (result == -1 && TAO_debug_level > 0)
+ if (retval == -1 && TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("DIOP_Connection_Handler::handle_input_i \n")));
+ ACE_TEXT ("DIOP_Connection_Handler::handle_input \n")));
}
// The upcall is done. Bump down the reference count
if (--this->pending_upcalls_ <= 0)
- result = -1;
+ retval = -1;
// @@ Michael:
// We always return 0, as we do not have any
@@ -356,6 +358,8 @@ TAO_DIOP_Connection_Handler::handle_input_i (ACE_HANDLE,
return 0;
}
+
+
// @@ Frank: From DIOP_Connect.cpp
int
TAO_DIOP_Connection_Handler::handle_cleanup (void)
diff --git a/TAO/tao/Strategies/DIOP_Connection_Handler.h b/TAO/tao/Strategies/DIOP_Connection_Handler.h
index b0291796f43..bb8fc50bbdf 100644
--- a/TAO/tao/Strategies/DIOP_Connection_Handler.h
+++ b/TAO/tao/Strategies/DIOP_Connection_Handler.h
@@ -117,6 +117,9 @@ public:
/// Return the underlying handle
virtual ACE_HANDLE fetch_handle (void);
+ /// Event handler overload..
+ virtual int resume_handler (void);
+
/// Add ourselves to Cache.
int add_transport_to_cache (void);
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
index 28bfc2c927a..1e74bc6f5f1 100644
--- a/TAO/tao/Strategies/DIOP_Transport.cpp
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -18,8 +18,9 @@
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
+#include "tao/Resume_Handle.h"
#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
+// #include "tao/GIOP_Message_Lite.h"
#if !defined (__ACE_INLINE__)
# include "DIOP_Transport.i"
@@ -29,7 +30,7 @@ ACE_RCSID (tao, DIOP_Transport, "$Id$")
TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean flag)
+ CORBA::Boolean /*flag*/)
: TAO_Transport (TAO_TAG_UDP_PROFILE,
orb_core)
, connection_handler_ (handler)
@@ -37,14 +38,14 @@ TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
{
// @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that
// we read the whole UDP packet on a single read.
- if (flag)
+ /* if (flag)
{
// Use the lite version of the protocol
ACE_NEW (this->messaging_object_,
TAO_GIOP_Message_Lite (orb_core,
ACE_MAX_DGRAM_SIZE));
- }
- else
+ }
+ else*/
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
@@ -109,6 +110,30 @@ TAO_DIOP_Transport::recv_i (char *buf,
errno));
}
+ // Most of the errors handling is common for
+ // Now the message has been read
+ if (n == -1 && TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure ")
+ ACE_TEXT ("recv_i () \n")));
+ }
+
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ return -1;
+ }
+ // @@ What are the other error handling here??
+ else if (n == 0)
+ {
+ return -1;
+ }
+
// Remember the from addr to eventually use it as remote
// addr for the reply.
this->connection_handler_->addr (from_addr);
@@ -116,42 +141,82 @@ TAO_DIOP_Transport::recv_i (char *buf,
return n;
}
-
int
-TAO_DIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
- int block)
+TAO_DIOP_Transport::handle_input_i (TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time,
+ int /*block*/)
{
- // Read the message of the socket
- int result = this->messaging_object_->read_message (this,
- block,
- max_wait_time);
-
- if (result == -1)
+ // If there are no messages then we can go ahead to read from the
+ // handle for further reading..
+
+ // The buffer on the stack which will be used to hold the input
+ // messages
+ char buf [ACE_MAX_DGRAM_SIZE];
+
+#if defined (ACE_HAS_PURIFY)
+ (void) ACE_OS::memset (buf,
+ '\0',
+ sizeof buf);
+#endif /* ACE_HAS_PURIFY */
+
+ // Create a data block
+ ACE_Data_Block db (sizeof (buf),
+ ACE_Message_Block::MB_DATA,
+ buf,
+ this->orb_core_->message_block_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_dblock_allocator ());
+
+ // Create a message block
+ ACE_Message_Block message_block (&db,
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_msgblock_allocator ());
+
+
+ // Align the message block
+ ACE_CDR::mb_align (&message_block);
+
+
+ // Read the message into the message block that we have created on
+ // the stack.
+ ssize_t n = this->recv (message_block.rd_ptr (),
+ message_block.space (),
+ max_wait_time);
+
+ // If there is an error return to the reactor..
+ if (n <= 0)
{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("DIOP_Transport::read_process_message, failure in read_message ()")));
+ if (n == -1)
+ this->tms_->connection_closed ();
- this->tms_->connection_closed ();
- return -1;
+ return n;
}
- if (result < 2)
- return result;
- // Now we know that we have been able to read the complete message
- // here.. We loop here to see whether we have read more than one
- // message in our read.
- // Set the result state
- result = 1;
+ // Set the write pointer in the stack buffer
+ message_block.wr_ptr (n);
- // See we use the reactor semantics again
- while (result > 0)
- {
- result = this->process_message ();
- }
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ if (this->parse_incoming_messages (message_block) == -1)
+ return -1;
+
+ // NOTE: We are not performing any queueing nor any checking for
+ // missing data. We are assuming that ALL the data would be got in a
+ // single read.
+
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block);
+
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
+
+ // Resume before starting to process the request..
+ rh.resume_handle ();
+
+ // Process the message
+ return this->process_parsed_messages (&qd);
- return result;
}
@@ -275,130 +340,7 @@ TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
}
*/
-int
-TAO_DIOP_Transport::process_message (void)
-{
- // Check whether we have messages for processing
- int retval =
- this->messaging_object_->more_messages ();
-
- if (retval <= 0)
- return retval;
- // Get the <message_type> that we have received
- TAO_Pluggable_Message_Type t =
- this->messaging_object_->message_type ();
-
-
- int result = 0;
- if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("Close Connection Message recd \n")));
-
- this->tms_->connection_closed ();
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
- {
- if (this->messaging_object_->process_request_message (this,
- this->orb_core ()) == -1)
- return -1;
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
- {
- TAO_Pluggable_Reply_Params params (this->orb_core ());
- if (this->messaging_object_->process_reply_message (params) == -1)
- {
-
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("DIOP_Transport::process_message, process_reply_message ()")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
-
- result =
- this->tms_->dispatch_reply (params);
-
- // @@ Somehow it seems dangerous to reset the state *after*
- // dispatching the request, what if another threads receives
- // another reply in the same connection?
- // My guess is that it works as follows:
- // - For the exclusive case there can be no such thread.
- // - The the muxed case each thread has its own message_state.
- // I'm pretty sure this comment is right. Could somebody else
- // please look at it and confirm my guess?
-
- // @@ The above comment was found in the older versions of the
- // code. The code was also written in such a way that, when
- // the client thread on a call from handle_input () from the
- // reactor a call would be made on the handle_client_input
- // (). The implementation of handle_client_input () looked so
- // flaky. It used to create a message state upon entry in to
- // the function using the TMS and destroy that on exit. All
- // this was fine _theoretically_ for multiple threads. But
- // the flakiness was originating in the implementation of
- // get_message_state () where we were creating message state
- // only once and dishing it out for every thread till one of
- // them destroy's it. So, it looked broken. That has been
- // changed. Why?. To my knowledge, the reactor does not call
- // handle_input () on two threads at the same time. So, IMHO
- // that defeats the purpose of creating a message state for
- // every thread. This is just my guess. If we run in to
- // problems this place needs to be revisited. If someone else
- // is going to take a look please contact bala@cs.wustl.edu
- // for details on this-- Bala
-
-
-
- if (result == -1)
- {
- // Something really critical happened, we will forget about
- // every reply on this connection.
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) : DIOP_Transport::")
- ACE_TEXT ("process_message - ")
- ACE_TEXT ("dispatch reply failed\n")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
- if (result == 0)
- {
- this->messaging_object_->reset ();
-
- // The reply dispatcher was no longer registered.
- // This can happened when the request/reply
- // times out.
- // To throw away all registered reply handlers is
- // not the right thing, as there might be just one
- // old reply coming in and several valid new ones
- // pending. If we would invoke <connection_closed>
- // we would throw away also the valid ones.
- //return 0;
- }
-
-
- // This is a NOOP for the Exclusive request case, but it actually
- // destroys the stream in the muxed case.
- //this->tms_->destroy_message_state (message_state);
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- {
- return -1;
- }
-
- return 1;
-}
// @@ Frank: Hopefully DIOP doesn't need this
/*
diff --git a/TAO/tao/Strategies/DIOP_Transport.h b/TAO/tao/Strategies/DIOP_Transport.h
index e8732f433cf..e70a0decc11 100644
--- a/TAO/tao/Strategies/DIOP_Transport.h
+++ b/TAO/tao/Strategies/DIOP_Transport.h
@@ -62,6 +62,10 @@ public:
/// Default destructor.
~TAO_DIOP_Transport (void);
+ /// Look for the documentation in Transport.h.
+ virtual int handle_input_i (TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time = 0,
+ int block = 0);
protected:
/** @name Overridden Template Methods
*
@@ -83,11 +87,6 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
- /// Read and process the message from the connection. The processing
- /// of the message is done by delegating the work to the underlying
- /// messaging object
- virtual int read_process_message (ACE_Time_Value *max_time_value = 0,
- int block =0);
virtual int register_handler_i (void);
@@ -114,9 +113,6 @@ public:
CORBA::Octet minor);
private:
- /// Process the message that we have read
- int process_message (void);
-
// @@ Frank : Not needed
/*
/// Set the Bidirectional context info in the service context list
diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp
index 84f5a560fc2..1c4f044eebd 100644
--- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp
+++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp
@@ -8,10 +8,9 @@
ACE_RCSID (Strategies, GIOP_Message_NonReactive_Base, "$Id$")
-TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core,
- size_t buf_size)
- : TAO_GIOP_Message_Base (orb_core, buf_size),
- message_handler_ (orb_core, this, buf_size)
+TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core)
+
+ : TAO_GIOP_Message_Base (orb_core)
{
}
@@ -19,14 +18,18 @@ TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Co
int
TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport,
+ ACE_Message_Block &block,
int /*block*/,
ACE_Time_Value *max_wait_time)
{
// Call the handler to read and do a simple parse of the header of
// the message.
int retval =
- this->message_handler_.read_parse_message (transport,
- max_wait_time);
+ this->read_data (transport,
+ max_wait_time);
+
+ // Before we do this let us reset the
+ char *buf = this->input_cdr_.rd_ptr ();
// Error in the message that was received
@@ -61,6 +64,19 @@ TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport,
return 2;
}
+
+size_t
+TAO_GIOP_Message_NonReactive_Base::read_data (TAO_Transport *transport,
+ ACE_Time_Value *time)
+{
+
+ transport->recv (buf,
+ n,
+ max_wait_time);
+
+}
+
+
TAO_Pluggable_Message_Type
TAO_GIOP_Message_NonReactive_Base::message_type (void)
{
diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h
index d6e8b434ecd..c7ddb348117 100644
--- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h
+++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h
@@ -28,21 +28,23 @@ class TAO_Pluggable_Reply_Params;
/**
* @class TAO_GIOP_Message_NonReactive_Base
*
- * @brief Uses the NonReactive handler class for reading messages.
+ * @brief Uses the NonReactive mechanism to read and process
+ * messages.
*
- * This class uses the TAO_GIOP_Message_NonReactive_Handler class to
- * read and parse messages. This class derives from
- * TAO_GIOP_Message_Base. It just redirects most of the functions to
- * the base class but just acts as a sort of place holder for the
- * NonReactive handler class.
+ * Some protocols based on shared memory cannot make use of the
+ * reactor as other protocols based on TCP/IP. This class is a relief
+ * for such protocols. This effectively does the following
+ * - reads the GIOP header out of the transport
+ * - processes the header to determine the length of the message
+ * and other details.
+ * - reads the body of the message from the transport
+ * - passes the data to the base class for making the upcall.
*/
class TAO_Strategies_Export TAO_GIOP_Message_NonReactive_Base :public TAO_GIOP_Message_Base
{
public:
- friend class TAO_GIOP_Message_NonReactive_Handler;
-
/// Constructor
TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core,
size_t cdr_size = ACE_CDR::DEFAULT_BUFSIZE);
@@ -82,9 +84,8 @@ public:
private:
- /// Thr message handler object that does reading and parsing of the
- /// incoming messages
- TAO_GIOP_Message_NonReactive_Handler message_handler_;
+ /// The input cdr stream in which the incoming data is stored.
+ TAO_InputCDR input_cdr_;
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h
index 02b93da7d47..a8d9642a1b1 100644
--- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h
+++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h
@@ -77,8 +77,7 @@ private:
/// Our Message base
TAO_GIOP_Message_NonReactive_Base *mesg_base_;
- /// The input cdr stream in which the incoming data is stored.
- TAO_InputCDR input_cdr_;
+
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/tao/Strategies/Makefile b/TAO/tao/Strategies/Makefile
index 4d2263b56e8..890b22d056a 100644
--- a/TAO/tao/Strategies/Makefile
+++ b/TAO/tao/Strategies/Makefile
@@ -1,4 +1,5 @@
#----------------------------------------------------------------------------
+#
# $Id$
#
# Makefile for TAO_Strategies
@@ -45,8 +46,6 @@ CPP_SRCS += \
SHMIOP_Acceptor \
SHMIOP_Connection_Handler \
SHMIOP_Endpoint \
- GIOP_Message_NonReactive_Base \
- GIOP_Message_NonReactive_Handler \
TAO_Strategies_Internal \
uiop_endpoints \
advanced_resource \
diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
index fd7fb9db6e6..5e0b8361a75 100644
--- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
@@ -10,12 +10,11 @@
#include "tao/ORB.h"
#include "tao/CDR.h"
#include "tao/Messaging_Policy_i.h"
-#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
#include "tao/Server_Strategy_Factory.h"
#include "tao/Base_Transport_Property.h"
#include "tao/Transport_Cache_Manager.h"
#include "SHMIOP_Endpoint.h"
+#include "tao/Resume_Handle.h"
#if !defined (__ACE_INLINE__)
# include "SHMIOP_Connection_Handler.inl"
@@ -220,8 +219,18 @@ TAO_SHMIOP_Connection_Handler::fetch_handle (void)
}
int
+TAO_SHMIOP_Connection_Handler::resume_handler (void)
+{
+ return TAO_RESUMES_CONNECTION_HANDLER;
+}
+
+int
TAO_SHMIOP_Connection_Handler::handle_output (ACE_HANDLE)
{
+ // Instantiate the resume handle here.. This will automatically
+ // resume the handle once data is written..
+ TAO_Resume_Handle resume_handle (this->orb_core (),
+ this->fetch_handle ());
return this->transport ()->handle_output ();
}
@@ -249,44 +258,28 @@ TAO_SHMIOP_Connection_Handler::add_transport_to_cache (void)
int
-TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE h)
-{
- return this->handle_input_i (h);
-}
-
-
-int
-TAO_SHMIOP_Connection_Handler::handle_input_i (ACE_HANDLE,
- ACE_Time_Value *max_wait_time)
+TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE)
{
+ // Increase the reference count on the upcall that have passed us.
this->pending_upcalls_++;
- // Call the transport read the message
- int result = this->transport ()->read_process_message (max_wait_time);
-
- // Now the message has been read
- if (result == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("SHMIOP_Connection_Handler::read_message \n")));
+ TAO_Resume_Handle resume_handle (this->orb_core (),
+ this->fetch_handle ());
- }
+ int retval = this->transport ()->handle_input_i (resume_handle);
// The upcall is done. Bump down the reference count
if (--this->pending_upcalls_ <= 0)
- result = -1;
-
- if (result == 0 || result == -1)
- {
- return result;
- }
+ retval = -1;
- return 0;
+ return retval;
}
+
+
+
// ****************************************************************
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h
index cf9c2fe3549..6cbf34e5457 100644
--- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h
+++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h
@@ -90,6 +90,9 @@ public:
/// Documented in ACE_Event_Handler
virtual int handle_output (ACE_HANDLE);
+ /// Overload for resuming handlers..
+ virtual int resume_handler (void);
+
/// Add ourselves to Cache.
int add_transport_to_cache (void);
@@ -104,8 +107,6 @@ protected:
/// ensure that server threads eventually exit.
virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Time_Value *max_wait_time = 0);
private:
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp
index 81c803e0a41..0b333e0c566 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp
@@ -15,9 +15,9 @@
#include "tao/Stub.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
+#include "tao/Resume_Handle.h"
+#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
-#include "GIOP_Message_NonReactive_Base.h"
#if !defined (__ACE_INLINE__)
# include "SHMIOP_Transport.i"
@@ -27,12 +27,13 @@ ACE_RCSID (Strategies, SHMIOP_Transport, "$Id$")
TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean flag)
+ CORBA::Boolean /*flag*/)
: TAO_Transport (TAO_TAG_SHMEM_PROFILE,
orb_core),
connection_handler_ (handler),
messaging_object_ (0)
{
+#if 0
if (flag)
{
// Use the lite version of the protocol
@@ -40,10 +41,11 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl
TAO_GIOP_Message_Lite (orb_core));
}
else
+#endif /*#if 0 */
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_NonReactive_Base (orb_core));
+ TAO_GIOP_Message_Base (orb_core));
}
}
@@ -91,42 +93,100 @@ TAO_SHMIOP_Transport::recv_i (char *buf,
size_t len,
const ACE_Time_Value *max_wait_time)
{
- return this->connection_handler_->peer ().recv (buf,
- len,
- max_wait_time);
+ ssize_t n = 0;
+
+ int read_break = 0;
+
+ while (!read_break)
+ {
+ n = this->connection_handler_->peer ().recv (buf,
+ len,
+ max_wait_time);
+
+ // If we get a EWOULBLOCK we try to read again.
+ if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+ {
+ n = 0;
+ continue;
+ }
+
+ // If there is anything else we just drop out of the loop.
+ read_break = 1;
+ }
+
+ if (n == 0 || n == -1)
+ {
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure ")
+ ACE_TEXT ("recv_i () \n")));
+ }
+ }
+
+ return n;
+
}
+
int
-TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
- int block)
+TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
{
- // Read the message of the socket
- int result = this->messaging_object_->read_message (this,
- block,
- max_wait_time);
-
- if (result == -1)
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.length ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
+
+ // .. do a read on the socket again.
+ ssize_t bytes = 0;
+
+ // As this used for transports where things are available in one
+ // shot this looping should not create any problems.
+ for (size_t n = missing_data;
+ n != 0;
+ n -= bytes)
{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("SHMIOP_Transport::read_message, failure in read_message ()")));
+ // We would have liked to use something like a recv_n ()
+ // here. But at the time when the code was written, the MEM_Stream
+ // classes had poor support for recv_n (). Till a day when we
+ // get proper recv_n (), let us stick with this. The other
+ // argument that can be said against this is that, this is the
+ // bad layer in which this is being done ie. recv_n is
+ // simulated. But...
+ bytes = this->recv (incoming.wr_ptr (),
+ n,
+ max_wait_time);
+
+ if (bytes == 0 ||
+ bytes == -1)
+ {
+ return -1;
+ }
- this->tms_->connection_closed ();
- return -1;
+ incoming.wr_ptr (bytes);
}
- if (result < 2)
- return result;
- // Now we know that we have been able to read the complete message
- // here.. We loop here to see whether we have read more than one
- // message in our read.
+ TAO_Queued_Data pqd (&incoming);
- // See we use the reactor semantics again
- result = this->process_message ();
+ // With SHMIOP we would not have any missing data...
+ pqd.missing_data_ = 0;
+ this->messaging_object ()->get_message_data (&pqd);
- return result;
+ // Resume the handle before processing the request
+ rh.resume_handle ();
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (&pqd);
}
@@ -220,127 +280,6 @@ TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major,
return 1;
}
-int
-TAO_SHMIOP_Transport::process_message (void)
-{
- // Check whether we have messages for processing
- int retval =
- this->messaging_object_->more_messages ();
-
- // The messages are fragmented, so we go back to the reactor.
- if (retval <= 0)
- return retval;
-
- // Get the <message_type> that we have received
- TAO_Pluggable_Message_Type t =
- this->messaging_object_->message_type ();
-
-
- int result = 0;
- if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("Close Connection Message recd \n")));
-
- this->tms_->connection_closed ();
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
- {
- if (this->messaging_object_->process_request_message (this,
- this->orb_core ()) == -1)
- return -1;
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
- {
- TAO_Pluggable_Reply_Params params (this->orb_core ());
- if (this->messaging_object_->process_reply_message (params) == -1)
- {
-
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("SHMIOP_Transport::process_message, process_reply_message ()")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
-
- result =
- this->tms_->dispatch_reply (params);
-
- // @@ Somehow it seems dangerous to reset the state *after*
- // dispatching the request, what if another threads receives
- // another reply in the same connection?
- // My guess is that it works as follows:
- // - For the exclusive case there can be no such thread.
- // - The the muxed case each thread has its own message_state.
- // I'm pretty sure this comment is right. Could somebody else
- // please look at it and confirm my guess?
-
- // @@ The above comment was found in the older versions of the
- // code. The code was also written in such a way that, when
- // the client thread on a call from handle_input () from the
- // reactor a call would be made on the handle_client_input
- // (). The implementation of handle_client_input () looked so
- // flaky. It used to create a message state upon entry in to
- // the function using the TMS and destroy that on exit. All
- // this was fine _theoretically_ for multiple threads. But
- // the flakiness was originating in the implementation of
- // get_message_state () where we were creating message state
- // only once and dishing it out for every thread till one of
- // them destroy's it. So, it looked broken. That has been
- // changed. Why?. To my knowledge, the reactor does not call
- // handle_input () on two threads at the same time. So, IMHO
- // that defeats the purpose of creating a message state for
- // every thread. This is just my guess. If we run in to
- // problems this place needs to be revisited. If someone else
- // is going to take a look please contact bala@cs.wustl.edu
- // for details on this-- Bala
-
- if (result == -1)
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) : SHMIOP_Transport::")
- ACE_TEXT ("process_message - ")
- ACE_TEXT ("dispatch reply failed\n")));
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
- if (result == 0)
- {
- this->messaging_object_->reset ();
-
- // The reply dispatcher was no longer registered.
- // This can happened when the request/reply
- // times out.
- // To throw away all registered reply handlers is
- // not the right thing, as there might be just one
- // old reply coming in and several valid new ones
- // pending. If we would invoke <connection_closed>
- // we would throw away also the valid ones.
- //return 0;
- }
-
-
- // This is a NOOP for the Exclusive request case, but it actually
- // destroys the stream in the muxed case.
- //this->tms_->destroy_message_state (message_state);
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- {
- return -1;
- }
-
- return 0;
-}
-
void
TAO_SHMIOP_Transport::transition_handler_state_i (void)
{
diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h
index 60812057b5b..61785ef6215 100644
--- a/TAO/tao/Strategies/SHMIOP_Transport.h
+++ b/TAO/tao/Strategies/SHMIOP_Transport.h
@@ -78,11 +78,10 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
- /// Read and process the message from the connection. The processing
- /// of the message is done by delegating the work to the underlying
- /// messaging object
- virtual int read_process_message (ACE_Time_Value *max_time_value = 0,
- int block =0);
+ virtual int consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
virtual int register_handler_i (void);
@@ -109,10 +108,6 @@ public:
CORBA::Octet minor);
private:
- /// Process the message that we have read
- int process_message (void);
-
-private:
/// The connection service handler used for accessing lower layer
/// communication protocols.
TAO_SHMIOP_Connection_Handler *connection_handler_;
diff --git a/TAO/tao/Strategies/TAO_Strategies.dsp b/TAO/tao/Strategies/TAO_Strategies.dsp
index 0c054c6e68f..8056375a565 100644
--- a/TAO/tao/Strategies/TAO_Strategies.dsp
+++ b/TAO/tao/Strategies/TAO_Strategies.dsp
@@ -98,46 +98,10 @@ SOURCE=.\advanced_resource.cpp
# End Source File
# Begin Source File
-SOURCE=.\DIOP_Acceptor.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connection_Handler.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connector.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Endpoint.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Factory.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Profile.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Transport.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\FIFO_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_NonReactive_Base.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Handler.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\LFU_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
@@ -226,42 +190,6 @@ SOURCE=.\advanced_resource.h
# End Source File
# Begin Source File
-SOURCE=.\DIOP_Acceptor.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connection_Handler.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connector.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Endpoint.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Factory.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Profile.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Transport.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Base.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Handler.h
-# End Source File
-# Begin Source File
-
SOURCE=.\Reactor_Per_Priority.h
# End Source File
# Begin Source File
@@ -358,26 +286,6 @@ SOURCE=.\advanced_resource.i
# End Source File
# Begin Source File
-SOURCE=.\DIOP_Acceptor.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connection_Handler.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Endpoint.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Profile.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Transport.i
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_NonReactive_Base.inl
# End Source File
# Begin Source File
diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
index 4d412f7c870..9e0e917070d 100644
--- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
@@ -16,6 +16,7 @@
#include "tao/Base_Transport_Property.h"
#include "tao/GIOP_Message_Lite.h"
#include "tao/Transport_Cache_Manager.h"
+#include "tao/Resume_Handle.h"
#if !defined (__ACE_INLINE__)
# include "UIOP_Connection_Handler.inl"
@@ -201,6 +202,12 @@ TAO_UIOP_Connection_Handler::fetch_handle (void)
return this->get_handle ();
}
+int
+TAO_UIOP_Connection_Handler::resume_handler (void)
+{
+ return TAO_RESUMES_CONNECTION_HANDLER;
+}
+
int
TAO_UIOP_Connection_Handler::add_transport_to_cache (void)
@@ -224,44 +231,23 @@ TAO_UIOP_Connection_Handler::add_transport_to_cache (void)
int
-TAO_UIOP_Connection_Handler::handle_input (ACE_HANDLE h)
-{
- return this->handle_input_i (h);
-}
-
-
-int
-TAO_UIOP_Connection_Handler::handle_input_i (ACE_HANDLE,
- ACE_Time_Value *max_wait_time)
+TAO_UIOP_Connection_Handler::handle_input (ACE_HANDLE)
{
this->pending_upcalls_++;
- // Call the transport read the message
- int result = this->transport ()->read_process_message (max_wait_time);
+ TAO_Resume_Handle resume_handle (this->orb_core (),
+ this->fetch_handle ());
- // Now the message has been read
- if (result == -1 && TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("UIOP_Connection_Handler::read_message \n")));
-
- }
+ int retval =
+ this->transport ()->handle_input_i (resume_handle);
// The upcall is done. Bump down the reference count
if (--this->pending_upcalls_ <= 0)
- result = -1;
+ retval = -1;
- if (result == 0 || result == -1)
- {
- return result;
- }
-
- return 0;
+ return retval;
}
-
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH>;
diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.h b/TAO/tao/Strategies/UIOP_Connection_Handler.h
index 3432fd3b9ae..43f3a8ec981 100644
--- a/TAO/tao/Strategies/UIOP_Connection_Handler.h
+++ b/TAO/tao/Strategies/UIOP_Connection_Handler.h
@@ -106,6 +106,9 @@ public:
/// Return the underlying handle
virtual ACE_HANDLE fetch_handle (void);
+ /// Overload for resuming handlers..
+ virtual int resume_handler (void);
+
/// Add ourselves to Cache.
int add_transport_to_cache (void);
@@ -116,12 +119,8 @@ protected:
/// Reads a message from the <peer()>, dispatching and servicing it
/// appropriately.
/// handle_input() just delegates on handle_input_i() which timeouts
- /// after <max_wait_time>, this is used in thread-per-connection to
- /// ensure that server threads eventually exit.
-
+ /// after <max_wait_time>
virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
- virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE,
- ACE_Time_Value *max_wait_time = 0);
private:
diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp
index 0049317a677..1f8f6dc86d0 100644
--- a/TAO/tao/Strategies/UIOP_Transport.cpp
+++ b/TAO/tao/Strategies/UIOP_Transport.cpp
@@ -27,19 +27,19 @@ ACE_RCSID (Strategies, UIOP_Transport, "$Id$")
TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler,
TAO_ORB_Core *orb_core,
- CORBA::Boolean flag)
+ CORBA::Boolean /*flag*/)
: TAO_Transport (TAO_TAG_UIOP_PROFILE,
orb_core)
, connection_handler_ (handler)
, messaging_object_ (0)
{
- if (flag)
+ /* if (flag)
{
// Use the lite version of the protocol
ACE_NEW (this->messaging_object_,
TAO_GIOP_Message_Lite (orb_core));
}
- else
+ else*/
{
// Use the normal GIOP object
ACE_NEW (this->messaging_object_,
@@ -82,50 +82,37 @@ TAO_UIOP_Transport::recv_i (char *buf,
size_t len,
const ACE_Time_Value *max_wait_time)
{
- return this->connection_handler_->peer ().recv (buf,
- len,
- max_wait_time);
-}
-
-int
-TAO_UIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
- int block)
-{
- // Read the message of the socket
- int result = this->messaging_object_->read_message (this,
- block,
+ ssize_t n = this->connection_handler_->peer ().recv (buf,
+ len,
max_wait_time);
- if (result == -1)
+ // Most of the errors handling is common for
+ // Now the message has been read
+ if (n == -1 && TAO_debug_level > 4)
{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("UIOP_Transport::read_message, failure in read_message ()")));
-
- this->tms_->connection_closed ();
- return -1;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure ")
+ ACE_TEXT ("recv_i () \n")));
}
- if (result < 2)
- return result;
-
- // Now we know that we have been able to read the complete message
- // here.. We loop here to see whether we have read more than one
- // message in our read.
- // Set the result state
- result = 1;
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
- // See we use the reactor semantics again
- while (result > 0)
+ return -1;
+ }
+ // @@ What are the other error handling here??
+ else if (n == 0)
{
- result = this->process_message ();
+ return -1;
}
- return result;
+ return n;
}
-
int
TAO_UIOP_Transport::register_handler_i (void)
{
@@ -216,126 +203,6 @@ TAO_UIOP_Transport::messaging_init (CORBA::Octet major,
return 1;
}
-int
-TAO_UIOP_Transport::process_message (void)
-{
- // Check whether we have messages for processing
- int retval =
- this->messaging_object_->more_messages ();
-
- if (retval <= 0)
- return retval;
-
- // Get the <message_type> that we have received
- TAO_Pluggable_Message_Type t =
- this->messaging_object_->message_type ();
-
-
- int result = 0;
- if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("Close Connection Message recd \n")));
-
- this->tms_->connection_closed ();
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
- {
- if (this->messaging_object_->process_request_message (this,
- this->orb_core ()) == -1)
- return -1;
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
- {
- TAO_Pluggable_Reply_Params params (this->orb_core ());
- if (this->messaging_object_->process_reply_message (params) == -1)
- {
-
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("UIOP_Transport::process_message, process_reply_message ()")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
-
- result =
- this->tms_->dispatch_reply (params);
-
- // @@ Somehow it seems dangerous to reset the state *after*
- // dispatching the request, what if another threads receives
- // another reply in the same connection?
- // My guess is that it works as follows:
- // - For the exclusive case there can be no such thread.
- // - The the muxed case each thread has its own message_state.
- // I'm pretty sure this comment is right. Could somebody else
- // please look at it and confirm my guess?
-
- // @@ The above comment was found in the older versions of the
- // code. The code was also written in such a way that, when
- // the client thread on a call from handle_input () from the
- // reactor a call would be made on the handle_client_input
- // (). The implementation of handle_client_input () looked so
- // flaky. It used to create a message state upon entry in to
- // the function using the TMS and destroy that on exit. All
- // this was fine _theoretically_ for multiple threads. But
- // the flakiness was originating in the implementation of
- // get_message_state () where we were creating message state
- // only once and dishing it out for every thread till one of
- // them destroy's it. So, it looked broken. That has been
- // changed. Why?. To my knowledge, the reactor does not call
- // handle_input () on two threads at the same time. So, IMHO
- // that defeats the purpose of creating a message state for
- // every thread. This is just my guess. If we run in to
- // problems this place needs to be revisited. If someone else
- // is going to take a look please contact bala@cs.wustl.edu
- // for details on this-- Bala
-
- if (result == -1)
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) : UIOP_Transport::")
- ACE_TEXT ("process_message - ")
- ACE_TEXT ("dispatch reply failed\n")));
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
- if (result == 0)
- {
- this->messaging_object_->reset ();
-
- // The reply dispatcher was no longer registered.
- // This can happened when the request/reply
- // times out.
- // To throw away all registered reply handlers is
- // not the right thing, as there might be just one
- // old reply coming in and several valid new ones
- // pending. If we would invoke <connection_closed>
- // we would throw away also the valid ones.
- //return 0;
- }
-
-
- // This is a NOOP for the Exclusive request case, but it actually
- // destroys the stream in the muxed case.
- //this->tms_->destroy_message_state (message_state);
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- {
- return -1;
- }
-
- return 1;
-}
-
void
TAO_UIOP_Transport::transition_handler_state_i (void)
{
diff --git a/TAO/tao/Strategies/UIOP_Transport.h b/TAO/tao/Strategies/UIOP_Transport.h
index f3bb0b2b920..87e4715fae0 100644
--- a/TAO/tao/Strategies/UIOP_Transport.h
+++ b/TAO/tao/Strategies/UIOP_Transport.h
@@ -81,12 +81,6 @@ protected:
size_t len,
const ACE_Time_Value *s = 0);
- /// Read and process the message from the connection. The processing
- /// of the message is done by delegating the work to the underlying
- /// messaging object
- virtual int read_process_message (ACE_Time_Value *max_time_value = 0,
- int block =0);
-
virtual int register_handler_i (void);
/// Method to do whatever it needs to do when the connection
@@ -115,11 +109,6 @@ public:
private:
- /// Process the message that we have read
- int process_message (void);
-
-private:
-
/// The connection service handler used for accessing lower layer
/// communication protocols.
TAO_UIOP_Connection_Handler *connection_handler_;
diff --git a/TAO/tao/Synch_Reply_Dispatcher.cpp b/TAO/tao/Synch_Reply_Dispatcher.cpp
index 70c17a67629..c0f5a6df904 100644
--- a/TAO/tao/Synch_Reply_Dispatcher.cpp
+++ b/TAO/tao/Synch_Reply_Dispatcher.cpp
@@ -8,6 +8,7 @@
ACE_RCSID(tao, Synch_Reply_Dispatcher, "$Id$")
+
// Constructor.
TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher (
TAO_ORB_Core *orb_core,
@@ -17,8 +18,16 @@ TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher (
reply_received_ (0),
orb_core_ (orb_core),
wait_strategy_ (0),
- reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE),
- 0,
+ buf_ (),
+ db_ (sizeof buf_,
+ ACE_Message_Block::MB_DATA,
+ this->buf_,
+ this->orb_core_->message_block_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_dblock_allocator ()),
+ reply_cdr_ (&db_,
+ ACE_Message_Block::DONT_DELETE,
TAO_ENCAP_BYTE_ORDER,
TAO_DEF_GIOP_MAJOR,
TAO_DEF_GIOP_MINOR,
@@ -61,20 +70,11 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (
// dispatcher is used because the request must be re-sent.
//this->message_state_.reset (0);
- // Steal the buffer so that no copying is done.
- this->reply_cdr_.exchange_data_blocks (params.input_cdr_);
-
- /*if (&this->message_state_ != message_state)
- {
- // The Transport Mux Strategy did not use our Message_State to
- // receive the event, possibly because it is muxing multiple
- // requests over the same connection.
-
- // Steal the buffer so that no copying is done.
- this->message_state_.cdr.steal_from (message_state->cdr);
+ // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
+ ACE_Data_Block *db =
+ this->reply_cdr_.clone_from (params.input_cdr_);
- // There is no need to copy the other fields!
- }*/
+ ACE_UNUSED_ARG (db);
if (this->wait_strategy_ != 0)
{
@@ -91,12 +91,6 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (
return 1;
}
-/*TAO_GIOP_Message_State *
-TAO_Synch_Reply_Dispatcher::message_state (void)
-{
- return &this->message_state_;
-}*/
-
void
TAO_Synch_Reply_Dispatcher::dispatcher_bound (TAO_Transport *transport)
{
diff --git a/TAO/tao/Synch_Reply_Dispatcher.h b/TAO/tao/Synch_Reply_Dispatcher.h
index 571948a9dbb..ec6aabe157b 100644
--- a/TAO/tao/Synch_Reply_Dispatcher.h
+++ b/TAO/tao/Synch_Reply_Dispatcher.h
@@ -66,13 +66,6 @@ protected:
IOP::ServiceContextList &reply_service_info_;
private:
- // TAO_GIOP_Message_State message_state_;
- // All the state required to receive the input...
- // @@ Having members of type TAO_GIOP* indicates that we
- // (Reply_despatcher) are aware of the underlying messaging
- // protocol. But for the present let us close our eyes till we are
- // able to iterate on a use case - Bala.
-
/// Flag that indicates the reply has been received.
int reply_received_;
@@ -83,6 +76,20 @@ private:
/// appropriate).
TAO_Wait_Strategy *wait_strategy_;
+ /* @@todo: At some point of time we are going to get to a situation
+ where TAO has huge stack sizes. Need to think on how we would
+ deal with that. One idea would be to push these things on TSS as
+ this is created by the thread on a per invocation basis. Post 1.2
+ would be a nice time for that I guess
+ */
+
+ /// The buffer that is used to initialise the data block
+ char buf_[ACE_CDR::DEFAULT_BUFSIZE];
+
+ /// datablock that is created on teh stack to initialise the CDR
+ /// stream underneath.
+ ACE_Data_Block db_;
+
/// CDR stream which has the reply information that needs to be
/// demarshalled by the stubs
TAO_InputCDR reply_cdr_;
diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp
index 15ae0b9c8d4..62572777b23 100644
--- a/TAO/tao/TAO.dsp
+++ b/TAO/tao/TAO.dsp
@@ -411,18 +411,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_State.cpp
# End Source File
# Begin Source File
@@ -487,6 +479,10 @@ SOURCE=.\IIOPC.cpp
# End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.cpp
# End Source File
# Begin Source File
@@ -711,6 +707,10 @@ SOURCE=.\Resource_Factory.cpp
# End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Sequence.cpp
# End Source File
# Begin Source File
@@ -1143,18 +1143,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.h
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.h
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.h
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.h
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_State.h
# End Source File
# Begin Source File
@@ -1219,6 +1211,10 @@ SOURCE=.\IIOPC.h
# End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.h
# End Source File
# Begin Source File
@@ -1491,6 +1487,10 @@ SOURCE=.\Resource_Factory.h
# End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.h
+# End Source File
+# Begin Source File
+
SOURCE=.\sequence.h
# End Source File
# Begin Source File
@@ -1859,19 +1859,11 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.inl
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.i
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.i
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.inl
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_State.i
+SOURCE=.\GIOP_Message_State.inl
# End Source File
# Begin Source File
@@ -1923,6 +1915,10 @@ SOURCE=.\IIOPC.i
# End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.inl
# End Source File
# Begin Source File
@@ -2103,6 +2099,10 @@ SOURCE=.\Reply_Dispatcher.i
# End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\sequence.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp
index c17d1f68ce1..bf1b278f813 100644
--- a/TAO/tao/TAO_Server_Request.cpp
+++ b/TAO/tao/TAO_Server_Request.cpp
@@ -1,5 +1,6 @@
// $Id$
+
// Implementation of the Dynamic Server Skeleton Interface (for GIOP)
#include "TAO_Server_Request.h"
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index eff531ec2df..250689c9a37 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -1,6 +1,7 @@
// -*- C++ -*-
// $Id$
+
#include "Transport.h"
#include "Exception.h"
@@ -17,6 +18,7 @@
#include "Flushing_Strategy.h"
#include "Transport_Cache_Manager.h"
#include "debug.h"
+#include "Resume_Handle.h"
#include "ace/Message_Block.h"
@@ -26,6 +28,7 @@
ACE_RCSID(tao, Transport, "$Id$")
+
TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount)
: ACE_Refcountable (refcount)
, refcount_lock_ (lock)
@@ -67,6 +70,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag,
, bidirectional_flag_ (-1)
, head_ (0)
, tail_ (0)
+ , incoming_message_queue_ (orb_core)
, current_deadline_ (ACE_Time_Value::zero)
, flush_timer_id_ (-1)
, transport_timer_ (this)
@@ -121,7 +125,7 @@ TAO_Transport::~TAO_Transport (void)
}
int
-TAO_Transport::handle_output ()
+TAO_Transport::handle_output (void)
{
if (TAO_debug_level > 4)
{
@@ -783,6 +787,674 @@ TAO_Transport::generate_request_header (
}
int
+TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
+ ACE_Time_Value * max_wait_time,
+ int /*block*/)
+{
+ // First try to process messages of the head of the incoming queue.
+ int retval = this->process_queue_head (rh);
+
+ if (retval <= 0)
+ {
+ if (retval == -1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) TAO::handle_input_i,"
+ "error while parsing the head of the queue \n"));
+
+ this->tms_->connection_closed ();
+ }
+ return retval;
+ }
+
+ // If there are no messages then we can go ahead to read from the
+ // handle for further reading..
+
+ // The buffer on the stack which will be used to hold the input
+ // messages
+ char buf [TAO_CONNECTION_HANDLER_STACK_BUF_SIZE];
+
+#if defined (ACE_HAS_PURIFY)
+ (void) ACE_OS::memset (buf,
+ '\0',
+ sizeof buf);
+#endif /* ACE_HAS_PURIFY */
+
+ // Create a data block
+ ACE_Data_Block db (sizeof (buf),
+ ACE_Message_Block::MB_DATA,
+ buf,
+ this->orb_core_->message_block_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_dblock_allocator ());
+
+ // Create a message block
+ ACE_Message_Block message_block (&db,
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->message_block_msgblock_allocator ());
+
+
+ // Align the message block
+ ACE_CDR::mb_align (&message_block);
+
+
+ // Read the message into the message block that we have created on
+ // the stack.
+ ssize_t n = this->recv (message_block.rd_ptr (),
+ message_block.space (),
+ max_wait_time);
+
+ // If there is an error return to the reactor..
+ if (n <= 0)
+ {
+ if (n == -1)
+ this->tms_->connection_closed ();
+
+ return n;
+ }
+
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) Read [%d] bytes \n",
+ n));
+ }
+
+ // Set the write pointer in the stack buffer
+ message_block.wr_ptr (n);
+
+ // Parse the message and try consolidating the message if
+ // needed.
+ retval = this->parse_consolidate_messages (message_block,
+ rh,
+ max_wait_time);
+
+ if (retval <= 0)
+ {
+ if (retval == -1 && TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport::handle_input_i "
+ "error while parsing and consolidating \n"));
+ }
+ return retval;
+ }
+
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block);
+
+ // Extract the data for the node..
+ this->messaging_object ()->get_message_data (&qd);
+
+ // Resume before starting to process the request..
+ rh.resume_handle ();
+
+ // Process the message
+ return this->process_parsed_messages (&qd);
+}
+
+int
+TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ if (this->parse_incoming_messages (block) == -1)
+ return -1;
+
+ // Check whether we have a complete message for processing
+ ssize_t missing_data = this->missing_data (block);
+
+ if (missing_data < 0)
+ {
+ // If we have more than one message
+ return this->consolidate_extra_messages (block,
+ rh);
+ }
+ else if (missing_data > 0)
+ {
+ // If we have missing data then try doing a read or try queueing
+ // them.
+ return this->consolidate_message (block,
+ missing_data,
+ rh,
+ max_wait_time);
+ }
+
+ return 1;
+}
+
+int
+TAO_Transport::parse_incoming_messages (ACE_Message_Block &block)
+{
+ // If we have a queue and if the last message is not complete a
+ // complete one, then this read will get us the remaining data. So
+ // do not try to parse the header if we have an incomplete message
+ // in the queue.
+ if (this->incoming_message_queue_.is_tail_complete () != 0)
+ {
+ // As it looks like a new message has been read, process the
+ // message. Call the messaging object to do the parsing..
+ int retval =
+ this->messaging_object ()->parse_incoming_messages (block);
+
+ if (retval == -1)
+ {
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - error in incoming message \n")));
+
+ this->tms_->connection_closed ();
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+size_t
+TAO_Transport::missing_data (ACE_Message_Block &incoming)
+{
+ // If we have a incomplete message in the queue then find out how
+ // much of data is required to get a complete message
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ {
+ return this->incoming_message_queue_.missing_data_tail ();
+ }
+
+ return this->messaging_object ()->missing_data (incoming);
+}
+
+
+int
+TAO_Transport::consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ // Check whether the last message in the queue is complete..
+ if (this->incoming_message_queue_.is_tail_complete () == 0)
+ return this->consolidate_message_queue (incoming,
+ missing_data,
+ rh,
+ max_wait_time);
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n",
+ this->id ()));
+ }
+
+ // Calculate the actual length of the load that we are supposed to
+ // read which is equal to the <missing_data> + length of the buffer
+ // that we have..
+ size_t payload = missing_data + incoming.length ();
+
+ // Grow the buffer to the size of the message
+ ACE_CDR::grow (&incoming,
+ payload);
+
+ // .. do a read on the socket again.
+ ssize_t n = this->recv (incoming.wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // If we got an EWOULDBLOCK or some other error..
+ if (n <= 0)
+ {
+ if (n == -1)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Trasport::consolidate_message,"
+ "error while trying to consolidate \n"));
+ }
+ this->tms_->connection_closed ();
+ }
+
+ return n;
+ }
+
+ // Move the write pointer
+ incoming.wr_ptr (n);
+
+ // ..Decrement
+ missing_data -= n;
+
+ if (missing_data > 0)
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"
+ "insufficient read, queueing up the message \n",
+ this->id ()));
+ }
+ // Get an instance of TAO_Queued_Data
+ TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data ();
+
+ // Add the missing data to the queue
+ qd->missing_data_ = missing_data;
+
+ // Duplicate the data block before putting it in the queue.
+ qd->msg_block_ = incoming.duplicate ();
+
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ return 0;
+ }
+
+ // Check to see if we have messages in queue. AT this point we
+ // cannot have have semi-complete messages in the queue as they
+ // would have been taken care before
+ if (this->incoming_message_queue_.queue_length ())
+ {
+ // If we have messages in the queue, put the <incoming> in the
+ // queue
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n"
+ " queueing up the message \n",
+ this->id ()));
+ }
+
+ // Get an instance of TAO_Queued_Data
+ TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data ();
+
+ // Duplicate the data block before putting it in the queue.
+ qd->msg_block_ = incoming.duplicate ();
+
+ // Get the rest of the messaging data
+ this->messaging_object ()->get_message_data (qd);
+
+ // Add it to the tail of the queue..
+ this->incoming_message_queue_.enqueue_tail (qd);
+
+ // Process one on the head of the queue and return
+ return this->process_queue_head (rh);
+ }
+
+
+ // We dont have any missing data. Just make a queued_data node with
+ // the existing message block and send it to the higher layers of
+ // the ORB.
+ TAO_Queued_Data pqd (&incoming);
+ pqd.missing_data_ = missing_data;
+ this->messaging_object ()->get_message_data (&pqd);
+
+ // Resume the handle before processing the request
+ rh.resume_handle ();
+
+ // Now we have a full message in our buffer. Just go ahead and
+ // process that
+ return this->process_parsed_messages (&pqd);
+}
+
+int
+TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message_queue \n",
+ this->id ()));
+ }
+
+ // If the queue did not have a complete message put this piece of
+ // message in the queue. We kow it did not have a complete
+ // message. That is why we are here.
+ size_t n = this->incoming_message_queue_.copy_tail (incoming);
+
+ // Update the missing data...
+ missing_data = this->incoming_message_queue_.missing_data_tail ();
+
+ // Move the read pointer of the <incoming> message block to the end
+ // of the copied message and process the remaining portion...
+ incoming.rd_ptr (n);
+
+ // If we have some more information left in the message block..
+ if (incoming.length ())
+ {
+ // We may have to parse & consolidate. This part of the message
+ // doesn't seem to be part of the last message in the queue (as
+ // the copy () hasn't taken away this message).
+ int retval = this->parse_consolidate_messages (incoming,
+ rh,
+ max_wait_time);
+
+ // If there is an error return
+ if (retval == -1)
+ {
+ if (TAO_debug_level)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Error while consolidating... \n",
+ "TAO (%P|%t) - .. part of the read message \n"));
+ }
+ return retval;
+ }
+
+ // parse_consolidate_messages () would have processed one of the
+ // messages, so we better return as we dont want to starve other
+ // threads.
+ return 0;
+ }
+
+ // If we still have some missing data..
+ if (missing_data > 0)
+ {
+ // Get the last message from the Queue
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ // Try to do a read again. If we have some luck it would be
+ // great..
+ ssize_t n = this->recv (qd->msg_block_->wr_ptr (),
+ missing_data,
+ max_wait_time);
+
+ // Error...
+ if (n <= 0)
+ return n;
+
+ // Move the write pointer
+ qd->msg_block_->wr_ptr (n);
+
+ // Decrement the missing data
+ qd->missing_data_ -= n;
+
+ // Now put the TAO_Queued_Data back in the queue
+ this->incoming_message_queue_.enqueue_tail (qd);
+ }
+
+ // Process a message in the head of the queue if we have one..
+ return this->process_queue_head (rh);
+}
+
+
+int
+TAO_Transport::consolidate_extra_messages (ACE_Message_Block
+ &incoming,
+ TAO_Resume_Handle &rh)
+{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n",
+ this->id ()));
+ }
+
+ // Pick the tail of the queue
+ TAO_Queued_Data *tail =
+ this->incoming_message_queue_.dequeue_tail ();
+
+ if (tail)
+ {
+ // If we have a node in the tail, checek to see whether it needs
+ // consolidation. If so, just consolidate it.
+ if (this->messaging_object ()->consolidate_node (tail,
+ incoming) == -1)
+ return -1;
+
+ // .. put the tail back in queue..
+ this->incoming_message_queue_.enqueue_tail (tail);
+ }
+
+ int retval = 1;
+
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n"
+ "..............extracting extra messages \n",
+ this->id ()));
+ }
+
+ // Extract messages..
+ while (retval == 1)
+ {
+ TAO_Queued_Data *q_data = 0;
+
+ retval =
+ this->messaging_object ()->extract_next_message (incoming,
+ q_data);
+ if (q_data)
+ this->incoming_message_queue_.enqueue_tail (q_data);
+ }
+
+ // In case of error return..
+ if (retval == -1)
+ {
+ return retval;
+ }
+
+ return this->process_queue_head (rh);
+}
+
+int
+TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd)
+{
+ // Get the <message_type> that we have received
+ TAO_Pluggable_Message_Type t = qd->msg_type_;
+
+ int result = 0;
+ if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("Close Connection Message recd \n")));
+
+ // Close the TMS
+ this->tms_->connection_closed ();
+
+ // Return a "-1" so that the next stage can take care of
+ // closing connection and the necessary memory management.
+ return -1;
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
+ {
+ if (this->messaging_object ()->process_request_message (
+ this,
+ qd) == -1)
+ {
+ // Close the TMS
+ this->tms_->connection_closed ();
+
+ // Return a "-1" so that the next stage can take care of
+ // closing connection and the necessary memory management.
+ return -1;
+ }
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
+ {
+ // @@todo: Maybe the input_cdr can be constructed from the
+ // message_block
+ TAO_Pluggable_Reply_Params params (this->orb_core ());
+
+ if (this->messaging_object ()->process_reply_message (params,
+ qd) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("IIOP_Transport::process_message, ")
+ ACE_TEXT ("process_reply_message ()")));
+
+ this->messaging_object ()->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ result = this->tms ()->dispatch_reply (params);
+
+ // @@ Somehow it seems dangerous to reset the state *after*
+ // dispatching the request, what if another threads receives
+ // another reply in the same connection?
+ // My guess is that it works as follows:
+ // - For the exclusive case there can be no such thread.
+ // - The the muxed case each thread has its own message_state.
+ // I'm pretty sure this comment is right. Could somebody else
+ // please look at it and confirm my guess?
+
+ // @@ The above comment was found in the older versions of the
+ // code. The code was also written in such a way that, when
+ // the client thread on a call from handle_input () from the
+ // reactor a call would be made on the handle_client_input
+ // (). The implementation of handle_client_input () looked so
+ // flaky. It used to create a message state upon entry in to
+ // the function using the TMS and destroy that on exit. All
+ // this was fine _theoretically_ for multiple threads. But
+ // the flakiness was originating in the implementation of
+ // get_message_state () where we were creating message state
+ // only once and dishing it out for every thread till one of
+ // them destroy's it. So, it looked broken. That has been
+ // changed. Why?. To my knowledge, the reactor does not call
+ // handle_input () on two threads at the same time. So, IMHO
+ // that defeats the purpose of creating a message state for
+ // every thread. This is just my guess. If we run in to
+ // problems this place needs to be revisited. If someone else
+ // is going to take a look please contact bala@cs.wustl.edu
+ // for details on this-- Bala
+
+ if (result == -1)
+ {
+ // Something really critical happened, we will forget about
+ // every reply on this connection.
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::")
+ ACE_TEXT ("process_message - ")
+ ACE_TEXT ("dispatch reply failed\n")));
+
+ this->messaging_object ()->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ if (result == 0)
+ {
+
+ this->messaging_object ()->reset ();
+
+ // The reply dispatcher was no longer registered.
+ // This can happened when the request/reply
+ // times out.
+ // To throw away all registered reply handlers is
+ // not the right thing, as there might be just one
+ // old reply coming in and several valid new ones
+ // pending. If we would invoke <connection_closed>
+ // we would throw away also the valid ones.
+ //return 0;
+ }
+
+
+ // This is a NOOP for the Exclusive request case, but it actually
+ // destroys the stream in the muxed case.
+ //this->tms_->destroy_message_state (message_state);
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ {
+ return -1;
+ }
+
+ // If not, just return back..
+ return 0;
+}
+
+int
+TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
+{
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::process_queue_head \n",
+ this->id ()));
+ }
+
+ // See if the message in the head of the queue is complete...
+ if (this->incoming_message_queue_.is_head_complete () == 1)
+ {
+ // Get the message on the head of the queue..
+ TAO_Queued_Data *qd =
+ this->incoming_message_queue_.dequeue_head ();
+
+ // Now that we have pulled out out one message out of the queue,
+ // check whether we have one more message in the queue...
+ if (this->incoming_message_queue_.is_head_complete () == 1)
+ {
+ // Get the event handler..
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh == 0)
+ return -1;
+
+ // Get the reactor associated with the event handler
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor == 0)
+ return -1;
+
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::notify to Reactor\n",
+ this->id ()));
+ }
+
+ // Let the class know that it doesn't need to resume the
+ // handle..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
+
+ // Send a notification to the reactor...
+ int retval = reactor->notify (eh,
+ ACE_Event_Handler::READ_MASK);
+
+ if (retval < 0 && TAO_debug_level > 2)
+ {
+ // @@todo: need to think about what is the action that
+ // we can take when we get here.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport::process_queue_head ")
+ ACE_TEXT ("notify to the reactor failed.. \n")));
+ }
+
+ }
+ else
+ {
+ // As we are ready to process the last message just resume
+ // the handle. Set the flag incase someone had reset the flag..
+ rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
+ rh.resume_handle ();
+ }
+
+ // Process the message...
+ if (this->process_parsed_messages (qd) == -1)
+ return -1;
+
+ // Delete the Queued_Data..
+ TAO_Queued_Data::release (qd);
+
+ return 0;
+ }
+
+ return 1;
+}
+
+
+int
TAO_Transport::queue_is_empty (void)
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index 54f8539ae1d..99c0c4512e9 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -19,15 +19,18 @@
#include "ace/pre.h"
#include "corbafwd.h"
+
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
#include "Exception.h"
#include "Transport_Descriptor_Interface.h"
#include "Transport_Cache_Manager.h"
#include "Transport_Timer.h"
#include "ace/Strategies.h"
-
-#if !defined (ACE_LACKS_PRAGMA_ONCE)
-# pragma once
-#endif /* ACE_LACKS_PRAGMA_ONCE */
+#include "Incoming_Message_Queue.h"
class TAO_ORB_Core;
class TAO_Target_Specification;
@@ -38,6 +41,7 @@ class TAO_Connection_Handler;
class TAO_Pluggable_Messaging;
class TAO_Queued_Message;
+class TAO_Resume_Handle;
class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable
{
@@ -72,7 +76,7 @@ protected:
*
* <H3>The outgoing data path:</H3>
*
- * One of the responsabilities of the TAO_Transport class is to send
+ * One of the responsibilities of the TAO_Transport class is to send
* out GIOP messages as efficiently as possible. In most cases
* messages are put out in FIFO order, the transport object will put
* out the message using a single system call and return control to
@@ -129,15 +133,71 @@ protected:
* - A per-transport 'send strategy' to choose between blocking on
* write, blocking on the reactor or blockin on leader/follower.
* - A per-message 'waiting object'
- * - A per-meessage timeout
+ * - A per-message timeout
*
* The Transport object provides a single method to send messages
* (send_message ()).
*
* <H3>The incoming data path:</H3>
*
- * @todo Document the incoming data path design forces.
+ * One of the main responsibilities of the transport is to read and
+ * process the incoming GIOP message as quickly and efficiently as
+ * possible. There are other forces that needs to be given due
+ * consideration. They are
+ * - Multiple threads should be able to tarverse along the same data
+ * path but should not be able to read from the same handle at the
+ * same time ie. the handle should not be shared between threads at
+ * any instant.
+ * - Reads on the handle could give one or more messages.
+ * - Minimise locking and copying overhead when trying to attack the
+ * above.
+ *
+ * <H3> Parsing messages (GIOP) & processing the message:</H3>
+ *
+ * The messages should be checked for validity and the right
+ * information should be sent to the higher layer for processing. The
+ * process of doing a sanity check and preparing the messages for the
+ * higher layers of the ORB are done by the messaging protocol.
+ *
+ * <H3> Design forces and Challenges </H3>
+ *
+ * To keep things as efficient as possible for medium sized requests,
+ * it would be good to minimise data copying and locking along the
+ * incoming path ie. from the time of reading the data from the handle
+ * to the application. We achieve this by creating a buffer on stack
+ * and reading the data from the handle into the buffer. We then pass
+ * the same data block (the buffer is encapsulated into a data block)
+ * to the higher layers of the ORB. The problems stem from the
+ * following
+ * (a) Data is bigger than the buffer that we have on stack
+ * (b) Transports like TCP do not guarentee availability of the whole
+ * chunk of data in one shot. Data could trickle in byte by byte.
+ * (c) Single read gives multiple messages
+ *
+ * We solve the problems as follows
+ *
+ * (a) First do a read with the buffer on stack. Query the underlying
+ * messaging object whether the message has any incomplete
+ * portion. If so, we just grow the buffer for the missing size
+ * and read the rest of the message. We free the handle and then
+ * send the message to the higher layers of the ORB for
+ * processing.
*
+ * (b) If we block (ie. if we receive a EWOULDBLOCK) while trying to
+ * do the above (ie. trying to read after growing the buffer
+ * size) we put the message in a queue and return back to the
+ * reactor. The reactor would call us back when the handle
+ * becomes read ready.
+ *
+ * (c) If we get multiple messages (possible if the client connected
+ * to the server sends oneways or AMI requests), we parse and
+ * split the messages. Every message is put in the queue. Once
+ * the messages are queued, the thread picks up one message to
+ * send to the higher layers of the ORB. Before doing that, if
+ * it finds more messages, it sends a notify to the reactor
+ * without resuming the handle. The next thread picks up a
+ * message from the queue and processes that. Once the queue
+ * is drained the last thread resumes the handle.
*
* <B>See Also:</B>
*
@@ -451,6 +511,30 @@ public:
TAO_Target_Specification &spec,
TAO_OutputCDR &msg);
+ /// Callback to read incoming data
+ /**
+ * The ACE_Event_Handler adapter invokes this method as part of its
+ * handle_input() operation.
+ *
+ * @todo: the method name is confusing! Calling it handle_input()
+ * would probably make things easier to understand and follow!
+ *
+ * Once a complete message is read the Transport class delegates on
+ * the Messaging layer to invoke the right upcall (on the server) or
+ * the TAO_Reply_Dispatcher (on the client side).
+ *
+ * @param max_wait_time In some cases the I/O is synchronous, e.g. a
+ * thread-per-connection server or when Wait_On_Read is enabled. In
+ * those cases a maximum read time can be specified.
+ *
+ * @param block Is deprecated and ignored.
+ *
+ */
+ virtual int handle_input_i (TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time = 0,
+ int block = 0);
+
+
/// Prepare the waiting and demuxing strategy to receive a reply for
/// a new request.
/**
@@ -501,29 +585,6 @@ public:
int is_synchronous = 1,
ACE_Time_Value *max_time_wait = 0) = 0;
- /// Callback to read incoming data
- /**
- * The ACE_Event_Handler adapter invokes this method as part of its
- * handle_input() operation.
- *
- * @todo: the method name is confusing! Calling it handle_input()
- * would probably make things easier to understand and follow!
- *
- * Once a complete message is read the Transport class delegates on
- * the Messaging layer to invoke the right upcall (on the server) or
- * the TAO_Reply_Dispatcher (on the client side).
- *
- * @param max_wait_time In some cases the I/O is synchronous, e.g. a
- * thread-per-connection server or when Wait_On_Read is enabled. In
- * those cases a maximum read time can be specified.
- *
- * @param block Is deprecated and ignored.
- *
- */
- // @@ lockme
- virtual int read_process_message (ACE_Time_Value *max_wait_time = 0,
- int block = 0) = 0;
-
protected:
/// Register the handler with the reactor.
/**
@@ -548,6 +609,53 @@ protected:
*/
virtual void transition_handler_state_i (void) = 0;
+
+ /// Called by the handle_input_i (). This method is used to parse
+ /// message read by the handle_input_i () call. It also decides
+ /// whether the message needs consolidation before processing.
+ int parse_consolidate_messages (ACE_Message_Block &bl,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *time = 0);
+
+
+ /// Method does parsing of the message if we have a fresh message in
+ /// the <message_block> or just returns if we have read part of the
+ /// previously stored message.
+ int parse_incoming_messages (ACE_Message_Block &message_block);
+
+ /// Return if we have any missing data in the queue of messages
+ /// or determine if we have more information left out in the
+ /// presently read message to make it complete.
+ size_t missing_data (ACE_Message_Block &message_block);
+
+ /// Consolidate the currently read message or consolidate the last
+ /// message in the queue. The consolidation of the last message in
+ /// the queue is done by calling consolidate_message_queue ().
+ virtual int consolidate_message (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
+
+ /// First consolidate the message queue. If the message is still not
+ /// complete, try to read from the handle again to make it
+ /// complete. If these dont help put the message back in the queue
+ /// and try to check the queue if we have message to process. (the
+ /// thread needs to do some work anyway :-))
+ int consolidate_message_queue (ACE_Message_Block &incoming,
+ ssize_t missing_data,
+ TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time);
+
+ /// Called by parse_consolidate_message () if we have more messages
+ /// in one read. Queue up the messages and try to process one of
+ /// them, atleast at the head of them.
+ int consolidate_extra_messages (ACE_Message_Block &incoming,
+ TAO_Resume_Handle &rh);
+
+ /// Process the message by sending it to the higher layers of the
+ /// ORB.
+ int process_parsed_messages (TAO_Queued_Data *qd);
+
public:
/// Method for the connection handler to signify that it
/// is being closed and destroyed.
@@ -700,6 +808,11 @@ private:
/// Print out error messages if the event handler is not valid
void report_invalid_event_handler (const char *caller);
+ /// Process the message that is in the head of the incoming queue.
+ /// If there are more messages in the queue, this method sends a
+ /// notify () to the reactor to send a next thread along.
+ int process_queue_head (TAO_Resume_Handle &rh);
+
/// Prohibited
ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&))
ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&))
@@ -747,6 +860,9 @@ protected:
TAO_Queued_Message *head_;
TAO_Queued_Message *tail_;
+ /// Queue of the incoming messages..
+ TAO_Incoming_Message_Queue incoming_message_queue_;
+
/// The queue will start draining no later than <queing_deadline_>
/// *if* the deadline is
ACE_Time_Value current_deadline_;
diff --git a/TAO/tao/Transport_Mux_Strategy.h b/TAO/tao/Transport_Mux_Strategy.h
index 447f40ce344..5937ff16e9a 100644
--- a/TAO/tao/Transport_Mux_Strategy.h
+++ b/TAO/tao/Transport_Mux_Strategy.h
@@ -68,16 +68,6 @@ public:
/// allocated for that request.
virtual int dispatch_reply (TAO_Pluggable_Reply_Params &params) = 0;
- // = "Factory methods" to obtain the CDR stream, in the Muxed case
- // the factory simply allocates a new one, in the Exclusive case
- // the factory returns a pointer to the pre-allocated CDR.
-
- // virtual TAO_GIOP_Message_State *get_message_state (void) = 0;
- // Get a CDR stream.
-
- // virtual void destroy_message_state (TAO_GIOP_Message_State *) = 0;
- // Destroy a CDR stream.
-
/// Request has been just sent, but the reply is not received. Idle
/// the transport now.
virtual int idle_after_send (void) = 0;
diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp
index 3320298e49d..29e983a90db 100644
--- a/TAO/tao/Wait_On_Read.cpp
+++ b/TAO/tao/Wait_On_Read.cpp
@@ -2,6 +2,7 @@
#include "tao/Wait_On_Read.h"
#include "Transport.h"
+#include "Resume_Handle.h"
ACE_RCSID(tao, Wait_On_Read, "$Id$")
@@ -26,10 +27,13 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time,
// Do the same sort of looping that is done in other wait
// strategies.
int retval = 0;
+ TAO_Resume_Handle rh;
while (1)
{
retval =
- this->transport_->read_process_message (max_wait_time, 1);
+ this->transport_->handle_input_i (rh,
+ max_wait_time,
+ 1);
// If we got our reply, no need to run the loop any
// further.