diff options
author | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-06 04:41:00 +0000 |
---|---|---|
committer | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2001-07-06 04:41:00 +0000 |
commit | d8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8 (patch) | |
tree | 3874c3e46e81a1a1c5a6c459720e1c17cab62da2 /TAO/tao | |
parent | 08c2939a52133c144b5f17b7f8556b5dc046c0b0 (diff) | |
download | ATCD-d8ad30bbf6dbe53647040d40d2e53fbdf8edf4b8.tar.gz |
ChangeLogTag: Thu Jul 5 23:30:07 2001 Balachandran Natarajan <bala@cs.wustl.edu>
Diffstat (limited to 'TAO/tao')
85 files changed, 2935 insertions, 2018 deletions
diff --git a/TAO/tao/Acceptor_Filter.cpp b/TAO/tao/Acceptor_Filter.cpp index 8ba9afd45ee..98a3efb01d2 100644 --- a/TAO/tao/Acceptor_Filter.cpp +++ b/TAO/tao/Acceptor_Filter.cpp @@ -1,13 +1,16 @@ // $Id$ + #include "tao/Acceptor_Filter.h" #if !defined (__ACE_INLINE__) # include "Acceptor_Filter.i" #endif /* __ACE_INLINE__ */ + ACE_RCSID(tao, Acceptor_Filter, "$Id$") + TAO_Acceptor_Filter::~TAO_Acceptor_Filter (void) { } diff --git a/TAO/tao/Adapter.cpp b/TAO/tao/Adapter.cpp index 41c9e3a8bd9..1f363431999 100644 --- a/TAO/tao/Adapter.cpp +++ b/TAO/tao/Adapter.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "tao/Adapter.h" #include "tao/Object.h" #include "tao/Object_KeyC.h" diff --git a/TAO/tao/Any.cpp b/TAO/tao/Any.cpp index 4a5c307d38e..35aadd83e34 100644 --- a/TAO/tao/Any.cpp +++ b/TAO/tao/Any.cpp @@ -1,5 +1,6 @@ // $Id$ + // Portions of this file are: // Copyright 1994-1995 by Sun Microsystems Inc. // All Rights Reserved @@ -18,6 +19,7 @@ ACE_RCSID(tao, Any, "$Id$") + CORBA::TypeCode_ptr CORBA_Any::type (void) const { diff --git a/TAO/tao/Asynch_Reply_Dispatcher.cpp b/TAO/tao/Asynch_Reply_Dispatcher.cpp index b4a20a82253..9f2bbc0a5eb 100644 --- a/TAO/tao/Asynch_Reply_Dispatcher.cpp +++ b/TAO/tao/Asynch_Reply_Dispatcher.cpp @@ -1,6 +1,5 @@ // $Id$ - #include "tao/Asynch_Reply_Dispatcher.h" #include "tao/Pluggable_Messaging_Utils.h" @@ -19,8 +18,16 @@ ACE_RCSID(tao, Asynch_Reply_Dispatcher, "$Id$") // Constructor. TAO_Asynch_Reply_Dispatcher_Base::TAO_Asynch_Reply_Dispatcher_Base (TAO_ORB_Core *orb_core) - : reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - 0, + : buf_ (), + db_ (sizeof buf_, + ACE_Message_Block::MB_DATA, + this->buf_, + orb_core->message_block_buffer_allocator (), + orb_core->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + orb_core->message_block_dblock_allocator ()), + reply_cdr_ (&db_, + ACE_Message_Block::MB_DATA, TAO_ENCAP_BYTE_ORDER, TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR, @@ -48,11 +55,6 @@ TAO_Asynch_Reply_Dispatcher_Base::dispatch_reply ( return 0; } -/*TAO_GIOP_Message_State * -TAO_Asynch_Reply_Dispatcher_Base::message_state (void) -{ - return this->message_state_; -} */ void TAO_Asynch_Reply_Dispatcher_Base::dispatcher_bound (TAO_Transport *) @@ -113,11 +115,11 @@ TAO_Asynch_Reply_Dispatcher::dispatch_reply ( this->reply_status_ = params.reply_status_; - // this->message_state_ = message_state; - - // Steal the buffer so that no copying is done. - this->reply_cdr_.exchange_data_blocks (params.input_cdr_); + // Transfer the <params.input_cdr_>'s content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); + ACE_UNUSED_ARG (db); // Steal the buffer, that way we don't do any unnecesary copies of // this data. CORBA::ULong max = params.svc_ctx_.maximum (); diff --git a/TAO/tao/Asynch_Reply_Dispatcher.h b/TAO/tao/Asynch_Reply_Dispatcher.h index b8b1c978451..f81c68a142b 100644 --- a/TAO/tao/Asynch_Reply_Dispatcher.h +++ b/TAO/tao/Asynch_Reply_Dispatcher.h @@ -71,8 +71,12 @@ protected: */ IOP::ServiceContextList reply_service_info_; - // TAO_GIOP_Message_State *message_state_; - // CDR stream for reading the input. + /// The buffer that is used to initialise the data block + char buf_[ACE_CDR::DEFAULT_BUFSIZE]; + + /// datablock that is created on teh stack to initialise the CDR + /// stream underneath. + ACE_Data_Block db_; /// CDR stream which has the reply information that needs to be /// demarshalled by the stubs diff --git a/TAO/tao/CDR.cpp b/TAO/tao/CDR.cpp index 07eb03a7cf2..a08222e531f 100644 --- a/TAO/tao/CDR.cpp +++ b/TAO/tao/CDR.cpp @@ -41,9 +41,11 @@ # include "tao/CDR.i" #endif /* ! __ACE_INLINE__ */ + ACE_RCSID(tao, CDR, "$Id$") + #if defined (ACE_ENABLE_TIMEPROBES) static const char *TAO_CDR_Timeprobe_Description[] = diff --git a/TAO/tao/CORBALOC_Parser.cpp b/TAO/tao/CORBALOC_Parser.cpp index 13f09350804..ab4b228771a 100644 --- a/TAO/tao/CORBALOC_Parser.cpp +++ b/TAO/tao/CORBALOC_Parser.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "CORBALOC_Parser.h" #include "ORB_Core.h" #include "Stub.h" diff --git a/TAO/tao/ClientRequestInfo.cpp b/TAO/tao/ClientRequestInfo.cpp index 8dab949ae98..281ca1ed6a7 100644 --- a/TAO/tao/ClientRequestInfo.cpp +++ b/TAO/tao/ClientRequestInfo.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "ClientRequestInfo.h" #include "Invocation.h" #include "Stub.h" diff --git a/TAO/tao/Connection_Handler.cpp b/TAO/tao/Connection_Handler.cpp index 985dbbf44c8..049b1d15a49 100644 --- a/TAO/tao/Connection_Handler.cpp +++ b/TAO/tao/Connection_Handler.cpp @@ -6,6 +6,7 @@ #include "tao/debug.h" #include "tao/Object.h" #include "tao/Messaging_Policy_i.h" +#include "Resume_Handle.h" #if !defined (__ACE_INLINE__) #include "tao/Connection_Handler.inl" @@ -90,10 +91,15 @@ TAO_Connection_Handler::svc_i (void) max_wait_time = ¤t_timeout; } + TAO_Resume_Handle rh (this->orb_core_, + ACE_INVALID_HANDLE); + while (!this->orb_core_->has_shutdown () && result >= 0) { - result = this->handle_input_i (ACE_INVALID_HANDLE, max_wait_time); + result = + this->transport ()->handle_input_i (rh, + max_wait_time); if (result == -1 && errno == ETIME) { diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h index 7caeccecd4b..b0ac01d0077 100644 --- a/TAO/tao/Connection_Handler.h +++ b/TAO/tao/Connection_Handler.h @@ -101,9 +101,13 @@ protected: /// Object. int svc_i (void); - /// Need to be implemented by the underlying protocol objects - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0) = 0; +#if !defined (TAO_CONNECTION_HANDLER_STACK_BUF_SIZE) +# define TAO_CONNECTION_HANDLER_STACK_BUF_SIZE 1024 +#endif /*TAO_CONNECTION_HANDLER_STACK_BUF_SIZE */ + +#if !defined (TAO_RESUMES_CONNECTION_HANDLER) +# define TAO_RESUMES_CONNECTION_HANDLER 1 +#endif /*TAO_RESUMES_CONNECTION_HANDLER*/ private: diff --git a/TAO/tao/DomainC.cpp b/TAO/tao/DomainC.cpp index 7dfa513c009..f2e9509a4f6 100644 --- a/TAO/tao/DomainC.cpp +++ b/TAO/tao/DomainC.cpp @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/DomainC.i b/TAO/tao/DomainC.i index 08c89453f73..b86dfb17929 100644 --- a/TAO/tao/DomainC.i +++ b/TAO/tao/DomainC.i @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/DynamicC.cpp b/TAO/tao/DynamicC.cpp index 310487d401c..3ba3d039091 100644 --- a/TAO/tao/DynamicC.cpp +++ b/TAO/tao/DynamicC.cpp @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/DynamicC.i b/TAO/tao/DynamicC.i index 09fd7a3ff51..b45699ab5bb 100644 --- a/TAO/tao/DynamicC.i +++ b/TAO/tao/DynamicC.i @@ -70,7 +70,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p) { Parameter *deep_copy = new Parameter (*p.ptr_); - + if (deep_copy != 0) { Parameter *tmp = deep_copy; @@ -80,7 +80,7 @@ Dynamic::Parameter_var::operator= (const ::Dynamic::Parameter_var &p) } } } - + return *this; } @@ -103,20 +103,20 @@ Dynamic::Parameter_var::operator const ::Dynamic::Parameter &() const // cast } ACE_INLINE -Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast +Dynamic::Parameter_var::operator ::Dynamic::Parameter &() // cast { return *this->ptr_; } ACE_INLINE -Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast +Dynamic::Parameter_var::operator ::Dynamic::Parameter &() const // cast { return *this->ptr_; } // variable-size types only ACE_INLINE -Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast +Dynamic::Parameter_var::operator ::Dynamic::Parameter *&() // cast { return this->ptr_; } @@ -133,7 +133,7 @@ Dynamic::Parameter_var::inout (void) return *this->ptr_; } -// mapping for variable size +// mapping for variable size ACE_INLINE ::Dynamic::Parameter *& Dynamic::Parameter_var::out (void) { @@ -194,7 +194,7 @@ Dynamic::Parameter_out::operator= (Parameter *p) return *this; } -ACE_INLINE +ACE_INLINE Dynamic::Parameter_out::operator ::Dynamic::Parameter *&() // cast { return this->ptr_; @@ -214,7 +214,7 @@ Dynamic::Parameter_out::operator-> (void) #if !defined (TAO_USE_SEQUENCE_TEMPLATES) - + #if !defined (__TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_) #define __TAO_UNBOUNDED_SEQUENCE_DYNAMIC_PARAMETERLIST_CI_ @@ -227,24 +227,24 @@ Dynamic::Parameter_out::operator-> (void) ACE_NEW_RETURN (retval, Dynamic::Parameter[size], 0); return retval; } - + ACE_INLINE void Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::freebuf (Dynamic::Parameter *buffer) // Free the sequence. { delete [] buffer; } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (void) // Default constructor. { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum) // Constructor using a maximum length value. : TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (maximum)) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (CORBA::ULong maximum, CORBA::ULong length, @@ -253,7 +253,7 @@ Dynamic::Parameter_out::operator-> (void) : TAO_Unbounded_Base_Sequence (maximum, length, data, release) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::_TAO_Unbounded_Sequence_Dynamic_ParameterList (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs) // Copy constructor. @@ -263,10 +263,10 @@ Dynamic::Parameter_out::operator-> (void) { Dynamic::Parameter *tmp1 = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (this->maximum_); Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < this->length_; ++i) tmp1[i] = tmp2[i]; - + this->buffer_ = tmp1; } else @@ -274,14 +274,14 @@ Dynamic::Parameter_out::operator-> (void) this->buffer_ = 0; } } - + ACE_INLINE Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList & Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator= (const _TAO_Unbounded_Sequence_Dynamic_ParameterList &rhs) // Assignment operator. { if (this == &rhs) return *this; - + if (this->release_) { if (this->maximum_ < rhs.maximum_) @@ -294,18 +294,18 @@ Dynamic::Parameter_out::operator-> (void) } else this->buffer_ = _TAO_Unbounded_Sequence_Dynamic_ParameterList::allocbuf (rhs.maximum_); - + TAO_Unbounded_Base_Sequence::operator= (rhs); - + Dynamic::Parameter *tmp1 = ACE_reinterpret_cast (Dynamic::Parameter *, this->buffer_); Dynamic::Parameter * const tmp2 = ACE_reinterpret_cast (Dynamic::Parameter * ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < this->length_; ++i) tmp1[i] = tmp2[i]; - + return *this; } - + // = Accessors. ACE_INLINE Dynamic::Parameter & Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i) @@ -315,7 +315,7 @@ Dynamic::Parameter_out::operator-> (void) Dynamic::Parameter* tmp = ACE_reinterpret_cast(Dynamic::Parameter*,this->buffer_); return tmp[i]; } - + ACE_INLINE const Dynamic::Parameter & Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::operator[] (CORBA::ULong i) const // operator [] @@ -324,9 +324,9 @@ Dynamic::Parameter_out::operator-> (void) Dynamic::Parameter * const tmp = ACE_reinterpret_cast (Dynamic::Parameter* ACE_CAST_CONST, this->buffer_); return tmp[i]; } - + // Implement the TAO_Base_Sequence methods (see Sequence.h) - + ACE_INLINE Dynamic::Parameter * Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (CORBA::Boolean orphan) { @@ -360,13 +360,13 @@ Dynamic::Parameter_out::operator-> (void) } return result; } - + ACE_INLINE const Dynamic::Parameter * Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::get_buffer (void) const { return ACE_reinterpret_cast(const Dynamic::Parameter * ACE_CAST_CONST, this->buffer_); } - + ACE_INLINE void Dynamic::_TAO_Unbounded_Sequence_Dynamic_ParameterList::replace (CORBA::ULong max, CORBA::ULong length, @@ -383,11 +383,11 @@ Dynamic::Parameter_out::operator-> (void) this->buffer_ = data; this->release_ = release; } - + #endif /* end #if !defined */ -#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ +#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ #if !defined (_DYNAMIC_PARAMETERLIST_CI_) #define _DYNAMIC_PARAMETERLIST_CI_ @@ -443,7 +443,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p) { ParameterList *deep_copy = new ParameterList (*p.ptr_); - + if (deep_copy != 0) { ParameterList *tmp = deep_copy; @@ -453,7 +453,7 @@ Dynamic::ParameterList_var::operator= (const ::Dynamic::ParameterList_var &p) } } } - + return *this; } @@ -469,27 +469,27 @@ Dynamic::ParameterList_var::operator-> (void) return this->ptr_; } -ACE_INLINE +ACE_INLINE Dynamic::ParameterList_var::operator const ::Dynamic::ParameterList &() const // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast +ACE_INLINE +Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast +ACE_INLINE +Dynamic::ParameterList_var::operator ::Dynamic::ParameterList &() const // cast { return *this->ptr_; } // variable-size types only ACE_INLINE -Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast +Dynamic::ParameterList_var::operator ::Dynamic::ParameterList *&() // cast { return this->ptr_; } @@ -518,7 +518,7 @@ Dynamic::ParameterList_var::inout (void) return *this->ptr_; } -// mapping for variable size +// mapping for variable size ACE_INLINE ::Dynamic::ParameterList *& Dynamic::ParameterList_var::out (void) { @@ -579,7 +579,7 @@ Dynamic::ParameterList_out::operator= (ParameterList *p) return *this; } -ACE_INLINE +ACE_INLINE Dynamic::ParameterList_out::operator ::Dynamic::ParameterList *&() // cast { return this->ptr_; @@ -608,7 +608,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) #if !defined (TAO_USE_SEQUENCE_TEMPLATES) - + #if !defined (__TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_) #define __TAO_UNBOUNDED_OBJECT_SEQUENCE_DYNAMIC_EXCEPTIONLIST_CI_ @@ -616,36 +616,36 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (CORBA::ULong nelems) { CORBA::TypeCode **buf = 0; - + ACE_NEW_RETURN (buf, CORBA::TypeCode*[nelems], 0); - + for (CORBA::ULong i = 0; i < nelems; i++) { buf[i] = CORBA::TypeCode::_nil (); } - + return buf; } - - ACE_INLINE void + + ACE_INLINE void Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::freebuf (CORBA::TypeCode **buffer) { if (buffer == 0) return; delete[] buffer; } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (void) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum) : TAO_Unbounded_Base_Sequence (maximum, _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (maximum)) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList (CORBA::ULong maximum, CORBA::ULong length, @@ -654,7 +654,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) : TAO_Unbounded_Base_Sequence (maximum, length, value, release) { } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList(const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs) : TAO_Unbounded_Base_Sequence (rhs) @@ -663,12 +663,12 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) { CORBA::TypeCode **tmp1 = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (this->maximum_); CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < rhs.length_; ++i) { tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]); } - + this->buffer_ = tmp1; } else @@ -676,17 +676,17 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) this->buffer_ = 0; } } - + ACE_INLINE Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList & Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator= (const _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList &rhs) { if (this == &rhs) return *this; - + if (this->release_) { CORBA::TypeCode **tmp = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_); - + for (CORBA::ULong i = 0; i < this->length_; ++i) { CORBA::release (tmp[i]); @@ -700,20 +700,20 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) } else this->buffer_ = _TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::allocbuf (rhs.maximum_); - + TAO_Unbounded_Base_Sequence::operator= (rhs); - + CORBA::TypeCode **tmp1 = ACE_reinterpret_cast (CORBA::TypeCode **, this->buffer_); CORBA::TypeCode ** const tmp2 = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, rhs.buffer_); - + for (CORBA::ULong i = 0; i < rhs.length_; ++i) { tmp1[i] = CORBA::TypeCode::_duplicate (tmp2[i]); } - + return *this; } - + ACE_INLINE TAO_Pseudo_Object_Manager<Dynamic::TypeCode,Dynamic::TypeCode_var> Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::operator[] (CORBA::ULong index) const // read-write accessor @@ -722,7 +722,7 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) CORBA::TypeCode ** const tmp = ACE_reinterpret_cast (CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_); return TAO_Pseudo_Object_Manager<Dynamic::TypeCode,Dynamic::TypeCode_var> (tmp + index, this->release_); } - + ACE_INLINE CORBA::TypeCode* * Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (CORBA::Boolean orphan) { @@ -756,18 +756,18 @@ Dynamic::ParameterList_out::operator[] (CORBA::ULong index) } return result; } - + ACE_INLINE const CORBA::TypeCode* * Dynamic::_TAO_Unbounded_Object_Sequence_Dynamic_ExceptionList::get_buffer (void) const { return ACE_reinterpret_cast(const CORBA::TypeCode ** ACE_CAST_CONST, this->buffer_); } - - + + #endif /* end #if !defined */ -#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ +#endif /* !TAO_USE_SEQUENCE_TEMPLATES */ #if !defined (_DYNAMIC_EXCEPTIONLIST_CI_) #define _DYNAMIC_EXCEPTIONLIST_CI_ @@ -823,7 +823,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p) { ExceptionList *deep_copy = new ExceptionList (*p.ptr_); - + if (deep_copy != 0) { ExceptionList *tmp = deep_copy; @@ -833,7 +833,7 @@ Dynamic::ExceptionList_var::operator= (const ::Dynamic::ExceptionList_var &p) } } } - + return *this; } @@ -849,27 +849,27 @@ Dynamic::ExceptionList_var::operator-> (void) return this->ptr_; } -ACE_INLINE +ACE_INLINE Dynamic::ExceptionList_var::operator const ::Dynamic::ExceptionList &() const // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast +ACE_INLINE +Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() // cast { return *this->ptr_; } -ACE_INLINE -Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast +ACE_INLINE +Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList &() const // cast { return *this->ptr_; } // variable-size types only ACE_INLINE -Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast +Dynamic::ExceptionList_var::operator ::Dynamic::ExceptionList *&() // cast { return this->ptr_; } @@ -892,7 +892,7 @@ Dynamic::ExceptionList_var::inout (void) return *this->ptr_; } -// mapping for variable size +// mapping for variable size ACE_INLINE ::Dynamic::ExceptionList *& Dynamic::ExceptionList_var::out (void) { @@ -953,7 +953,7 @@ Dynamic::ExceptionList_out::operator= (ExceptionList *p) return *this; } -ACE_INLINE +ACE_INLINE Dynamic::ExceptionList_out::operator ::Dynamic::ExceptionList *&() // cast { return this->ptr_; @@ -989,7 +989,7 @@ ACE_INLINE CORBA::Boolean operator<< (TAO_OutputCDR &strm, const Dynamic::Parame return 1; else return 0; - + } ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_tao_aggregate) @@ -1001,7 +1001,7 @@ ACE_INLINE CORBA::Boolean operator>> (TAO_InputCDR &strm, Dynamic::Parameter &_t return 1; else return 0; - + } @@ -1033,4 +1033,3 @@ CORBA::Boolean TAO_Export operator>> ( ); #endif /* _TAO_CDR_OP_Dynamic_ExceptionList_I_ */ - diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp index 5471cd4d1f3..75e6530afba 100644 --- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp +++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.cpp @@ -2,11 +2,14 @@ + #include "DII_Reply_Dispatcher.h" + ACE_RCSID(DynamicInterface, DII_Reply_Dispatcher, "$Id$") + #include "Request.h" #include "tao/Pluggable.h" #include "tao/Environment.h" @@ -20,8 +23,15 @@ TAO_DII_Deferred_Reply_Dispatcher::TAO_DII_Deferred_Reply_Dispatcher ( const CORBA::Request_ptr req, TAO_ORB_Core *orb_core) : TAO_Asynch_Reply_Dispatcher_Base (orb_core), - reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - 0, + db_ (sizeof buf_, + ACE_Message_Block::MB_DATA, + this->buf_, + orb_core->message_block_buffer_allocator (), + orb_core->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + orb_core->message_block_dblock_allocator ()), + reply_cdr_ (&db_, + ACE_Message_Block::DONT_DELETE, TAO_ENCAP_BYTE_ORDER, TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR, @@ -44,8 +54,11 @@ TAO_DII_Deferred_Reply_Dispatcher::dispatch_reply ( { this->reply_status_ = params.reply_status_; - // Steal the buffer so that no copying is done. - this->reply_cdr_.steal_from (params.input_cdr_); + // Transfer the <params.input_cdr_>'s content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); + + ACE_UNUSED_ARG (db); // Steal the buffer, that way we don't do any unnecesary copies of // this data. diff --git a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h index 2e011729046..4bc0226620a 100644 --- a/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h +++ b/TAO/tao/DynamicInterface/DII_Reply_Dispatcher.h @@ -53,13 +53,18 @@ public: private: + /// The buffer that is used to initialise the data block + char buf_[ACE_CDR::DEFAULT_BUFSIZE]; + + /// datablock that is created on teh stack to initialise the CDR + /// stream underneath. + ACE_Data_Block db_; + + /// CDR stream for reading the input. TAO_InputCDR reply_cdr_; - // CDR stream for reading the input. - // @@ Carlos : message_state should go away. All we need is the reply - // cdr. Is that right? (Alex). + /// Where the reply needs to go. const CORBA::Request_ptr req_; - // Where the reply needs to go. }; #include "ace/post.h" diff --git a/TAO/tao/Exception.cpp b/TAO/tao/Exception.cpp index 9d9f12fe3e9..d33b1ebc4b8 100644 --- a/TAO/tao/Exception.cpp +++ b/TAO/tao/Exception.cpp @@ -1,5 +1,6 @@ // $Id$ + // THREADING NOTE: calling thread handles mutual exclusion policy // on all of these data structures. @@ -26,6 +27,7 @@ ACE_RCSID (TAO, Exception, "$Id$") + // Static initializers. ACE_Allocator *TAO_Exceptions::global_allocator_; diff --git a/TAO/tao/Exclusive_TMS.cpp b/TAO/tao/Exclusive_TMS.cpp index 3645be52122..c0c1e3d8755 100644 --- a/TAO/tao/Exclusive_TMS.cpp +++ b/TAO/tao/Exclusive_TMS.cpp @@ -43,10 +43,6 @@ TAO_Exclusive_TMS::bind_dispatcher (CORBA::ULong request_id, this->request_id_ = request_id; this->rd_ = rd; - // If there was a previous reply, cleanup its state first. - // if (this->message_state_.message_size != 0) - // this->message_state_.reset (0); - return TAO_Transport_Mux_Strategy::bind_dispatcher (request_id, rd); } @@ -87,32 +83,6 @@ TAO_Exclusive_TMS::dispatch_reply (TAO_Pluggable_Reply_Params ¶ms) return rd->dispatch_reply (params); } -/*TAO_GIOP_Message_State * -TAO_Exclusive_TMS::get_message_state (void) -{ - if (this->rd_ != 0) - { - TAO_GIOP_Message_State* rd_message_state = this->rd_->message_state (); - if (rd_message_state == 0) - { - // The Reply Dispatcher does not have one (the Asynch guys - // don't) so go ahead and pass yours. - return &this->message_state_; - } - // @@ TODO: it would seem like the "Right Thing"[tm] to do here - // is to return rd_message_state, but when Michael changed this - // stuff originally he left that out. I hesitate to make the - // change on this revision, too many changes at the same time. - } - return &this->message_state_; -} - -void -TAO_Exclusive_TMS::destroy_message_state (TAO_GIOP_Message_State *) -{ -} -*/ - int TAO_Exclusive_TMS::idle_after_send (void) { diff --git a/TAO/tao/Exclusive_TMS.h b/TAO/tao/Exclusive_TMS.h index ef22858e9b2..6b5114adcf2 100644 --- a/TAO/tao/Exclusive_TMS.h +++ b/TAO/tao/Exclusive_TMS.h @@ -53,10 +53,6 @@ public: virtual int dispatch_reply (TAO_Pluggable_Reply_Params ¶ms); - // @@ Commented for the time being, let the commented line stay for - // sometime - Bala - // virtual TAO_GIOP_Message_State *get_message_state (void); - // virtual void destroy_message_state (TAO_GIOP_Message_State *); virtual int idle_after_send (void); virtual int idle_after_reply (void); virtual void connection_closed (void); diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp index 9ffb54cc572..10417e4d38b 100644 --- a/TAO/tao/GIOP_Message_Base.cpp +++ b/TAO/tao/GIOP_Message_Base.cpp @@ -17,37 +17,21 @@ ACE_RCSID (tao, GIOP_Message_Base, "$Id$") + TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, - size_t input_cdr_size) - : message_handler_ (orb_core, - this, - input_cdr_size), - output_ (0), + size_t /*input_cdr_size*/) + : orb_core_ (orb_core), + message_state_ (orb_core, + this), generator_parser_ (0) { -#if defined(ACE_HAS_PURIFY) - (void) ACE_OS::memset (this->repbuf_, - '\0', - sizeof this->repbuf_); -#endif /* ACE_HAS_PURIFY */ - ACE_NEW (this->output_, - TAO_OutputCDR (this->repbuf_, - sizeof this->repbuf_, - TAO_ENCAP_BYTE_ORDER, - orb_core->message_block_buffer_allocator (), - orb_core->message_block_dblock_allocator (), - orb_core->message_block_msgblock_allocator (), - orb_core->orb_params ()->cdr_memcpy_tradeoff (), - TAO_DEF_GIOP_MAJOR, - TAO_DEF_GIOP_MINOR, - orb_core->to_iso8859 (), - orb_core->to_unicode ())); + } TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base (void) { - delete this->output_; + } @@ -62,10 +46,9 @@ TAO_GIOP_Message_Base::init (CORBA::Octet major, void -TAO_GIOP_Message_Base::reset (int reset_flag) +TAO_GIOP_Message_Base::reset (void) { - // Reset the message state - this->message_handler_.reset (reset_flag); + // no-op } int @@ -184,44 +167,11 @@ TAO_GIOP_Message_Base::generate_reply_header ( int -TAO_GIOP_Message_Base::read_message (TAO_Transport *transport, +TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/, int /*block */, ACE_Time_Value * /*max_wait_time*/) { - // Call the handler to read and do a simple parse of the header of - // the message. - int retval = - this->message_handler_.read_messages (transport); - - if (retval < 1) - return retval; - - retval = this->message_handler_.parse_message_header (); - - - // Error in the message that was received - if (retval == -1) - return -1; - // If -2, we want the reactor to call us back, so return 1 - else if (retval == -2) - return 1; - - if (retval != 0) - { - // Get the message state - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); - - // Set the state internally for parsing and generating messages - this->set_state (state.giop_version.major, - state.giop_version.minor); - } - - // We return 2, it is ugly. But the reactor semantics has made us to - // limp :( - return 2; - - + return 0; } int @@ -284,9 +234,12 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream) } TAO_Pluggable_Message_Type -TAO_GIOP_Message_Base::message_type (void) +TAO_GIOP_Message_Base::message_type ( + TAO_GIOP_Message_State &msg_state) { - switch (this->message_handler_.message_state ().message_type) + // Convert to the right type of Pluggable Messaging message type. + + switch (msg_state.message_type_) { case TAO_GIOP_REQUEST: case TAO_GIOP_LOCATEREQUEST: @@ -313,43 +266,264 @@ TAO_GIOP_Message_Base::message_type (void) } int +TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming) +{ + + if (this->message_state_.parse_message_header (incoming) == -1) + { + return -1; + } + + return 0; +} + +ssize_t +TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming) +{ + // Actual message size including the header.. + CORBA::ULong msg_size = + this->message_state_.message_size (); + + size_t len = incoming.length (); + + if (len > msg_size) + { + return -1; + } + else if (len == msg_size) + return 0; + + return msg_size - len; +} + + +int +TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd) +{ + TAO_GIOP_Message_State state (this->orb_core_, + this); + + if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN) + { + if (incoming.length () > 0) + { + // Make a node which has a message block of the size of + // MESSAGE_HEADER_LEN. + qd = + this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN); + + qd->msg_block_->copy (incoming.rd_ptr (), + incoming.length ()); + qd->missing_data_ = -1; + } + return 0; + } + + if (state.parse_message_header (incoming) == -1) + { + return -1; + } + + size_t copying_len = state.message_size (); + + qd = this->make_queued_data (copying_len); + + if (copying_len > incoming.length ()) + { + qd->missing_data_ = + copying_len - incoming.length (); + + copying_len = incoming.length (); + } + + qd->msg_block_->copy (incoming.rd_ptr (), + copying_len); + + incoming.rd_ptr (copying_len); + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + qd->msg_type_ = this->message_type (state); + return 1; +} + +int +TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) +{ + // Look to see whether we had atleast parsed the GIOP header ... + if (qd->missing_data_ == -1) + { + // The data length that has been stuck in there during the last + // read .... + size_t len = + qd->msg_block_->length (); + + // We know that we would have space for + // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data + // from the <incoming> into the message block in <qd> + qd->msg_block_->copy (incoming.rd_ptr (), + TAO_GIOP_MESSAGE_HEADER_LEN - len); + + // Move the rd_ptr () in the incoming message block.. + incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len); + + TAO_GIOP_Message_State state (this->orb_core_, + this); + + // Parse the message header now... + if (state.parse_message_header (*qd->msg_block_) == -1) + return -1; + + // Now grow the message block so that we can copy the rest of + // the data... + ACE_CDR::grow (qd->msg_block_, + state.message_size ()); + + // Copy the pay load.. + + // Calculate the bytes that needs to be copied in the queue... + size_t copy_len = + state.message_size () - TAO_GIOP_MESSAGE_HEADER_LEN; + + // If teh data that needs to be copied is more than that is + // available to us .. + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + else + { + qd->missing_data_ = 0; + } + + // ..now we are set to copy the right amount of data to the + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + incoming.rd_ptr (copy_len); + + // Get the other details... + qd->byte_order_ = state.byte_order_; + qd->major_version_ = state.giop_version_.major; + qd->minor_version_ = state.giop_version_.minor; + qd->msg_type_ = this->message_type (state); + } + else + { + // @@todo: Need to abstract this out to a seperate method... + size_t copy_len = qd->missing_data_; + + if (copy_len > incoming.length ()) + { + // Calculate the missing data.. + qd->missing_data_ = + copy_len - incoming.length (); + + // Set the actual possible copy_len that is available... + copy_len = incoming.length (); + } + + // Copy the right amount of data in to the node... + // node.. + qd->msg_block_->copy (incoming.rd_ptr (), + copy_len); + + // Set the <rd_ptr> of the <incoming>.. + qd->msg_block_->rd_ptr (copy_len); + + } + + + return 0; +} + + +void +TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd) +{ + // Get the message information + qd->byte_order_ = + this->message_state_.byte_order_; + qd->major_version_ = + this->message_state_.giop_version_.major; + qd->minor_version_ = + this->message_state_.giop_version_.minor; + + qd->msg_type_= + this->message_type (this->message_state_); + + // Reset the message_state + this->message_state_.reset (); +} + +int TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core) + TAO_Queued_Data *qd) + { // Set the upcall thread + this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ()); + + // Set the state internally for parsing and generating messages + this->set_state (qd->major_version_, + qd->minor_version_); + + // A buffer that we will use to initialise the CDR stream + char repbuf[ACE_CDR::DEFAULT_BUFSIZE]; - orb_core->lf_strategy ().set_upcall_thread (orb_core->leader_follower ()); +#if defined(ACE_HAS_PURIFY) + (void) ACE_OS::memset (repbuf, + '\0', + sizeof repbuf); +#endif /* ACE_HAS_PURIFY */ - // Reset the output CDR stream. - // @@@@Is it necessary here? - this->output_->reset (); + // Initialze an output CDR on the stack + TAO_OutputCDR output (repbuf, + sizeof repbuf, + TAO_ENCAP_BYTE_ORDER, + this->orb_core_->output_cdr_buffer_allocator (), + this->orb_core_->output_cdr_dblock_allocator (), + this->orb_core_->output_cdr_msgblock_allocator (), + this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (), + qd->major_version_, + qd->minor_version_, + this->orb_core_->to_iso8859 (), + this->orb_core_->to_unicode ()); // Get the read and write positions before we steal data. - size_t rd_pos = this->message_handler_.rd_pos (); - size_t wr_pos = this->message_handler_.wr_pos (); + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; + + this->dump_msg ("recv", + ACE_reinterpret_cast (u_char *, + qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); // Create a input CDR stream. // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is also done in the higher layers. - TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (), - 0, + + TAO_InputCDR input_cdr (qd->msg_block_->data_block (), + ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, - this->message_handler_.message_state ().byte_order, - state.giop_version.major, - state.giop_version.minor, - orb_core); + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); - // Set giop version info for the outstream so that server replies - // in correct GIOP version - output_->set_version (state.giop_version.major, state.giop_version.minor); - - // Reset the message handler to receive upcalls if any - this->message_handler_.reset (0); // We know we have some request message. Check whether it is a // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action. @@ -358,20 +532,20 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, // the stream and never touch that again for anything. We basically // loose ownership of the data_block. - switch (this->message_handler_.message_state ().message_type) + switch (qd->msg_type_) { case TAO_GIOP_REQUEST: // Should be taken care by the state specific invocations. They // could raise an exception or write things in the output CDR // stream return this->process_request (transport, - orb_core, - input_cdr); + input_cdr, + output); case TAO_GIOP_LOCATEREQUEST: return this->process_locate_request (transport, - orb_core, - input_cdr); + input_cdr, + output); default: return -1; } @@ -379,31 +553,40 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport, int TAO_GIOP_Message_Base::process_reply_message ( - TAO_Pluggable_Reply_Params ¶ms - ) + TAO_Pluggable_Reply_Params ¶ms, + TAO_Queued_Data *qd) { + // Set the state internally for parsing and generating messages + this->set_state (qd->major_version_, + qd->minor_version_); + // Get the read and write positions before we steal data. - size_t rd_pos = this->message_handler_.rd_pos (); - size_t wr_pos = this->message_handler_.wr_pos (); + size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base (); + size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base (); + rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN; - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); + this->dump_msg ("recv", + ACE_reinterpret_cast (u_char *, + qd->msg_block_->rd_ptr ()), + qd->msg_block_->length ()); - // Create a input CDR stream. + + // Create a empty buffer on stack // NOTE: We use the same data block in which we read the message and // we pass it on to the higher layers of the ORB. So we dont to any // copies at all here. The same is alos done in the higher layers. - TAO_InputCDR input_cdr (this->message_handler_.steal_data_block (), - 0, + TAO_InputCDR input_cdr (qd->msg_block_->data_block (), + ACE_Message_Block::DONT_DELETE, rd_pos, wr_pos, - this->message_handler_.message_state ().byte_order, - state.giop_version.major, - state.giop_version.minor); + qd->byte_order_, + qd->major_version_, + qd->minor_version_, + this->orb_core_); // Reset the message state. Now, we are ready for the next nested // upcall if any. - this->message_handler_.reset (0); + // this->message_handler_.reset (0); // We know we have some reply message. Check whether it is a // GIOP_REPLY or GIOP_LOCATE_REPLY to take action. @@ -412,7 +595,7 @@ TAO_GIOP_Message_Base::process_reply_message ( // the stream and never touch that again for anything. We basically // loose ownership of the data_block. - switch (this->message_handler_.message_state ().message_type) + switch (qd->msg_type_) { case TAO_GIOP_REPLY: // Should be taken care by the state specific parsing @@ -500,16 +683,16 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type t, int TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &cdr) + TAO_InputCDR &cdr, + TAO_OutputCDR &output) { // This will extract the request header, set <response_required> // and <sync_with_server> as appropriate. TAO_ServerRequest request (this, cdr, - *this->output_, + output, transport, - orb_core); + this->orb_core_); CORBA::ULong request_id = 0; CORBA::Boolean response_required = 0; @@ -533,10 +716,11 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, CORBA::Object_var forward_to; // Do this before the reply is sent. - orb_core->adapter_registry ()->dispatch (request.object_key (), - request, - forward_to, - ACE_TRY_ENV); + this->orb_core_->adapter_registry ()->dispatch ( + request.object_key (), + request, + forward_to, + ACE_TRY_ENV); ACE_TRY_CHECK; if (!CORBA::is_nil (forward_to.in ())) @@ -551,12 +735,12 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, reply_params.service_context_notowned (&request.reply_service_info ()); // Make the GIOP header and Reply header - this->generate_reply_header (*this->output_, + this->generate_reply_header (output, reply_params); - *this->output_ << forward_to.in (); + output << forward_to.in (); - int result = transport->send_message (*this->output_); + int result = transport->send_message (output); if (result == -1) { if (TAO_debug_level > 0) @@ -580,7 +764,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, if (response_required) { result = this->send_reply_exception (transport, - orb_core, + this->orb_core_, request_id, &request.reply_service_info (), &ACE_ANY_EXCEPTION); @@ -638,7 +822,7 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, CORBA::COMPLETED_MAYBE); result = this->send_reply_exception (transport, - orb_core, + this->orb_core_, request_id, &request.reply_service_info (), &exception); @@ -679,13 +863,13 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport, int TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, - TAO_ORB_Core* orb_core, - TAO_InputCDR &input) + TAO_InputCDR &input, + TAO_OutputCDR &output) { // This will extract the request header, set <response_required> as // appropriate. TAO_GIOP_Locate_Request_Header locate_request (input, - orb_core); + this->orb_core_); TAO_GIOP_Locate_Status_Msg status_info; @@ -706,13 +890,6 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, CORBA::COMPLETED_NO)); } - // Execute a fake request to find out if the object is there or - // if the POA can activate it on demand... - char repbuf[ACE_CDR::DEFAULT_BUFSIZE]; - TAO_OutputCDR dummy_output (repbuf, - sizeof repbuf); - // This output CDR is not used! - TAO_ObjectKey tmp_key (locate_request.object_key ().length (), locate_request.object_key ().length (), locate_request.object_key ().get_buffer (), @@ -731,9 +908,9 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, deferred_reply, tmp_key, "_non_existent", - dummy_output, + output, transport, - orb_core, + this->orb_core_, parse_error); if (parse_error != 0) @@ -744,10 +921,11 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, CORBA::Object_var forward_to; - orb_core->adapter_registry ()->dispatch (server_request.object_key (), - server_request, - forward_to, - ACE_TRY_ENV); + this->orb_core_->adapter_registry ()->dispatch ( + server_request.object_key (), + server_request, + forward_to, + ACE_TRY_ENV); ACE_TRY_CHECK; if (!CORBA::is_nil (forward_to.in ())) @@ -809,27 +987,29 @@ TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport, return this->make_send_locate_reply (transport, locate_request, - status_info); + status_info, + output); } int TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport, TAO_GIOP_Locate_Request_Header &request, - TAO_GIOP_Locate_Status_Msg &status_info) + TAO_GIOP_Locate_Status_Msg &status_info, + TAO_OutputCDR &output) { // Note here we are making the Locate reply header which is *QUITE* // different from the reply header made by the make_reply () call.. // Make the GIOP message header this->write_protocol_header (TAO_GIOP_LOCATEREPLY, - *this->output_); + output); // This writes the header & body - this->generator_parser_->write_locate_reply_mesg (*this->output_, + this->generator_parser_->write_locate_reply_mesg (output, request.request_id (), status_info); // Send the message - int result = transport->send_message (*this->output_); + int result = transport->send_message (output); // Print out message if there is an error if (result == -1) @@ -883,7 +1063,8 @@ TAO_GIOP_Message_Base::send_error (TAO_Transport *transport) 0, ACE_Message_Block::DONT_DELETE, 0); - ACE_Message_Block message_block(&data_block); + ACE_Message_Block message_block(&data_block, + ACE_Message_Block::DONT_DELETE); message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); size_t bt; @@ -1179,23 +1360,36 @@ TAO_GIOP_Message_Base::is_ready_for_bidirectional (void) return this->generator_parser_->is_ready_for_bidirectional (); } -int -TAO_GIOP_Message_Base::more_messages (void) + +TAO_Queued_Data * +TAO_GIOP_Message_Base::make_queued_data (size_t sz) { - int retval = - this->message_handler_.is_message_ready (); + // Get a node for the queue.. + TAO_Queued_Data *qd = + TAO_Queued_Data::get_queued_data (); - if (retval <= 0) - return retval; + // Make a datablock for the size requested + something. The + // "something" is required because we are going to align the data + // block in the message block. During alignment we could loose some + // bytes. As we may not know how many bytes will be lost, we will + // allocate ACE_CDR::MAX_ALIGNMENT extra. + ACE_Data_Block *db = + this->orb_core_->data_block_for_message_block (sz + + ACE_CDR::MAX_ALIGNMENT); + ACE_Allocator *alloc = + this->orb_core_->message_block_msgblock_allocator (); - // Get the message state - TAO_GIOP_Message_State &state = - this->message_handler_.message_state (); + ACE_Message_Block mb (db, + 0, + alloc); + + ACE_Message_Block *new_mb = mb.duplicate (); + + ACE_CDR::mb_align (new_mb); + + qd->msg_block_ = new_mb; - // Set the state internally for parsing and generating messages - this->set_state (state.giop_version.major, - state.giop_version.minor); - return retval; + return qd; } diff --git a/TAO/tao/GIOP_Message_Base.h b/TAO/tao/GIOP_Message_Base.h index ecb194b1148..c30113ec963 100644 --- a/TAO/tao/GIOP_Message_Base.h +++ b/TAO/tao/GIOP_Message_Base.h @@ -22,10 +22,12 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "tao/GIOP_Message_Generator_Parser_Impl.h" -#include "tao/GIOP_Message_Reactive_Handler.h" #include "tao/GIOP_Utils.h" +#include "tao/GIOP_Message_State.h" + class TAO_Pluggable_Reply_Params; +class TAO_Queued_Data; /** * @class TAO_GIOP_Message_Base @@ -41,7 +43,7 @@ class TAO_Pluggable_Reply_Params; class TAO_Export TAO_GIOP_Message_Base : public TAO_Pluggable_Messaging { public: - friend class TAO_GIOP_Message_Reactive_Handler; + // friend class TAO_GIOP_Message_Reactive_Handler; /// Constructor TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core, @@ -56,7 +58,7 @@ public: CORBA::Octet minor); /// Reset the messaging the object - virtual void reset (int reset_flag = 1); + virtual void reset (void); /// Write the RequestHeader in to the <cdr> stream. The underlying /// implementation of the mesaging should do the right thing. @@ -91,27 +93,40 @@ public: /// the message. virtual int format_message (TAO_OutputCDR &cdr); + /// Parse the incoming messages.. + virtual int parse_incoming_messages (ACE_Message_Block &message_block); - /// Get the message type. The return value would be one of the - /// following: - /// TAO_PLUGGABLE_MESSAGE_REQUEST, - /// TAO_PLUGGABLE_MESSAGE_REPLY, - /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, - /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. - virtual TAO_Pluggable_Message_Type message_type (void); + /// Calculate the amount of data that is missing in the <incoming> + /// message block. + virtual ssize_t missing_data (ACE_Message_Block &message_block); + /* Extract the details of the next message from the <incoming> + * through <qd>. Returns 1 if there are more messages and returns a + * 0 if there are no more messages in <incoming>. + */ + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd); + /// Check whether the node <qd> needs consolidation from <incoming> + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming); + + /// Get the details of the message parsed through the <qd>. + virtual void get_message_data (TAO_Queued_Data *qd); /// Process the request message that we have received on the /// connection virtual int process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core); + TAO_Queued_Data *qd); + /// Parse the reply message that we received and return the reply /// information though <reply_info> virtual int process_reply_message ( - TAO_Pluggable_Reply_Params &reply_info - ); + TAO_Pluggable_Reply_Params &reply_info, + TAO_Queued_Data *qd); + + /// Generate a reply message with the exception <ex>. virtual int generate_exception_reply ( @@ -124,13 +139,13 @@ protected: /// Processes the <GIOP_REQUEST> messages int process_request (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &input); + TAO_InputCDR &input, + TAO_OutputCDR &output); /// Processes the <GIOP_LOCATE_REQUEST> messages int process_locate_request (TAO_Transport *transport, - TAO_ORB_Core *orb_core, - TAO_InputCDR &input); + TAO_InputCDR &input, + TAO_OutputCDR &output); /// Set the state void set_state (CORBA::Octet major, @@ -141,6 +156,14 @@ protected: const u_char *ptr, size_t len); + /// Get the message type. The return value would be one of the + /// following: + /// TAO_PLUGGABLE_MESSAGE_REQUEST, + /// TAO_PLUGGABLE_MESSAGE_REPLY, + /// TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION, + /// TAO_PLUGGABLE_MESSAGE_MESSAGE_ERROR. + TAO_Pluggable_Message_Type message_type (TAO_GIOP_Message_State &state); + private: /// Writes the GIOP header in to <msg> @@ -156,7 +179,8 @@ private: /// unmanageable difference comes let them be implemented here. int make_send_locate_reply (TAO_Transport *transport, TAO_GIOP_Locate_Request_Header &request, - TAO_GIOP_Locate_Status_Msg &status); + TAO_GIOP_Locate_Status_Msg &status, + TAO_OutputCDR &output); /// Send error messages int send_error (TAO_Transport *transport); @@ -184,39 +208,26 @@ private: /// request/response? virtual int is_ready_for_bidirectional (void); - /// Are there any more messages that needs processing - virtual int more_messages (void); + /// Creates a new node for the queue with a message block in the + /// node of size <sz>.. + TAO_Queued_Data *make_queued_data (size_t sz); private: + /// Cached ORB_Core pointer... + TAO_ORB_Core *orb_core_; + /// Thr message handler object that does reading and parsing of the /// incoming messages - TAO_GIOP_Message_Reactive_Handler message_handler_; - - /// Output CDR - TAO_OutputCDR *output_; - - /// Allocators for the output CDR that we hold. As we cannot rely on - /// the resources from ORB Core we reserve our own resources. The - /// reason that we cannot believe the ORB core is that, for a - /// multi-threaded servers it dishes out resources cached in - /// TSS. This would be dangerous as TSS gets destroyed before we - /// would. So we have our own memory that we can rely on. - /// Implementations of GIOP that we have - ACE_Allocator *cdr_buffer_alloc_; - ACE_Allocator *cdr_dblock_alloc_; - ACE_Allocator *cdr_msgblock_alloc_; - - /// A buffer that we will use to initialise the CDR stream - char repbuf_[ACE_CDR::DEFAULT_BUFSIZE]; + TAO_GIOP_Message_State message_state_; /// All the implementations of GIOP message generator and parsers TAO_GIOP_Message_Generator_Parser_Impl tao_giop_impl_; protected: - /// The generator and parser state. TAO_GIOP_Message_Generator_Parser *generator_parser_; + }; diff --git a/TAO/tao/GIOP_Message_Base.i b/TAO/tao/GIOP_Message_Base.i index 45b18282a10..a589447a413 100644 --- a/TAO/tao/GIOP_Message_Base.i +++ b/TAO/tao/GIOP_Message_Base.i @@ -4,41 +4,3 @@ // // GIOP_Message_Base // -# if 0 -ACE_INLINE const size_t -TAO_GIOP_Message_Base::header_len (void) -{ - return TAO_GIOP_MESSAGE_HEADER_LEN; -} - -ACE_INLINE const size_t -TAO_GIOP_Message_Base::message_size_offset (void) -{ - return TAO_GIOP_MESSAGE_SIZE_OFFSET; -} - -ACE_INLINE const size_t -TAO_GIOP_Message_Base::major_version_offset (void) -{ - return TAO_GIOP_VERSION_MAJOR_OFFSET; -} - -ACE_INLINE const size_t -TAO_GIOP_Message_Base::minor_version_offset (void) -{ - return TAO_GIOP_VERSION_MINOR_OFFSET; -} - -ACE_INLINE const size_t -TAO_GIOP_Message_Base::flags_offset (void) -{ - return TAO_GIOP_MESSAGE_FLAGS_OFFSET; -} - -ACE_INLINE const size_t -TAO_GIOP_Message_Base::message_type_offset (void) -{ - return TAO_GIOP_MESSAGE_TYPE_OFFSET; -} - -#endif /*if 0*/ diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.cpp b/TAO/tao/GIOP_Message_Reactive_Handler.cpp index a8af560081a..da22b2e93ab 100644 --- a/TAO/tao/GIOP_Message_Reactive_Handler.cpp +++ b/TAO/tao/GIOP_Message_Reactive_Handler.cpp @@ -8,393 +8,95 @@ #include "tao/GIOP_Message_Base.h" #include "Transport.h" +#if 0 #if !defined (__ACE_INLINE__) # include "tao/GIOP_Message_Reactive_Handler.inl" #endif /* __ACE_INLINE__ */ -ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$") +#endif /*if 0*/ -TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler (TAO_ORB_Core * orb_core, - TAO_GIOP_Message_Base *base, - size_t input_cdr_size) - : message_state_ (orb_core), - mesg_base_ (base), - message_status_ (TAO_GIOP_WAITING_FOR_HEADER), - message_size_ (input_cdr_size), - current_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)), - supp_buffer_ (orb_core->data_block_for_message_block (input_cdr_size)) +ACE_RCSID(tao, GIOP_Message_Reactive_Handler, "$Id$") +TAO_GIOP_Message_Reactive_Handler::TAO_GIOP_Message_Reactive_Handler ( + TAO_ORB_Core * /*orb_core*/, + TAO_GIOP_Message_Base * /*base*/) { - // NOTE: The message blocks here use a locked allocator which is not - // from the TSS even if there is one. We are getting the allocators - // from the global memory. We shouldn't be using the TSS stuff for - // the following reason - // (a) The connection handlers are per-connection and not - // per-thread. - // (b) The order of cleaning is important if we use allocators from - // TSS. The TSS goes away when the threads go away. But the - // connection handlers go away only when the ORB decides to shut - // it down. - ACE_CDR::mb_align (&this->current_buffer_); - - // Calculate the effective message after alignment - this->message_size_ -= this->rd_pos (); -} - - - -int -TAO_GIOP_Message_Reactive_Handler::read_messages (TAO_Transport *transport) -{ - // Read the message from the transport. The size of the message read - // is the maximum size of the buffer that we have less the amount of - // data that has already been read in to the buffer. - ssize_t n = transport->recv (this->current_buffer_.wr_ptr (), - this->current_buffer_.space ()); - - if (n == -1) - { - if (errno == EWOULDBLOCK) - return 0; - - return -1; - } - // @@ What are the other error handling here?? - else if (n == 0) - { - return -1; - } - - if (TAO_debug_level == 5) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - GIOP_Message_Reactive_Handler::read_messages" - " received %d bytes\n", - n)); - - size_t len; - for (size_t offset = 0; offset < size_t(n); offset += len) - { - len = n - offset; - if (len > 512) - len = 512; - ACE_HEX_DUMP ((LM_DEBUG, - this->current_buffer_.wr_ptr () + offset, - len, - "TAO (%P|%t) - read_messages ")); - } - ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - received %d bytes \n", n)); - } - - - // Now we have a succesful read. First adjust the write pointer - this->current_buffer_.wr_ptr (n); - - // Success - return 1; } -int -TAO_GIOP_Message_Reactive_Handler::parse_message_header (void) -{ - // Check what message are we waiting for and take suitable action - if (this->message_status_ == TAO_GIOP_WAITING_FOR_HEADER) - { - size_t len = this->current_buffer_.length (); - char *buf = this->current_buffer_.rd_ptr (); - - if (len > TAO_GIOP_MESSAGE_HEADER_LEN) - { - // Parse the GIOP header - if (this->parse_message_header_i (buf) == -1) - - return -1; - - int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN, - len); - - // Set the pointer read pointer position in the - // <current_buffer_> - size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN; - - if (retval) - { - // We had a fragment header, so the position should be - // beyond that - pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER; - } - - this->current_buffer_.rd_ptr (pos); - buf = this->current_buffer_.rd_ptr (); - - // The GIOP header has been parsed. Set the status to wait for - // payload - this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; - - return 1; - } - - // We dont have sufficient information to decipher the GIOP - // header. Make sure that the reactor calls us back. - return -2; - } - - // The last read just "read" left-over messages - return 0; -} int -TAO_GIOP_Message_Reactive_Handler::is_message_ready (void) +TAO_GIOP_Message_Reactive_Handler::parse_message_header (ACE_Message_Block &incoming, + TAO_GIOP_Message_State &state) { - if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) - { - size_t len = this->current_buffer_.length (); - char *buf = this->current_buffer_.rd_ptr (); - - // Set the buf pointer to the start of the GIOP header - buf -= TAO_GIOP_MESSAGE_HEADER_LEN; - - // Dump the incoming message . It will be dumped only if the - // debug level is greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + TAO_GIOP_MESSAGE_HEADER_LEN); - if (len == this->message_state_.message_size) - { - // If the buffer length is equal to the size of the payload we - // have exactly one message. - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - - // Check whether we have received only the first part of the - // fragment. - return this->message_state_.is_complete (this->current_buffer_); - } - else if (len > this->message_state_.message_size) - { - // If the length is greater we have received some X messages - // and a part of X + 1 messages (probably) with X varying - // from 1 to N. - this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; - - // Clone the data that we read in. - this->supp_buffer_.data_block ( - this->current_buffer_.data_block ()->clone ()); - - // Set the read and write pointer for the supplementary - // buffer. - size_t rd_pos = this->rd_pos (); - this->supp_buffer_.rd_ptr (rd_pos + - this->message_state_.message_size); - this->supp_buffer_.wr_ptr (this->wr_pos ()); - - // Reset the current buffer - this->current_buffer_.reset (); - - // Set the read and write pointers again for the current - // buffer. We change the write pointer settings as we would - // like to process a single message. - this->current_buffer_.rd_ptr (rd_pos); - this->current_buffer_.wr_ptr (rd_pos + - this->message_state_.message_size); - - return this->message_state_.is_complete (this->current_buffer_); - } - } - else if (this->message_status_ == TAO_GIOP_MULTIPLE_MESSAGES) + // @@Bala: Need to make a check whether we are waiting for the + // header... + if (incoming.length () > TAO_GIOP_MESSAGE_HEADER_LEN) { - size_t len = this->supp_buffer_.length (); - - if (len > TAO_GIOP_MESSAGE_HEADER_LEN) - { - // @@ What about fragment headers??? - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - len = this->current_buffer_.length (); - char *buf = this->current_buffer_.rd_ptr (); - - if (this->parse_message_header_i (buf) == -1) - - return -1; - - // Set the pointer read pointer position in the - // <current_buffer_> - this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - - // The GIOP header has been parsed. Set the status to wait for - // payload - this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; - - return this->get_message (); - } - else - { - // We have smaller than the header size left here. We - // just copy the rest of the stuff and reset things so that - // we can read the rest of the stuff from the socket. - this->current_buffer_.copy ( - this->supp_buffer_.rd_ptr (), - len); - - // Reset the supp buffer now - this->supp_buffer_.reset (); - - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - } - + // Parse the GIOP header + if (this->parse_message_header_i (incoming, state) == -1) + return -1; } - // Just send us back to the reactor so that we can for more data to - // come in . return 0; } -ACE_Data_Block * -TAO_GIOP_Message_Reactive_Handler::steal_data_block (void) -{ - ACE_Data_Block *db = - this->current_buffer_.data_block ()->clone_nocopy (); - - ACE_Data_Block *old_db = - this->current_buffer_.replace_data_block (db); - - ACE_CDR::mb_align (&this->current_buffer_); - - return old_db; -} - - -void -TAO_GIOP_Message_Reactive_Handler::reset (int reset_flag) -{ - // Reset the contents of the message state - this->message_state_.reset (reset_flag); - - // Reset the current buffer - this->current_buffer_.reset (); - - ACE_CDR::mb_align (&this->current_buffer_); - - if (this->message_status_ != TAO_GIOP_MULTIPLE_MESSAGES) - { - this->supp_buffer_.reset (); - ACE_CDR::mb_align (&this->supp_buffer_); - } - -} - - int -TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (char *buf) +TAO_GIOP_Message_Reactive_Handler::parse_message_header_i (ACE_Message_Block &incoming, + TAO_GIOP_Message_State &state) { if (TAO_debug_level > 8) { ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n")); } - // Check whether we have a GIOP Message in the first place - if (this->parse_magic_bytes (buf) == -1) - return -1; + // Grab the rd_ptr_ from the message block.. + char *buf = incoming.rd_ptr (); - // Let us be specific that this is for 1.0 - if (this->message_state_.giop_version.minor == 0 && - this->message_state_.giop_version.major == 1) + // Parse the magic bytes first + if (this->parse_magic_bytes (buf) == -1) { - this->message_state_.byte_order = - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; - - if (this->message_state_.byte_order != 0 && - this->message_state_.byte_order != 1) - { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") - ACE_TEXT (" for version <1.0>\n"), - this->message_state_.byte_order)); - return -1; - } + return -1; } - else - { - // Read the byte ORDER - this->message_state_.byte_order = - (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); - // Read the fragment bit - this->message_state_.more_fragments = - (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); + // Get the version information + if (this->get_version_info (buf, state) == -1) + return -1; - if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) - { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") - ACE_TEXT (" for version <%d %d> \n"), - buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], - this->message_state_.giop_version.major, - this->message_state_.giop_version.minor)); - return -1; - } - } + // Get the byte order information... + if (this->get_byte_order_info (buf, state) == -1) + return -1; // Get the message type - this->message_state_.message_type = - buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - - // Get the payload size. If the payload size is greater than the - // length then set the length of the message block to that - // size. Move the rd_ptr to the end of the GIOP header - this->message_state_.message_size = this->get_payload_size (buf); + state.message_type = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; - // If the message_size or the payload_size is zero then something - // is fishy. So return an error. - if (this->message_state_.message_size == 0) - return -1; + // Get the payload size + this->get_payload_size (buf, state); - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) Parsed header = <%d,%d,%d,%d,%d>\n"), - this->message_state_.giop_version.major, - this->message_state_.giop_version.minor, - this->message_state_.byte_order, - this->message_state_.message_type, - this->message_state_.message_size)); - } + // Parse the + int retval = this->parse_fragment_header (buf + TAO_GIOP_MESSAGE_HEADER_LEN, + len); - return 1; -} + // Set the pointer read pointer position in the + // <current_buffer_> + size_t pos = TAO_GIOP_MESSAGE_HEADER_LEN; + if (retval) + { + // We had a fragment header, so the position should be + // beyond that + pos += TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + } -int -TAO_GIOP_Message_Reactive_Handler::parse_fragment_header (char *buf, - size_t length) -{ - size_t len = - TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN; - - // By this point we are doubly sure that we have a more or less - // valid GIOP message with a valid major revision number. - if (this->message_state_.more_fragments && - this->message_state_.giop_version.minor == 2 && - length > len) - { - // Fragmented message in GIOP 1.2 should have a fragment header - // following the GIOP header. Grab the rd_ptr to get that - // info. - this->message_state_.request_id = this->read_ulong (buf); + this->current_buffer_.rd_ptr (pos); + buf = this->current_buffer_.rd_ptr (); - // As we parsed the header - return 1; + // The GIOP header has been parsed. Set the status to wait for + // payload + this->message_status_ = TAO_GIOP_WAITING_FOR_PAYLOAD; } + return 1; - return 0; -} int @@ -406,7 +108,7 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf) && buf [2] == 0x4f // 'O' && buf [3] == 0x50)) // 'P' { - // For the present... + // For the present... if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) bad header, " @@ -418,12 +120,25 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf) return -1; } + return 0; +} + +int +TAO_GIOP_Message_Reactive_Handler::get_version_info (char *buf, + TAO_GIOP_Message_State &state) +{ + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n")); + } + // We have a GIOP message on hand. Get its revision numbers CORBA::Octet incoming_major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; CORBA::Octet incoming_minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + // Check the revision information if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( incoming_major, incoming_minor) == 0) @@ -439,55 +154,81 @@ TAO_GIOP_Message_Reactive_Handler::parse_magic_bytes (char *buf) } // Set the version - this->message_state_.giop_version.minor = incoming_minor; - this->message_state_.giop_version.major = incoming_major; + state.giop_version.minor = incoming_minor; + state.giop_version.major = incoming_major; return 0; } +int +TAO_GIOP_Message_Reactive_Handler::get_byte_order_info (char *buf, + TAO_GIOP_Message_State &message_state) +{ + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n")); + } + + // Let us be specific that this is for 1.0 + if (message_state.giop_version.minor == 0 && + message_state.giop_version.major == 1) + { + message_state_.byte_order = + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; + + if (message_state.byte_order != 0 && + message_state.byte_order != 1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") + ACE_TEXT (" for version <1.0>\n"), + message_state.byte_order)); + return -1; + } + } + else + { + // Read the byte ORDER + message_state.byte_order = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); + + // Read the fragment bit + message_state.more_fragments = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); + + if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + message_state_.giop_version.major, + message_state_.giop_version.minor)); + return -1; + } + } + + return 0; +} CORBA::ULong -TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr) +TAO_GIOP_Message_Reactive_Handler::get_payload_size (char *rd_ptr, + TAO_GIOP_Message_State &message_state) { // Move the read pointer rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - CORBA::ULong x = this->read_ulong (rd_ptr); - - if ((x + TAO_GIOP_MESSAGE_HEADER_LEN) > this->message_size_) - { - if (ACE_CDR::grow (&this->current_buffer_, - x + TAO_GIOP_MESSAGE_HEADER_LEN) == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%P | %t) Unable to increase the size \n") - ACE_TEXT ("of the buffer \n"))); - return 0; - } - - // New message size is the size of the now larger buffer. - this->message_size_ = x + - TAO_GIOP_MESSAGE_HEADER_LEN + - ACE_CDR::MAX_ALIGNMENT; - } - - // Set the read pointer to the end of the GIOP message - // this->current_buffer_.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN); - return x; -} + CORBA::ULong x = 0; -CORBA::ULong -TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr) -{ size_t msg_size = 4; - char *buf = ACE_ptr_align_binary (ptr, + char *buf = ACE_ptr_align_binary (rd_ptr, msg_size); - CORBA::ULong x; #if !defined (ACE_DISABLE_SWAP_ON_READ) - if (!(this->message_state_.byte_order != ACE_CDR_BYTE_ORDER)) + if (!(state.byte_order != ACE_CDR_BYTE_ORDER)) { x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); } @@ -499,76 +240,5 @@ TAO_GIOP_Message_Reactive_Handler::read_ulong (const char *ptr) x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf); #endif /* ACE_DISABLE_SWAP_ON_READ */ - return x; -} - - - - -int -TAO_GIOP_Message_Reactive_Handler::get_message (void) -{ - if (this->message_status_ == TAO_GIOP_WAITING_FOR_PAYLOAD) - { - size_t len = this->supp_buffer_.length (); - char * buf = - this->current_buffer_.rd_ptr (); - - buf -= TAO_GIOP_MESSAGE_HEADER_LEN; - - if (len == this->message_state_.message_size) - { - // If the buffer length is equal to the size of the payload we - // have exactly one message. Check whether we have received - // only the first part of the fragment. - this->message_status_ = TAO_GIOP_WAITING_FOR_HEADER; - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - this->message_state_.message_size); - - // The message will be dumped only if the debug level is - // greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.rd_ptr (this->message_state_.message_size); - return this->message_state_.is_complete (this->current_buffer_); - } - else if (len > this->message_state_.message_size) - { - // If the length is greater we have received some X messages - // and a part of X + 1 messages (probably) with X varying - // from 1 to N. - this->message_status_ = TAO_GIOP_MULTIPLE_MESSAGES; - - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - this->message_state_.message_size); - - // The message will be dumped only if the debug level is - // greater than 5 anyway. - this->mesg_base_->dump_msg ( - "Recv msg", - ACE_reinterpret_cast (u_char *, - buf), - len + - TAO_GIOP_MESSAGE_HEADER_LEN); - - this->supp_buffer_.rd_ptr (this->message_state_.message_size); - return this->message_state_.is_complete (this->current_buffer_); - } - else - { - // The remaining message in the supp buffer - this->current_buffer_.copy (this->supp_buffer_.rd_ptr (), - this->supp_buffer_.length ()); - - // Reset the supp buffer now - this->supp_buffer_.reset (); - } - } - - return 0; + message_state.message_size = x; } diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.h b/TAO/tao/GIOP_Message_Reactive_Handler.h index b38a74c27c5..16c0503851e 100644 --- a/TAO/tao/GIOP_Message_Reactive_Handler.h +++ b/TAO/tao/GIOP_Message_Reactive_Handler.h @@ -13,29 +13,19 @@ #ifndef TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H #define TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H #include "ace/pre.h" -#include "ace/Message_Block.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "tao/GIOP_Message_State.h" -class TAO_Transport; + class TAO_ORB_Core; class TAO_GIOP_Message_Base; +class TAO_GIOP_Message_State; +class ACE_Message_Block; -enum TAO_GIOP_Message_Status -{ - /// The buffer is waiting for the header of the message yet - TAO_GIOP_WAITING_FOR_HEADER = 0, - - /// The buffer is waiting for the payload to appear on the socket - TAO_GIOP_WAITING_FOR_PAYLOAD, - - /// The buffer has got multiple messages - TAO_GIOP_MULTIPLE_MESSAGES -}; /** * @class TAO_GIOP_Message_Reactive_Handler @@ -54,6 +44,9 @@ enum TAO_GIOP_Message_Status * reading the header and the payload seperately. */ + + +#if 0 class TAO_Export TAO_GIOP_Message_Reactive_Handler { public: @@ -79,7 +72,7 @@ public: /// - We have sufficient info for processing the header and we /// processed it succesfully. (return 1); /// - Any errors in processing will return a -1. - int parse_message_header (void); + int parse_message_header (ACE_Message_Block &message_block); /// Check whether we have atleast one complete message ready for /// processing. @@ -173,7 +166,11 @@ private: /// is then sent to the higher layers of the ORB. ACE_Message_Block supp_buffer_; }; +#if defined (__ACE_INLINE__) +# include "tao/GIOP_Message_Reactive_Handler.inl" +#endif /* __ACE_INLINE__ */ +#endif const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12; const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8; @@ -183,9 +180,7 @@ const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5; const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4; const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4; -#if defined (__ACE_INLINE__) -# include "tao/GIOP_Message_Reactive_Handler.inl" -#endif /* __ACE_INLINE__ */ + #include "ace/post.h" #endif /*TAO_GIOP_MESSAGE_REACTIVE_HANDLER_H*/ diff --git a/TAO/tao/GIOP_Message_Reactive_Handler.inl b/TAO/tao/GIOP_Message_Reactive_Handler.inl index 91211b34464..66b856377dc 100644 --- a/TAO/tao/GIOP_Message_Reactive_Handler.inl +++ b/TAO/tao/GIOP_Message_Reactive_Handler.inl @@ -3,6 +3,8 @@ + +#if 0 ACE_INLINE TAO_GIOP_Message_State & TAO_GIOP_Message_Reactive_Handler::message_state (void) { @@ -28,3 +30,5 @@ TAO_GIOP_Message_Reactive_Handler::wr_pos (void) const return this->current_buffer_.wr_ptr () - this->current_buffer_.base (); } + +#endif diff --git a/TAO/tao/GIOP_Message_State.cpp b/TAO/tao/GIOP_Message_State.cpp index c71c5a638e5..36ebdcd41ae 100644 --- a/TAO/tao/GIOP_Message_State.cpp +++ b/TAO/tao/GIOP_Message_State.cpp @@ -1,115 +1,270 @@ -// -*- C++ -*- - -//$Id$ +// $Id$ #include "tao/GIOP_Message_State.h" -#include "tao/GIOP_Utils.h" +#include "tao/GIOP_Message_Generator_Parser_Impl.h" #include "tao/ORB_Core.h" +#include "tao/Pluggable.h" +#include "tao/debug.h" +#include "tao/GIOP_Message_Base.h" +//#include "Transport.h" #if !defined (__ACE_INLINE__) -# include "tao/GIOP_Message_State.i" +# include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ + ACE_RCSID(tao, GIOP_Message_State, "$Id$") - TAO_GIOP_Message_State::TAO_GIOP_Message_State (TAO_ORB_Core* /*orb_core*/) - : byte_order (TAO_ENCAP_BYTE_ORDER), - message_type (TAO_GIOP_MESSAGERROR), - message_size (0), - request_id (0), - // Problem similar to GIOP_Message_handler.cpp - Bala - fragmented_messages (ACE_CDR::DEFAULT_BUFSIZE), - more_fragments (0) +TAO_GIOP_Message_State::TAO_GIOP_Message_State ( + TAO_ORB_Core * /*orb_core*/, + TAO_GIOP_Message_Base * /*base*/) + : giop_version_ (TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR), + byte_order_ (0), + message_type_ (0), + message_size_ (0), + request_id_ (0), + more_fragments_ (0), + missing_data_ (0) { - //giop_version.major = TAO_DEF_GIOP_MAJOR; - //giop_version.minor = TAO_DEF_GIOP_MINOR; } -TAO_GIOP_Message_State::~TAO_GIOP_Message_State (void) + +int +TAO_GIOP_Message_State::parse_message_header (ACE_Message_Block &incoming) { - // @@ Bala: this is not a very useful comment, is it? - //no-op + if (incoming.length () >= TAO_GIOP_MESSAGE_HEADER_LEN) + { + // Parse the GIOP header + if (this->parse_message_header_i (incoming) == -1) + return -1; + } + + return 0; +} + +int +TAO_GIOP_Message_State::parse_message_header_i (ACE_Message_Block &incoming) +{ + if (TAO_debug_level > 8) + { + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - parsing header\n")); + } + + // Grab the rd_ptr_ from the message block.. + char *buf = incoming.rd_ptr (); + + // Parse the magic bytes first + if (this->parse_magic_bytes (buf) == -1) + { + return -1; + } + + // Get the version information + if (this->get_version_info (buf) == -1) + return -1; + + // Get the byte order information... + if (this->get_byte_order_info (buf) == -1) + return -1; + + // Get the message type + this->message_type_ = buf[TAO_GIOP_MESSAGE_TYPE_OFFSET]; + + // Get the size of the message.. + this->get_payload_size (buf); + + if (this->message_size_ == 0) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Message with size 0 recd.. \n"))); + return -1; + } + + if (this->more_fragments_) + { + // Parse the + /*int retval = */ + this->parse_fragment_header (buf, + incoming.length ()); + } + + return 0; +} + + + + +int +TAO_GIOP_Message_State::parse_magic_bytes (char *buf) +{ + // The values are hard-coded to support non-ASCII platforms. + if (!(buf [0] == 0x47 // 'G' + && buf [1] == 0x49 // 'I' + && buf [2] == 0x4f // 'O' + && buf [3] == 0x50)) // 'P' + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) bad header, " + "magic word [%2.2x,%2.2x,%2.2x,%2.2x]\n"), + buf[0], + buf[1], + buf[2], + buf[3])); + return -1; + } + + return 0; } int -TAO_GIOP_Message_State::is_complete (ACE_Message_Block ¤t_buf) +TAO_GIOP_Message_State::get_version_info (char *buf) { - if (this->more_fragments) + if (TAO_debug_level > 8) { - if (this->fragmented_messages.length () == 0) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting version info.. \n")); + } + + // We have a GIOP message on hand. Get its revision numbers + CORBA::Octet incoming_major = + buf[TAO_GIOP_VERSION_MAJOR_OFFSET]; + CORBA::Octet incoming_minor = + buf[TAO_GIOP_VERSION_MINOR_OFFSET]; + + // Check the revision information + if (TAO_GIOP_Message_Generator_Parser_Impl::check_revision ( + incoming_major, + incoming_minor) == 0) + { + if (TAO_debug_level > 0) { - this->first_fragment_byte_order = this->byte_order; - this->first_fragment_giop_version = this->giop_version; - this->first_fragment_message_type = this->message_type; - // this->fragments_end = this->fragments_begin = current; - this->fragmented_messages.copy (current_buf.rd_ptr (), - current_buf.length ()); - - // Reset the buffer - current_buf.reset (); - - // Reset our state - this->reset (); - return 0; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t|%N|%l) bad version <%d.%d>\n"), + incoming_major, incoming_minor)); } - return this->append_fragment (current_buf); + return -1; } - if (this->fragmented_messages.length () != 0) + // Set the version + this->giop_version_.minor = incoming_minor; + this->giop_version_.major = incoming_major; + + return 0; +} + +int +TAO_GIOP_Message_State::get_byte_order_info (char *buf) +{ + if (TAO_debug_level > 8) { - // This is the last message, but we must defragment before - // sending - if (this->append_fragment (current_buf) == -1) - return -1; + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Getting byte order info.. \n")); + } - // Copy the entire message block into <current_buf> - current_buf.data_block (this->fragmented_messages.data_block ()->clone ()); + // Let us be specific that this is for 1.0 + if (this->giop_version_.minor == 0 && + this->giop_version_.major == 1) + { + this->byte_order_ = + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]; - this->fragmented_messages.reset (); + if (this->byte_order_ != 0 && + this->byte_order_ != 1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid byte order <%d>") + ACE_TEXT (" for version <1.0>\n"), + this->byte_order_)); + return -1; + } + } + else + { + // Read the byte ORDER + this->byte_order_ = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x01); - this->byte_order = this->first_fragment_byte_order; - this->giop_version = this->first_fragment_giop_version; - this->message_type = this->first_fragment_message_type; + // Read the fragment bit + this->more_fragments_ = + (CORBA::Octet) (buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET]& 0x02); - // This message has no more fragments, and there where no fragments - // before it, just return. Notice that current_buf has the - // *right* contents + if ((buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & ~0x3) != 0) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) invalid flags for <%d>") + ACE_TEXT (" for version <%d %d> \n"), + buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET], + this->giop_version_.major, + this->giop_version_.minor)); + return -1; + } } + return 0; +} + +void +TAO_GIOP_Message_State::get_payload_size (char *rd_ptr) +{ + // Move the read pointer + rd_ptr += TAO_GIOP_MESSAGE_SIZE_OFFSET; - return 1; + this->message_size_ = this->read_ulong (rd_ptr); } + + int -TAO_GIOP_Message_State::append_fragment (ACE_Message_Block& current) +TAO_GIOP_Message_State::parse_fragment_header (char *buf, + size_t length) { - if (this->first_fragment_byte_order != this->byte_order - || this->first_fragment_giop_version.major != this->giop_version.major - || this->first_fragment_giop_version.minor != this->giop_version.minor) + size_t len = + TAO_GIOP_MESSAGE_FRAGMENT_HEADER + TAO_GIOP_MESSAGE_HEADER_LEN; + + buf += TAO_GIOP_MESSAGE_HEADER_LEN; + + // By this point we are doubly sure that we have a more or less + // valid GIOP message with a valid major revision number. + if (this->giop_version_.minor == 2 && length > len) { - // Yes, print it out in all debug levels!. This is an error by - // CORBA 2.4 spec - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) incompatible fragments:\n") - ACE_TEXT (" Different GIOP versions or byte order\n"))); - this->reset (); - return -1; + // Fragmented message in GIOP 1.2 should have a fragment header + // following the GIOP header. Grab the rd_ptr to get that + // info. + this->request_id_ = this->read_ulong (buf); + + // As we parsed the header + return 1; } - size_t req_size = - this->fragmented_messages.size () + current.length (); + return 0; +} - this->fragmented_messages.size (req_size); +CORBA::ULong +TAO_GIOP_Message_State::read_ulong (char *rd_ptr) +{ + CORBA::ULong x = 0; - // Copy the message - this->fragmented_messages.copy (current.rd_ptr (), - current.length ()); + size_t msg_size = 4; - current.reset (); + char *buf = ACE_ptr_align_binary (rd_ptr, + msg_size); - // Reset our state - this->reset (); +#if !defined (ACE_DISABLE_SWAP_ON_READ) + if (!(this->byte_order_ != ACE_CDR_BYTE_ORDER)) + { + x = *ACE_reinterpret_cast (ACE_CDR::ULong*, buf); + } + else + { + ACE_CDR::swap_4 (buf, ACE_reinterpret_cast (char*, &x)); + } +#else + x = *ACE_reinterpret_cast(ACE_CDR::ULong*, buf); +#endif /* ACE_DISABLE_SWAP_ON_READ */ - return 0; + return x; } diff --git a/TAO/tao/GIOP_Message_State.h b/TAO/tao/GIOP_Message_State.h index a71d53869ac..3f262373f54 100644 --- a/TAO/tao/GIOP_Message_State.h +++ b/TAO/tao/GIOP_Message_State.h @@ -11,25 +11,22 @@ * * @author Chris Cleeland <cleeland@cs.wustl.edu> * @author Carlos O' Ryan <coryan@uci.edu> + * @author modified by Balachandran Natarajan <bala@cs.wustl.edu> */ //============================================================================= - - #ifndef TAO_GIOP_MESSAGE_STATE_H #define TAO_GIOP_MESSAGE_STATE_H #include "ace/pre.h" -#include "tao/TAO_Export.h" +#include "tao/GIOP_Message_Version.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#include "tao/corbafwd.h" -#include "tao/CDR.h" -#include "tao/GIOP_Message_Version.h" class TAO_ORB_Core; +class TAO_GIOP_Message_Base; /** @@ -37,49 +34,81 @@ class TAO_ORB_Core; * * @brief Generic definitions for Message States. * - * This represents the state of the incoming GIOP message - * As the ORB processes incoming messages it needs to keep track of - * how much of the message has been read, if there are any - * fragments following this message etc. + * This helps to establish the state of the incoming messages. */ + class TAO_Export TAO_GIOP_Message_State { - public: + /// Ctor - TAO_GIOP_Message_State (TAO_ORB_Core *orb_core); + TAO_GIOP_Message_State (TAO_ORB_Core *orb_core, + TAO_GIOP_Message_Base *base); + + /// Parse the message header. + int parse_message_header (ACE_Message_Block &incoming); + + /// Return the message size + CORBA::ULong message_size (void) const; + + /// Return the message size + CORBA::ULong payload_size (void) const; + + /// Return the byte order information + CORBA::Octet byte_order (void) const; - /// Dtor - ~TAO_GIOP_Message_State (void); + /// Reset the state.. + void reset (void); - ///Reset the message header state and prepare it to receive the next - /// event. - void reset (int reset_contents = 1); +private: + + friend class TAO_GIOP_Message_Base; + + /// Parse the message header. + int parse_message_header_i (ACE_Message_Block &incoming); - /// Has the header been received? - CORBA::Boolean header_received (void) const; + /// Checks for the magic word 'GIOP' in the start of the incoing + /// stream + int parse_magic_bytes (char *buf); - /// Check if the current message is complete, adjusting the fragments - /// if required... - int is_complete (ACE_Message_Block ¤t_buf); + /// Extracts the version information from the incoming + /// stream. Performs a check for whether the version information is + /// right and sets the information in the <state> + int get_version_info (char *buf); - /// Did we get fragmented messages? - int message_fragmented (void); + /// Extracts the byte order information from the incoming + /// stream. Performs a check for whether the byte order information + /// right and sets the information in the <state> + int get_byte_order_info (char *buf); - /// Version info - TAO_GIOP_Message_Version giop_version; + /// Gets the size of the payload and set the size in the <state> + void get_payload_size (char *buf); + + /// Parses the GIOP FRAGMENT_HEADER information from the incoming + /// stream. + int parse_fragment_header (char *buf, + size_t length); + + /// Read the unsigned long from the buffer. The <buf> should just + /// point to the next 4 bytes data that represent the ULong + CORBA::ULong read_ulong (char *buf); + +private: + + // GIOP version information.. + TAO_GIOP_Message_Version giop_version_; /// 0 = big, 1 = little - CORBA::Octet byte_order; + CORBA::Octet byte_order_; /// MsgType above - CORBA::Octet message_type; + CORBA::Octet message_type_; /// in byte_order! - CORBA::ULong message_size; + CORBA::ULong message_size_; /// Request Id from the Fragment header - CORBA::ULong request_id; + CORBA::ULong request_id_; /** * The fragments are collected in a chain of message blocks (using @@ -87,7 +116,7 @@ public: * chain is reassembled into the main message block that is sent * along */ - ACE_Message_Block fragmented_messages; + // ACE_Message_Block fragmented_messages; /** @@ -101,11 +130,11 @@ public: * 3) Even if we allowed that at this layer the CDR classes are * not prepared to handle that. */ - CORBA::Octet first_fragment_byte_order; + // CORBA::Octet first_fragment_byte_order; /// The GIOP version for the first fragment /// @@ Same as above, all GIOP versions must match. - TAO_GIOP_Message_Version first_fragment_giop_version; + // TAO_GIOP_Message_Version first_fragment_giop_version; /** * If the messages are chained this represents the message type for @@ -113,23 +142,26 @@ public: * fragment and the upper level needs to know if it is a request, * locate request or what). */ - CORBA::Octet first_fragment_message_type; + // CORBA::Octet first_fragment_message_type; /// (Requests and Replys) - CORBA::Octet more_fragments; - -private: - /// Append <current> to the list of fragments - /// Also resets the state, because the current message was consumed. - int append_fragment (ACE_Message_Block ¤t); - + CORBA::Octet more_fragments_; + /// Missing data + CORBA::ULong missing_data_; }; +const size_t TAO_GIOP_MESSAGE_HEADER_LEN = 12; +const size_t TAO_GIOP_MESSAGE_SIZE_OFFSET = 8; +const size_t TAO_GIOP_MESSAGE_FLAGS_OFFSET = 6; +const size_t TAO_GIOP_MESSAGE_TYPE_OFFSET = 7; +const size_t TAO_GIOP_VERSION_MINOR_OFFSET = 5; +const size_t TAO_GIOP_VERSION_MAJOR_OFFSET = 4; +const size_t TAO_GIOP_MESSAGE_FRAGMENT_HEADER = 4; #if defined (__ACE_INLINE__) -# include "tao/GIOP_Message_State.i" +# include "tao/GIOP_Message_State.inl" #endif /* __ACE_INLINE__ */ #include "ace/post.h" diff --git a/TAO/tao/GIOP_Message_State.i b/TAO/tao/GIOP_Message_State.i index 653bf47d6a8..773faefe69b 100644 --- a/TAO/tao/GIOP_Message_State.i +++ b/TAO/tao/GIOP_Message_State.i @@ -1,8 +1,6 @@ // -*- C++ -*- //$Id$ -// **************************************************************** -// @@ Bala: we use the stars to separate classes in ACE+TAO // // Inlined methods for TAO_GIOP_Message_State @@ -10,7 +8,7 @@ ACE_INLINE int TAO_GIOP_Message_State::message_fragmented (void) { - if (this->more_fragments) + if (this->more_fragments_) return 1; return 0; @@ -19,12 +17,12 @@ TAO_GIOP_Message_State::message_fragmented (void) ACE_INLINE void TAO_GIOP_Message_State::reset (int /*reset_contents*/) { - this->message_size = 0; - this->more_fragments = 0; + this->message_size_ = 0; + this->more_fragments_ = 0; } ACE_INLINE CORBA::Boolean TAO_GIOP_Message_State::header_received (void) const { - return this->message_size != 0; + return this->message_size_ != 0; } diff --git a/TAO/tao/GIOP_Message_State.inl b/TAO/tao/GIOP_Message_State.inl new file mode 100644 index 00000000000..b6d2989eaa9 --- /dev/null +++ b/TAO/tao/GIOP_Message_State.inl @@ -0,0 +1,58 @@ +// -*- C++ -*- + +//$Id$ + +ACE_INLINE CORBA::ULong +TAO_GIOP_Message_State::message_size (void) const +{ + CORBA::ULong len = + this->message_size_ + TAO_GIOP_MESSAGE_HEADER_LEN; + + if (this->more_fragments_ && + this->giop_version_.minor > 1) + len += TAO_GIOP_MESSAGE_FRAGMENT_HEADER; + + return len; +} + +ACE_INLINE CORBA::ULong +TAO_GIOP_Message_State::payload_size (void) const +{ + return this->message_size_; +} + +ACE_INLINE CORBA::Octet +TAO_GIOP_Message_State::byte_order (void) const +{ + return this->byte_order_; +} + +ACE_INLINE void +TAO_GIOP_Message_State::reset (void) +{ + this->message_type_ = 0; + this->message_size_ = 0; + this->more_fragments_ = 0; + this->request_id_ = 0; + this->missing_data_ = 0; +} + +#if 0 +ACE_INLINE int +TAO_GIOP_Message_State::message_fragmented (void) +{ + if (this->more_fragments) + return 1; + + return 0; +} + + + +ACE_INLINE CORBA::Boolean +TAO_GIOP_Message_State::header_received (void) const +{ + return this->message_size != 0; +} + +#endif diff --git a/TAO/tao/IIOP_Acceptor.cpp b/TAO/tao/IIOP_Acceptor.cpp index 1a0e99030e2..4cfe224bae8 100644 --- a/TAO/tao/IIOP_Acceptor.cpp +++ b/TAO/tao/IIOP_Acceptor.cpp @@ -2,6 +2,7 @@ // $Id$ + #include "tao/IIOP_Acceptor.h" #include "tao/IIOP_Profile.h" #include "tao/MProfile.h" diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index c05a558efd3..f897a5e0e11 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -1,17 +1,18 @@ // $Id$ -#include "tao/IIOP_Connection_Handler.h" -#include "tao/Timeprobe.h" -#include "tao/debug.h" -#include "tao/ORB_Core.h" -#include "tao/ORB.h" -#include "tao/CDR.h" -#include "tao/Messaging_Policy_i.h" -#include "tao/Server_Strategy_Factory.h" -#include "tao/IIOP_Transport.h" -#include "tao/IIOP_Endpoint.h" -#include "tao/Transport_Cache_Manager.h" -#include "tao/Base_Transport_Property.h" +#include "IIOP_Connection_Handler.h" +#include "Timeprobe.h" +#include "debug.h" +#include "ORB_Core.h" +#include "ORB.h" +#include "CDR.h" +#include "Messaging_Policy_i.h" +#include "Server_Strategy_Factory.h" +#include "IIOP_Transport.h" +#include "IIOP_Endpoint.h" +#include "Transport_Cache_Manager.h" +#include "Base_Transport_Property.h" +#include "Resume_Handle.h" #if !defined (__ACE_INLINE__) # include "tao/IIOP_Connection_Handler.i" @@ -45,7 +46,7 @@ TAO_IIOP_Connection_Handler::TAO_IIOP_Connection_Handler (TAO_ORB_Core *orb_core { TAO_IIOP_Transport* specific_transport = 0; ACE_NEW(specific_transport, - TAO_IIOP_Transport(this, orb_core, 0)); + TAO_IIOP_Transport (this, orb_core, 0)); // store this pointer (indirectly increment ref count) this->transport(specific_transport); @@ -196,12 +197,12 @@ TAO_IIOP_Connection_Handler::handle_close (ACE_HANDLE handle, // in turn take appropiate action (such as sending exceptions to // all waiting reply handlers). if (TAO_debug_level) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) ") - ACE_TEXT ("IIOP_Connection_Handler::handle_close ") - ACE_TEXT ("(%d, %d)\n"), - handle, - rm)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) ") + ACE_TEXT ("IIOP_Connection_Handler::handle_close ") + ACE_TEXT ("(%d, %d)\n"), + handle, + rm)); --this->pending_upcalls_; if (this->pending_upcalls_ <= 0) @@ -244,8 +245,18 @@ TAO_IIOP_Connection_Handler::fetch_handle (void) } int +TAO_IIOP_Connection_Handler::resume_handler (void) +{ + return TAO_RESUMES_CONNECTION_HANDLER; +} + +int TAO_IIOP_Connection_Handler::handle_output (ACE_HANDLE) { + // Instantiate the resume handle here.. This will automatically + // resume the handle once data is written.. + TAO_Resume_Handle resume_handle (this->orb_core (), + this->fetch_handle ()); return this->transport ()->handle_output (); } @@ -310,48 +321,28 @@ TAO_IIOP_Connection_Handler::process_listen_point_list ( int -TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE h) - -{ - return this->handle_input_i (h); -} - +TAO_IIOP_Connection_Handler::handle_input (ACE_HANDLE) -int -TAO_IIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) { + // Increase the reference count on the upcall that have passed us. this->pending_upcalls_++; - // Call the transport read the message - int result = this->transport ()->read_process_message (max_wait_time); + TAO_Resume_Handle resume_handle (this->orb_core (), + this->fetch_handle ()); - // Now the message has been read - if (result == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("IIOP_Connection_Handler::read_message \n"))); - - } + int retval = this->transport ()->handle_input_i (resume_handle); // The upcall is done. Bump down the reference count if (--this->pending_upcalls_ <= 0) - result = -1; + retval = -1; - if (result == -1 || - result == 1) - return result; - - return 0; + return retval; } - - // **************************************************************** #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/tao/IIOP_Connection_Handler.h b/TAO/tao/IIOP_Connection_Handler.h index fe220557d87..40a53456a4f 100644 --- a/TAO/tao/IIOP_Connection_Handler.h +++ b/TAO/tao/IIOP_Connection_Handler.h @@ -111,6 +111,10 @@ public: /// Return the underlying handle virtual ACE_HANDLE fetch_handle (void); + /// Send a TRUE value to the reactor, so that the reactor does not + /// resume the handler + virtual int resume_handler (void); + /// Use peer() to drain the outgoing message queue virtual int handle_output (ACE_HANDLE); @@ -131,8 +135,6 @@ protected: /// ensure that server threads eventually exit. virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0); private: diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp index 0561c25189a..d3e5eb60938 100644 --- a/TAO/tao/IIOP_Connector.cpp +++ b/TAO/tao/IIOP_Connector.cpp @@ -17,7 +17,6 @@ ACE_RCSID (TAO, IIOP_Connector, "$Id$") - #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Node<ACE_INET_Addr>; diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index a680226fd1e..d3c664db9af 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -16,7 +16,7 @@ #include "tao/ORB_Core.h" #include "tao/debug.h" #include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" +//#include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "tao/IIOP_Transport.i" @@ -26,12 +26,13 @@ ACE_RCSID (tao, IIOP_Transport, "$Id$") TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean flag) + CORBA::Boolean /*flag*/) : TAO_Transport (TAO_TAG_IIOP_PROFILE, orb_core) , connection_handler_ (handler) , messaging_object_ (0) { +#if 0 if (flag) { // Use the lite version of the protocol @@ -39,6 +40,7 @@ TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler, TAO_GIOP_Message_Lite (orb_core)); } else +#endif { // Use the normal GIOP object ACE_NEW (this->messaging_object_, @@ -81,52 +83,37 @@ TAO_IIOP_Transport::recv_i (char *buf, size_t len, const ACE_Time_Value *max_wait_time) { - return this->connection_handler_->peer ().recv (buf, - len, - max_wait_time); -} - - -int -TAO_IIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, - int block) -{ - // Read the message of the socket - int result = this->messaging_object_->read_message (this, - block, + ssize_t n = this->connection_handler_->peer ().recv (buf, + len, max_wait_time); - if (result == -1) + // Most of the errors handling is common for + // Now the message has been read + if (n == -1 && TAO_debug_level > 4) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("IIOP_Transport::read_message, failure ") - ACE_TEXT ("in read_message ()"))); - - this->tms_->connection_closed (); - return -1; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure ") + ACE_TEXT ("recv_i () \n"))); } - if (result < 2) - return result; - - // Now we know that we have been able to read the complete message - // here.. We loop here to see whether we have read more than one - // message in our read. - // Set the result state - result = 1; + // Error handling + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; - // See we use the reactor semantics again - while (result > 0) + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) { - result = this->process_message (); + return -1; } - return result; + return n; } - int TAO_IIOP_Transport::register_handler_i (void) { @@ -261,131 +248,6 @@ TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) return this->connection_handler_->process_listen_point_list (listen_list); } -int -TAO_IIOP_Transport::process_message (void) -{ - // Check whether we have messages for processing - int retval = - this->messaging_object_->more_messages (); - - if (retval <= 0) - return retval; - - // Get the <message_type> that we have received - TAO_Pluggable_Message_Type t = - this->messaging_object_->message_type (); - - - int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("Close Connection Message recd \n"))); - - this->tms_->connection_closed (); - } - else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) - { - if (this->messaging_object_->process_request_message ( - this, - this->orb_core ()) == -1) - return -1; - } - else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) - { - TAO_Pluggable_Reply_Params params (this->orb_core ()); - - if (this->messaging_object_->process_reply_message (params) == -1) - { - - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("IIOP_Transport::process_message, ") - ACE_TEXT ("process_reply_message ()"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - result = this->tms_->dispatch_reply (params); - - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - - // @@ The above comment was found in the older versions of the - // code. The code was also written in such a way that, when - // the client thread on a call from handle_input () from the - // reactor a call would be made on the handle_client_input - // (). The implementation of handle_client_input () looked so - // flaky. It used to create a message state upon entry in to - // the function using the TMS and destroy that on exit. All - // this was fine _theoretically_ for multiple threads. But - // the flakiness was originating in the implementation of - // get_message_state () where we were creating message state - // only once and dishing it out for every thread till one of - // them destroy's it. So, it looked broken. That has been - // changed. Why?. To my knowledge, the reactor does not call - // handle_input () on two threads at the same time. So, IMHO - // that defeats the purpose of creating a message state for - // every thread. This is just my guess. If we run in to - // problems this place needs to be revisited. If someone else - // is going to take a look please contact bala@cs.wustl.edu - // for details on this-- Bala - - if (result == -1) - { - // Something really critical happened, we will forget about - // every reply on this connection. - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::") - ACE_TEXT ("process_message - ") - ACE_TEXT ("dispatch reply failed\n"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - this->messaging_object_->reset (); - - // The reply dispatcher was no longer registered. - // This can happened when the request/reply - // times out. - // To throw away all registered reply handlers is - // not the right thing, as there might be just one - // old reply coming in and several valid new ones - // pending. If we would invoke <connection_closed> - // we would throw away also the valid ones. - //return 0; - } - - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - //this->tms_->destroy_message_state (message_state); - } - else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - { - return -1; - } - - return 1; -} - - void TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) { diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index 940ab085b2e..6492dba9e8c 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -82,9 +82,6 @@ protected: size_t len, const ACE_Time_Value *s = 0); - virtual int read_process_message (ACE_Time_Value *max_time_value = 0, - int block =0); - virtual int register_handler_i (void); /// Method to do whatever it needs to do when the connection @@ -118,9 +115,6 @@ public: private: - /// Process the message that we have read - int process_message (void); - /// Set the Bidirectional context info in the service context list void set_bidir_context_info (TAO_Operation_Details &opdetails); diff --git a/TAO/tao/IORInfo.cpp b/TAO/tao/IORInfo.cpp index ab7edb16ccb..0e3e2071078 100644 --- a/TAO/tao/IORInfo.cpp +++ b/TAO/tao/IORInfo.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "IORInfo.h" #include "PolicyC.h" #include "IOPC.h" @@ -9,10 +10,9 @@ ACE_RCSID (tao, IORInfo, "$Id$") - TAO_IORInfo::TAO_IORInfo (TAO_ORB_Core *orb_core, - TAO_MProfile &mp, - CORBA::PolicyList *policy_list) + TAO_MProfile &mp, + CORBA::PolicyList *policy_list) : orb_core_ (orb_core), mp_ (mp), policy_list_ (policy_list) @@ -25,7 +25,7 @@ TAO_IORInfo::~TAO_IORInfo (void) CORBA::Policy_ptr TAO_IORInfo::get_effective_policy (CORBA::PolicyType type, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { // Check the policy list supplied by the POA. @@ -34,12 +34,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type, for (CORBA::ULong i = 0; i < policy_count; ++i) { CORBA::PolicyType pt = - (*(this->policy_list_))[i]->policy_type ( + (*(this->policy_list_))[i]->policy_type ( TAO_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (CORBA::Policy::_nil ()); if (pt == type) - return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]); + return CORBA::Policy::_duplicate ((*(this->policy_list_))[i]); } // TODO: Now check the global ORB policies. @@ -47,12 +47,12 @@ TAO_IORInfo::get_effective_policy (CORBA::PolicyType type, ACE_THROW_RETURN (CORBA::INV_POLICY (TAO_OMG_VMCID | 2, CORBA::COMPLETED_NO), - CORBA::Policy::_nil ()); + CORBA::Policy::_nil ()); } void TAO_IORInfo::add_ior_component (const IOP::TaggedComponent &component, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { // Add the given tagged component to all profiles. @@ -85,12 +85,12 @@ TAO_IORInfo::add_ior_component_to_profile ( TAO_Profile *profile = this->mp_.get_profile (i); if (profile->tag () == profile_id) - { - profile->add_tagged_component (component, ACE_TRY_ENV); + { + profile->add_tagged_component (component, ACE_TRY_ENV); ACE_CHECK; found_profile = 1; - } + } } // According to the Portable Interceptor specification, we're diff --git a/TAO/tao/Incoming_Message_Queue.cpp b/TAO/tao/Incoming_Message_Queue.cpp new file mode 100644 index 00000000000..17f8b31d6b9 --- /dev/null +++ b/TAO/tao/Incoming_Message_Queue.cpp @@ -0,0 +1,151 @@ +#include "Incoming_Message_Queue.h" +#include "ORB_Core.h" +#include "debug.h" + + +#if !defined (__ACE_INLINE__) +# include "Incoming_Message_Queue.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID (tao, Incoming_Message_Queue, "$Id$") + + +TAO_Incoming_Message_Queue::TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core) + : queued_data_ (0), + size_ (0), + orb_core_ (orb_core) +{ +} + +TAO_Incoming_Message_Queue::~TAO_Incoming_Message_Queue (void) +{ + // Delete the QD + if (this->size_) + { + TAO_Queued_Data *qd = this->dequeue_head (); + TAO_Queued_Data::release (qd); + } +} + +size_t +TAO_Incoming_Message_Queue::copy_tail (ACE_Message_Block &block) +{ + // The size of message that is copied + size_t n = 0; + + if (this->size_ > 0) + { + // Check to see if the length of the incoming block is less than + // that of the <missing_data_> of the tail. + if ((CORBA::Long)block.length () <= this->queued_data_->missing_data_) + { + n = block.length (); + } + else + { + n = this->queued_data_->missing_data_; + } + + // Do the copy + this->queued_data_->msg_block_->copy (block.rd_ptr (), + n); + + // Decerement the missing data + this->queued_data_->missing_data_ -= n; + } + + return n; +} + +TAO_Queued_Data * +TAO_Incoming_Message_Queue::dequeue_head (void) +{ + // Get the node on the head of the queue... + TAO_Queued_Data *tmp = + this->queued_data_->next_; + + // Reset the head node.. + this->queued_data_->next_ = tmp->next_; + + // Decrease the size + --this->size_; + + return tmp; +} + +TAO_Queued_Data * +TAO_Incoming_Message_Queue::dequeue_tail (void) +{ + // This is a bit painful stuff... + if (this->size_ == 0) + return 0; + + // Get the node on the head of the queue... + TAO_Queued_Data *tmp = + this->queued_data_->next_; + + while (tmp->next_ != this->queued_data_) + { + tmp = tmp->next_; + } + + // Put the head in tmp. + tmp->next_ = this->queued_data_->next_; + + TAO_Queued_Data *ret_qd = this->queued_data_; + + this->queued_data_ = tmp; + + // Decrease the size + --this->size_; + + return ret_qd; +} + + +int +TAO_Incoming_Message_Queue::enqueue_tail (TAO_Queued_Data *nd) +{ + if (this->size_ == 0) + { + this->queued_data_ = nd; + this->queued_data_->next_ = this->queued_data_; + } + else + { + nd->next_ = this->queued_data_->next_; + this->queued_data_->next_ = nd; + this->queued_data_ = nd; + } + + ++ this->size_; + return 0; +} + + +/************************************************************************/ +// Methods for TAO_Queued_Data +/************************************************************************/ + + +TAO_Queued_Data::TAO_Queued_Data (void) + : msg_block_ (0), + missing_data_ (0), + byte_order_ (0), + major_version_ (0), + minor_version_ (0), + msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), + next_ (0) +{ +} + +TAO_Queued_Data::TAO_Queued_Data (ACE_Message_Block *mb) + : msg_block_ (mb), + missing_data_ (0), + byte_order_ (0), + major_version_ (0), + minor_version_ (0), + msg_type_ (TAO_PLUGGABLE_MESSAGE_MESSAGERROR), + next_ (0) +{ +} diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h new file mode 100644 index 00000000000..cfbe613166e --- /dev/null +++ b/TAO/tao/Incoming_Message_Queue.h @@ -0,0 +1,161 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Incoming_Queued_Message.h + * + * $Id$ + * + * @author Balachandran Natarajan <bala@cs.wustl.edu> + */ +//============================================================================= + +#ifndef TAO_INCOMING_MESSAGE_QUEUE_H +#define TAO_INCOMING_MESSAGE_QUEUE_H +#include "ace/pre.h" + +#include "Pluggable_Messaging_Utils.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/// Forward declarations +class ACE_Data_Block; +class TAO_ORB_Core; +class TAO_Queued_Data; +class TAO_Transport; + +/** + * @class TAO_Incoming_Message_Queue + * + * @brief A queue of the messages in the incoming data path. + * + * Please read the documentation in the TAO_Transport class to find + * out more about the design of the incoming data path. + * + * Under certain conditions TAO may have to maintain a queue + * per-connection. This queue is drained by the pluggable + * protocols framework, normally under control of the ACE_Reactor, but + * other configurations are conceivable. + * + * The memory that is allocated for holding the messages comes from + * the global pool for the following reasons + * + * - the thread that reads a part of the message would not be the same + * thread that reads and fills the rest of the message + * - the thread that actually processes the message can be totally + * different. + * + */ + +class TAO_Export TAO_Incoming_Message_Queue +{ +public: + /// Contructor. + TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core); + + /// Destructor. + ~TAO_Incoming_Message_Queue (void); + + /// Adding and deleting a node from the queue. + TAO_Queued_Data *dequeue_head (void); + TAO_Queued_Data *dequeue_tail (void); + int enqueue_tail (TAO_Queued_Data *nd); + + /// Copy message from <block> to the tail of the queue. The size + /// of message that is copied to the tail node is returned. The + /// number of bytes copied depends on the amount of bytes needed to + /// make the tail node consistent. + size_t copy_tail (ACE_Message_Block &block); + + /// Return the length of the queue.. + CORBA::ULong queue_length (void); + + /// Methods for sanity check. Checks to see whether the node on the + /// head or tail is complete or not and ready for further + /// processing. + int is_tail_complete (void); + int is_head_complete (void); + + /// Return the size of data that is missing in tail of the queue. + size_t missing_data_tail (void) const; + /// void missing_data (size_t data); + +private: + + friend class TAO_Transport; + + /// Make a node for the queue. + TAO_Queued_Data *get_node (void); + +private: + + /// A circular linked listof messages that await processing + TAO_Queued_Data *queued_data_; + + /// The size of the queue + CORBA::ULong size_; + + /// Copy of our ORB Core + TAO_ORB_Core *orb_core_; +}; + +/************************************************************************/ + +/** + * @class TAO_Queued_Data + * + * @brief Represents a node in the queue of incoming messages. + * + * This class contains necessary information about a message that is + * stored in the queue. Such a node can be used by the incoming thread + * from the reactor to dequeue and process the message by sending it + * to the higher layers of the ORB. + */ + +class TAO_Export TAO_Queued_Data +{ +public: + /// Default Constructor + TAO_Queued_Data (void); + + /// Constructor. + TAO_Queued_Data (ACE_Message_Block *mb); + + /// Creation and deletion of a node in the queue. + static TAO_Queued_Data* get_queued_data (void); + + static void release (TAO_Queued_Data *qd); + + /// The message block that contains the message. + ACE_Message_Block *msg_block_; + + /// Data missing in the above message that hasn't been read or + /// processed yet. + CORBA::Long missing_data_; + + /// The byte order of the message that is stored in the node.. + CORBA::Octet byte_order_; + + /// Many protocols like GIOP have a major and minor version + /// information that would be needed to read and decipher the + /// message. + CORBA::Octet major_version_; + + CORBA::Octet minor_version_; + + /// The message type of the message + TAO_Pluggable_Message_Type msg_type_; + + /// Pounter to the next element in the queue. + TAO_Queued_Data *next_; +}; + + +#if defined (__ACE_INLINE__) +# include "Incoming_Message_Queue.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /*TAO_INCOMING_MESSAGE_QUEUE_H*/ diff --git a/TAO/tao/Incoming_Message_Queue.inl b/TAO/tao/Incoming_Message_Queue.inl new file mode 100644 index 00000000000..8d270b9c479 --- /dev/null +++ b/TAO/tao/Incoming_Message_Queue.inl @@ -0,0 +1,78 @@ +// -*- C++ -*- +//$Id$ +ACE_INLINE CORBA::ULong +TAO_Incoming_Message_Queue::queue_length (void) +{ + return this->size_; +} + +ACE_INLINE int +TAO_Incoming_Message_Queue::is_tail_complete (void) +{ + // If the size is 0 return -1 + if (this->size_ == 0) + return -1; + + if (this->size_ && + this->queued_data_->missing_data_ == 0) + return 1; + + return 0; +} + +ACE_INLINE int +TAO_Incoming_Message_Queue::is_head_complete (void) +{ + if (this->size_ == 0) + return -1; + + if (this->size_ && + this->queued_data_->next_->missing_data_ == 0) + return 1; + + return 0; +} + +ACE_INLINE size_t +TAO_Incoming_Message_Queue::missing_data_tail (void) const +{ + if (this->size_ != 0) + return this->queued_data_->missing_data_; + + return 0; +} + + + +ACE_INLINE TAO_Queued_Data * +TAO_Incoming_Message_Queue::get_node (void) +{ + return TAO_Queued_Data::get_queued_data (); +} + + +/************************************************************************/ +// Methods for TAO_Queued_Data +/************************************************************************/ +/*static*/ +ACE_INLINE TAO_Queued_Data * +TAO_Queued_Data::get_queued_data (void) +{ + // @@TODO: Use the global pool for allocationg... + TAO_Queued_Data *qd = 0; + ACE_NEW_RETURN (qd, + TAO_Queued_Data, + 0); + + return qd; +} + +/*static*/ +ACE_INLINE void +TAO_Queued_Data::release (TAO_Queued_Data *qd) +{ + ACE_Message_Block::release (qd->msg_block_); + + // @@TODO: Use the global pool for releasing.. + delete qd; +} diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index 5f975f1e7b5..c50a3166851 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "Invocation.h" #include "Principal.h" #include "Stub.h" @@ -29,8 +30,10 @@ # include "Invocation.i" #endif /* ! __ACE_INLINE__ */ + ACE_RCSID(tao, Invocation, "$Id$") + #if defined (ACE_ENABLE_TIMEPROBES) static const char *TAO_Invocation_Timeprobe_Description[] = diff --git a/TAO/tao/Invocation_Endpoint_Selectors.cpp b/TAO/tao/Invocation_Endpoint_Selectors.cpp index fafe05a84aa..1e9a83da328 100644 --- a/TAO/tao/Invocation_Endpoint_Selectors.cpp +++ b/TAO/tao/Invocation_Endpoint_Selectors.cpp @@ -14,6 +14,7 @@ ACE_RCSID(tao, Invocation_Endpoint_Selectors, "$Id$") + TAO_Invocation_Endpoint_Selector::~TAO_Invocation_Endpoint_Selector (void) { } diff --git a/TAO/tao/LocalObject.cpp b/TAO/tao/LocalObject.cpp index e8952ac76b2..2070ba6e8d1 100644 --- a/TAO/tao/LocalObject.cpp +++ b/TAO/tao/LocalObject.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "LocalObject.h" #if !defined (__ACE_INLINE__) @@ -14,6 +15,7 @@ ACE_RCSID (tao, LocalObject, "$Id$") + CORBA_LocalObject::~CORBA_LocalObject (void) { } diff --git a/TAO/tao/Makefile b/TAO/tao/Makefile index 980e6c4d87e..e3362cfd566 100644 --- a/TAO/tao/Makefile +++ b/TAO/tao/Makefile @@ -1,6 +1,6 @@ #---------------------------------------------------------------------------- - +# # $Id$ # # Makefile for TAO @@ -84,6 +84,8 @@ PUB_HDRS = \ PLUGGABLE_PROTOCOLS_FILES = \ Pluggable \ Transport \ + Incoming_Message_Queue \ + Resume_Handle \ Profile \ Endpoint \ Connector_Registry \ @@ -106,8 +108,6 @@ PLUGGABLE_MESSAGING_FILES = \ Pluggable_Messaging \ Pluggable_Messaging_Utils \ GIOP_Message_Base \ - GIOP_Message_Lite \ - GIOP_Message_Reactive_Handler \ GIOP_Message_Generator_Parser \ GIOP_Message_Generator_Parser_10 \ GIOP_Message_Generator_Parser_11 \ diff --git a/TAO/tao/MessagingC.h b/TAO/tao/MessagingC.h index 8795ab51435..7b35f808d59 100644 --- a/TAO/tao/MessagingC.h +++ b/TAO/tao/MessagingC.h @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/Muxed_TMS.cpp b/TAO/tao/Muxed_TMS.cpp index ee5f9647379..1bd476a279e 100644 --- a/TAO/tao/Muxed_TMS.cpp +++ b/TAO/tao/Muxed_TMS.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "tao/Muxed_TMS.h" #include "tao/Reply_Dispatcher.h" #include "tao/GIOP_Message_Version.h" @@ -9,6 +10,7 @@ ACE_RCSID(tao, Muxed_TMS, "$Id$") + TAO_Muxed_TMS::TAO_Muxed_TMS (TAO_Transport *transport) : TAO_Transport_Mux_Strategy (transport), request_id_generator_ (0), @@ -104,28 +106,6 @@ TAO_Muxed_TMS::dispatch_reply (TAO_Pluggable_Reply_Params ¶ms) // sending the request. } -/*TAO_GIOP_Message_State * -TAO_Muxed_TMS::get_message_state (void) -{ - if (this->message_state_ == 0) - { - // Create the next message state. - ACE_NEW_RETURN (this->message_state_, - TAO_GIOP_Message_State - (this->transport_->orb_core ()), - 0); - } - - return this->message_state_; -} - -void -TAO_Muxed_TMS::destroy_message_state (TAO_GIOP_Message_State *) -{ - delete this->message_state_; - this->message_state_ = 0; -}*/ - int TAO_Muxed_TMS::idle_after_send (void) { diff --git a/TAO/tao/Muxed_TMS.h b/TAO/tao/Muxed_TMS.h index 0fdf9fd71f8..0824dc1d72f 100644 --- a/TAO/tao/Muxed_TMS.h +++ b/TAO/tao/Muxed_TMS.h @@ -59,10 +59,6 @@ public: virtual int dispatch_reply (TAO_Pluggable_Reply_Params ¶ms); - // @@ Commented for the time being, let the commented line stay for - // sometime - Bala - // virtual TAO_GIOP_Message_State *get_message_state (void); - // virtual void destroy_message_state (TAO_GIOP_Message_State *); virtual int idle_after_send (void); virtual int idle_after_reply (void); virtual void connection_closed (void); diff --git a/TAO/tao/ORB.cpp b/TAO/tao/ORB.cpp index f54ba3a1d1f..0c0efbaf6db 100644 --- a/TAO/tao/ORB.cpp +++ b/TAO/tao/ORB.cpp @@ -1,6 +1,7 @@ // $Id$ + #include "ORB.h" #include "ORB_Table.h" #include "Connector_Registry.h" @@ -70,6 +71,7 @@ using std::set_unexpected; ACE_RCSID(tao, ORB, "$Id$") + static const char ior_prefix [] = "IOR:"; // = Static initialization. @@ -1755,6 +1757,7 @@ CORBA_ORB::object_to_string (CORBA::Object_ptr obj, CORBA::COMPLETED_NO), 0); + // Application writer controls what kind of objref strings they get, // maybe along with other things, by how they initialize the ORB. @@ -1981,8 +1984,11 @@ CORBA_ORB::ior_string_to_object (const char *str, int byte_order = *(mb.rd_ptr ()); mb.rd_ptr (1); mb.wr_ptr (len); - TAO_InputCDR stream (&mb, byte_order, TAO_DEF_GIOP_MAJOR, - TAO_DEF_GIOP_MINOR, this->orb_core_); + TAO_InputCDR stream (&mb, + byte_order, + TAO_DEF_GIOP_MAJOR, + TAO_DEF_GIOP_MINOR, + this->orb_core_); CORBA::Object_ptr objref = CORBA::Object::_nil (); stream >> objref; diff --git a/TAO/tao/Object_Ref_Table.cpp b/TAO/tao/Object_Ref_Table.cpp index c883296595d..f5892459696 100644 --- a/TAO/tao/Object_Ref_Table.cpp +++ b/TAO/tao/Object_Ref_Table.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "Object_Ref_Table.h" #include "Object.h" #include "Exception.h" @@ -13,6 +14,7 @@ ACE_RCSID (tao, Object_Ref_Table, "$Id$") + // **************************************************************** TAO_Object_Ref_Table::TAO_Object_Ref_Table (void) diff --git a/TAO/tao/Pluggable_Messaging.cpp b/TAO/tao/Pluggable_Messaging.cpp index c7ec77f596d..65c2c20a01c 100644 --- a/TAO/tao/Pluggable_Messaging.cpp +++ b/TAO/tao/Pluggable_Messaging.cpp @@ -14,9 +14,3 @@ TAO_Pluggable_Messaging::~TAO_Pluggable_Messaging (void) { } - -int -TAO_Pluggable_Messaging::more_messages (void) -{ - ACE_NOTSUP_RETURN (-1); -} diff --git a/TAO/tao/Pluggable_Messaging.h b/TAO/tao/Pluggable_Messaging.h index eb8fda573b0..15be5628acf 100644 --- a/TAO/tao/Pluggable_Messaging.h +++ b/TAO/tao/Pluggable_Messaging.h @@ -31,6 +31,7 @@ class TAO_Transport; class TAO_Operation_Details; class TAO_Target_Specification; class TAO_OutputCDR; +class TAO_Queued_Data; // @@ The more I think I about this class, I feel that this class need // not be a ABC as it is now. Instead we have these options @@ -108,27 +109,43 @@ public: /// general. virtual int format_message (TAO_OutputCDR &cdr) = 0; - /// Get the message type that was received. - virtual TAO_Pluggable_Message_Type message_type (void) = 0; - - /// Do any initialisations that may be needed. virtual void init (CORBA::Octet major, CORBA::Octet minor) = 0; - /// Reset teh messaging object - virtual void reset (int reset_flag = 1) = 0; - // Reset the messaging object + /// Parse the incoming messages.. + virtual int parse_incoming_messages (ACE_Message_Block &message_block) = 0; + + /// Calculate the amount of data that is missing in the <incoming> + /// message block. + virtual ssize_t missing_data (ACE_Message_Block &incoming) = 0; + + /// Get the details of the message parsed through the <qd>. + virtual void get_message_data (TAO_Queued_Data *qd) = 0; + + /* Extract the details of the next message from the <incoming> + * through <qd>. Returns 1 if there are more messages and returns a + * 0 if there are no more messages in <incoming>. + */ + virtual int extract_next_message (ACE_Message_Block &incoming, + TAO_Queued_Data *&qd) = 0; + + /// Check whether the node <qd> needs consolidation from <incoming> + virtual int consolidate_node (TAO_Queued_Data *qd, + ACE_Message_Block &incoming) = 0; /// Parse the request message, make an upcall and send the reply back /// to the "request initiator" virtual int process_request_message (TAO_Transport *transport, - TAO_ORB_Core *orb_core) = 0; + TAO_Queued_Data *qd) = 0; + /// Parse the reply message that we received and return the reply /// information though <reply_info> virtual int process_reply_message ( - TAO_Pluggable_Reply_Params &reply_info) = 0; + TAO_Pluggable_Reply_Params &reply_info, + TAO_Queued_Data *qd) = 0; + /// Generate a reply message with the exception <ex>. virtual int generate_exception_reply ( @@ -140,8 +157,8 @@ public: /// request/response? virtual int is_ready_for_bidirectional (void) = 0; - /// Are there any more messages that needs processing? - virtual int more_messages (void); + /// Reset the messaging the object + virtual void reset (void) = 0; }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Pluggable_Messaging_Utils.cpp b/TAO/tao/Pluggable_Messaging_Utils.cpp index 8c6420c3694..5b0ea8553b2 100644 --- a/TAO/tao/Pluggable_Messaging_Utils.cpp +++ b/TAO/tao/Pluggable_Messaging_Utils.cpp @@ -1,4 +1,5 @@ //$Id$ + #include "tao/Pluggable_Messaging_Utils.h" #include "tao/ORB_Core.h" @@ -8,12 +9,11 @@ ACE_RCSID(tao, Pluggable_Messaging_Utils, "$Id$") + TAO_Pluggable_Reply_Params::TAO_Pluggable_Reply_Params ( TAO_ORB_Core *orb_core ) - : input_cdr_ (orb_core->create_input_cdr_data_block ( - ACE_CDR::DEFAULT_BUFSIZE - ), + : input_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), 0, TAO_ENCAP_BYTE_ORDER, TAO_DEF_GIOP_MAJOR, diff --git a/TAO/tao/Pluggable_Messaging_Utils.h b/TAO/tao/Pluggable_Messaging_Utils.h index 63f0d756e57..c5a2afae2f3 100644 --- a/TAO/tao/Pluggable_Messaging_Utils.h +++ b/TAO/tao/Pluggable_Messaging_Utils.h @@ -92,13 +92,17 @@ protected: * This represents a set of data that would be received by the * connector from the acceptor. */ -class TAO_Export TAO_Pluggable_Reply_Params +class TAO_Export TAO_Pluggable_Reply_Params : public TAO_Pluggable_Reply_Params_Base { public: /// Constructor. TAO_Pluggable_Reply_Params (TAO_ORB_Core *orb_core); + /* @todo: There is a way out clear this off from stack. Need to look + into that after 1.2 + */ + /// The stream with the non-demarshaled reply. This stream will be /// passed up to the stubs to demarshal the parameter values. TAO_InputCDR input_cdr_; diff --git a/TAO/tao/PolicyC.cpp b/TAO/tao/PolicyC.cpp index 6eb368913ae..84cec53c08d 100644 --- a/TAO/tao/PolicyC.cpp +++ b/TAO/tao/PolicyC.cpp @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/PolicyC.i b/TAO/tao/PolicyC.i index ceb80909d8b..1b2b0c267c7 100644 --- a/TAO/tao/PolicyC.i +++ b/TAO/tao/PolicyC.i @@ -2,6 +2,7 @@ // // $Id$ + // **** Code generated by the The ACE ORB (TAO) IDL Compiler **** // TAO and the TAO IDL Compiler have been developed by: // Center for Distributed Object Computing diff --git a/TAO/tao/PolicyFactory_Registry.cpp b/TAO/tao/PolicyFactory_Registry.cpp index 29910a7f3c7..814f6e0d1c4 100644 --- a/TAO/tao/PolicyFactory_Registry.cpp +++ b/TAO/tao/PolicyFactory_Registry.cpp @@ -2,6 +2,7 @@ // // $Id$ + #include "PolicyFactory_Registry.h" ACE_RCSID(tao, PolicyFactory_Registry, "$Id$") diff --git a/TAO/tao/PortableInterceptor.pidl b/TAO/tao/PortableInterceptor.pidl index e8421db4f8e..b2e4e8980a4 100644 --- a/TAO/tao/PortableInterceptor.pidl +++ b/TAO/tao/PortableInterceptor.pidl @@ -2,6 +2,7 @@ // // $Id$ + // ================================================================ // // This file was used to generate the code in PortableInterceptorC.* diff --git a/TAO/tao/Profile.cpp b/TAO/tao/Profile.cpp index b7b4f9b17a2..a6b816f6855 100644 --- a/TAO/tao/Profile.cpp +++ b/TAO/tao/Profile.cpp @@ -1,5 +1,6 @@ // $Id$ + #include "Profile.h" #include "Object_KeyC.h" @@ -13,6 +14,7 @@ ACE_RCSID(tao, Profile, "$Id$") + // **************************************************************** TAO_Profile::~TAO_Profile (void) @@ -261,9 +263,9 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV) if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Cannot add ") - ACE_TEXT ("IOP::TaggedComponent to profile.\n") - ACE_TEXT ("(%P|%t) Standard profile components ") - ACE_TEXT ("have been disabled or URL style IORs\n") + ACE_TEXT ("IOP::TaggedComponent to profile.\n") + ACE_TEXT ("(%P|%t) Standard profile components ") + ACE_TEXT ("have been disabled or URL style IORs\n") ACE_TEXT ("(%P|%t) are in use. Try ") ACE_TEXT ("\"-ORBStdProfileComponents 1\" and/or\n") ACE_TEXT ("(%P|%t) \"-ORBObjRefStyle IOR\".\n"))); @@ -276,8 +278,8 @@ TAO_Profile::verify_orb_configuration (CORBA::Environment &ACE_TRY_ENV) ACE_THROW (CORBA::BAD_PARAM ( CORBA_SystemException::_tao_minor_code ( TAO_DEFAULT_MINOR_CODE, - EINVAL), - CORBA::COMPLETED_NO)); + EINVAL), + CORBA::COMPLETED_NO)); } } @@ -292,8 +294,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV) if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Cannot add ") - ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0") - ACE_TEXT ("IOR profile.\n") + ACE_TEXT ("IOP::TaggedComponent to GIOP 1.0") + ACE_TEXT ("IOR profile.\n") ACE_TEXT ("(%P|%t) Try using a GIOP 1.1 or ") ACE_TEXT ("greater endpoint.\n"))); @@ -305,8 +307,8 @@ TAO_Profile::verify_profile_version (CORBA::Environment &ACE_TRY_ENV) ACE_THROW (CORBA::BAD_PARAM ( CORBA_SystemException::_tao_minor_code ( TAO_DEFAULT_MINOR_CODE, - EINVAL), - CORBA::COMPLETED_NO)); + EINVAL), + CORBA::COMPLETED_NO)); } } diff --git a/TAO/tao/Resume_Handle.cpp b/TAO/tao/Resume_Handle.cpp new file mode 100644 index 00000000000..af94eefa37e --- /dev/null +++ b/TAO/tao/Resume_Handle.cpp @@ -0,0 +1,24 @@ +#include "Resume_Handle.h" +#include "ORB_Core.h" + + +#if !defined (__ACE_INLINE__) +# include "Resume_Handle.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(tao, Resume_Handle, "$Id$") + +void +TAO_Resume_Handle::resume_handle (void) +{ + // If we have a complete message, just resume the handler + // Resume the handler. + if (this->orb_core_ && + this->orb_core_->reactor ()->resumable_handler () && + this->flag_ == TAO_HANDLE_RESUMABLE && + this->handle_ != ACE_INVALID_HANDLE) + this->orb_core_->reactor ()->resume_handler (this->handle_); + + // Set the flag, so that we dont resume again.. + this->flag_ = TAO_HANDLE_ALREADY_RESUMED; +} diff --git a/TAO/tao/Resume_Handle.h b/TAO/tao/Resume_Handle.h new file mode 100644 index 00000000000..36449fad030 --- /dev/null +++ b/TAO/tao/Resume_Handle.h @@ -0,0 +1,90 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Resume_Handle.h + * + * $Id$ + * + * @author Balachandran Natarajan <bala@cs.wustl.edu> + */ +//============================================================================= + +#ifndef TAO_RESUME_HANDLE_H +#define TAO_RESUME_HANDLE_H +#include "ace/pre.h" + +#include "TAO_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_ORB_Core; + + +/** + * @class TAO_Resume_Handle + * + * @brief A utility class that helps in resuming handlers if TAO uses + * a TP Reactor from ACE. + * + * Please read the documentation in the bugzilla #575 in the bugzilla + * database what we mean by handler resumption. + * + * When TAO uses a TP reactor, it takes care of resuming the handler + * once it makes sure that it has read the whole message out of the + * socket. During the process of reading the transport object would + * have to deal with errors in 'read' from the socket, or errors in + * the messages that has been received. Instead of calling + * resume_handler () on the reactor at every point in the code, we + * use this utility class to take care of the resumption. + */ + +class TAO_Export TAO_Resume_Handle +{ + +public: + + /// Ctor. + TAO_Resume_Handle (TAO_ORB_Core *orb_core = 0, + ACE_HANDLE h = ACE_INVALID_HANDLE); + /// Dtor + ~TAO_Resume_Handle (void); + + enum TAO_Handle_Resume_Flag + { + TAO_HANDLE_RESUMABLE = 0, + TAO_HANDLE_ALREADY_RESUMED, + TAO_HANDLE_LEAVE_SUSPENDED + }; + + /// Allow the users of this class to change the underlying flag. + void set_flag (TAO_Handle_Resume_Flag fl); + + /// Equal to operator.. + TAO_Resume_Handle &operator= (const TAO_Resume_Handle &rhs); + + /// Resume the handle in the reactor only if the ORB uses a TP + /// reactor. Else we dont resume the handle. + void resume_handle (void); + +private: + + /// Our ORB Core. + TAO_ORB_Core *orb_core_; + + /// The actual handle that needs resumption.. + ACE_HANDLE handle_; + + /// Th flag for indicating whether the handle has been resumed or + /// not. A value of '0' indicates that the handle needs resumption. + TAO_Handle_Resume_Flag flag_; +}; + +#if defined (__ACE_INLINE__) +# include "Resume_Handle.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/pre.h" +#endif /*TAO_RESUME_HANDLE*/ diff --git a/TAO/tao/Resume_Handle.inl b/TAO/tao/Resume_Handle.inl new file mode 100644 index 00000000000..38aae6740f1 --- /dev/null +++ b/TAO/tao/Resume_Handle.inl @@ -0,0 +1,38 @@ +// -*- C++ -*- +//$Id$ + +ACE_INLINE +TAO_Resume_Handle::TAO_Resume_Handle (TAO_ORB_Core *orb_core, + ACE_HANDLE h) + : orb_core_ (orb_core), + handle_ (h), + flag_ (TAO_HANDLE_RESUMABLE) +{ +} + +ACE_INLINE +TAO_Resume_Handle::~TAO_Resume_Handle (void) +{ + if (this->flag_ == TAO_HANDLE_RESUMABLE) + this->resume_handle (); + + this->orb_core_ = 0; + this->handle_ = ACE_INVALID_HANDLE; +} + + +ACE_INLINE void +TAO_Resume_Handle::set_flag (TAO_Handle_Resume_Flag fl) +{ + this->flag_ = fl; +} + +ACE_INLINE TAO_Resume_Handle & +TAO_Resume_Handle::operator= (const TAO_Resume_Handle &rhs) +{ + this->orb_core_ = rhs.orb_core_; + this->handle_ = rhs.handle_; + this->flag_ = rhs.flag_; + + return *this; +} diff --git a/TAO/tao/Strategies/DIOP_Connection_Handler.cpp b/TAO/tao/Strategies/DIOP_Connection_Handler.cpp index 9b95deeea56..beb08a7679b 100644 --- a/TAO/tao/Strategies/DIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/DIOP_Connection_Handler.cpp @@ -14,6 +14,7 @@ #include "tao/Server_Strategy_Factory.h" #include "tao/Transport_Cache_Manager.h" #include "tao/Base_Transport_Property.h" +#include "tao/Resume_Handle.h" #include "DIOP_Transport.h" #include "DIOP_Endpoint.h" @@ -259,6 +260,12 @@ TAO_DIOP_Connection_Handler::fetch_handle (void) return this->get_handle (); } +int +TAO_DIOP_Connection_Handler::resume_handler (void) +{ + return TAO_RESUMES_CONNECTION_HANDLER; +} + int TAO_DIOP_Connection_Handler::add_transport_to_cache (void) @@ -322,33 +329,28 @@ TAO_DIOP_Connection_Handler::process_listen_point_list ( */ int -TAO_DIOP_Connection_Handler::handle_input (ACE_HANDLE h) -{ - return this->handle_input_i (h); -} - - -int -TAO_DIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) +TAO_DIOP_Connection_Handler::handle_input (ACE_HANDLE) { + // Increase the reference count on the upcall that have passed us. this->pending_upcalls_++; - // Call the transport read the message - int result = this->transport ()->read_process_message (max_wait_time); + TAO_Resume_Handle resume_handle (this->orb_core (), + this->fetch_handle ()); + + int retval = this->transport ()->handle_input_i (resume_handle); // Now the message has been read - if (result == -1 && TAO_debug_level > 0) + if (retval == -1 && TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("DIOP_Connection_Handler::handle_input_i \n"))); + ACE_TEXT ("DIOP_Connection_Handler::handle_input \n"))); } // The upcall is done. Bump down the reference count if (--this->pending_upcalls_ <= 0) - result = -1; + retval = -1; // @@ Michael: // We always return 0, as we do not have any @@ -356,6 +358,8 @@ TAO_DIOP_Connection_Handler::handle_input_i (ACE_HANDLE, return 0; } + + // @@ Frank: From DIOP_Connect.cpp int TAO_DIOP_Connection_Handler::handle_cleanup (void) diff --git a/TAO/tao/Strategies/DIOP_Connection_Handler.h b/TAO/tao/Strategies/DIOP_Connection_Handler.h index b0291796f43..bb8fc50bbdf 100644 --- a/TAO/tao/Strategies/DIOP_Connection_Handler.h +++ b/TAO/tao/Strategies/DIOP_Connection_Handler.h @@ -117,6 +117,9 @@ public: /// Return the underlying handle virtual ACE_HANDLE fetch_handle (void); + /// Event handler overload.. + virtual int resume_handler (void); + /// Add ourselves to Cache. int add_transport_to_cache (void); diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index 28bfc2c927a..1e74bc6f5f1 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -18,8 +18,9 @@ #include "tao/Stub.h" #include "tao/ORB_Core.h" #include "tao/debug.h" +#include "tao/Resume_Handle.h" #include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" +// #include "tao/GIOP_Message_Lite.h" #if !defined (__ACE_INLINE__) # include "DIOP_Transport.i" @@ -29,7 +30,7 @@ ACE_RCSID (tao, DIOP_Transport, "$Id$") TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean flag) + CORBA::Boolean /*flag*/) : TAO_Transport (TAO_TAG_UDP_PROFILE, orb_core) , connection_handler_ (handler) @@ -37,14 +38,14 @@ TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler, { // @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that // we read the whole UDP packet on a single read. - if (flag) + /* if (flag) { // Use the lite version of the protocol ACE_NEW (this->messaging_object_, TAO_GIOP_Message_Lite (orb_core, ACE_MAX_DGRAM_SIZE)); - } - else + } + else*/ { // Use the normal GIOP object ACE_NEW (this->messaging_object_, @@ -109,6 +110,30 @@ TAO_DIOP_Transport::recv_i (char *buf, errno)); } + // Most of the errors handling is common for + // Now the message has been read + if (n == -1 && TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure ") + ACE_TEXT ("recv_i () \n"))); + } + + // Error handling + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } + // Remember the from addr to eventually use it as remote // addr for the reply. this->connection_handler_->addr (from_addr); @@ -116,42 +141,82 @@ TAO_DIOP_Transport::recv_i (char *buf, return n; } - int -TAO_DIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, - int block) +TAO_DIOP_Transport::handle_input_i (TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time, + int /*block*/) { - // Read the message of the socket - int result = this->messaging_object_->read_message (this, - block, - max_wait_time); - - if (result == -1) + // If there are no messages then we can go ahead to read from the + // handle for further reading.. + + // The buffer on the stack which will be used to hold the input + // messages + char buf [ACE_MAX_DGRAM_SIZE]; + +#if defined (ACE_HAS_PURIFY) + (void) ACE_OS::memset (buf, + '\0', + sizeof buf); +#endif /* ACE_HAS_PURIFY */ + + // Create a data block + ACE_Data_Block db (sizeof (buf), + ACE_Message_Block::MB_DATA, + buf, + this->orb_core_->message_block_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_dblock_allocator ()); + + // Create a message block + ACE_Message_Block message_block (&db, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_msgblock_allocator ()); + + + // Align the message block + ACE_CDR::mb_align (&message_block); + + + // Read the message into the message block that we have created on + // the stack. + ssize_t n = this->recv (message_block.rd_ptr (), + message_block.space (), + max_wait_time); + + // If there is an error return to the reactor.. + if (n <= 0) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("DIOP_Transport::read_process_message, failure in read_message ()"))); + if (n == -1) + this->tms_->connection_closed (); - this->tms_->connection_closed (); - return -1; + return n; } - if (result < 2) - return result; - // Now we know that we have been able to read the complete message - // here.. We loop here to see whether we have read more than one - // message in our read. - // Set the result state - result = 1; + // Set the write pointer in the stack buffer + message_block.wr_ptr (n); - // See we use the reactor semantics again - while (result > 0) - { - result = this->process_message (); - } + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + if (this->parse_incoming_messages (message_block) == -1) + return -1; + + // NOTE: We are not performing any queueing nor any checking for + // missing data. We are assuming that ALL the data would be got in a + // single read. + + // Make a node of the message block.. + TAO_Queued_Data qd (&message_block); + + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); + + // Resume before starting to process the request.. + rh.resume_handle (); + + // Process the message + return this->process_parsed_messages (&qd); - return result; } @@ -275,130 +340,7 @@ TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) } */ -int -TAO_DIOP_Transport::process_message (void) -{ - // Check whether we have messages for processing - int retval = - this->messaging_object_->more_messages (); - - if (retval <= 0) - return retval; - // Get the <message_type> that we have received - TAO_Pluggable_Message_Type t = - this->messaging_object_->message_type (); - - - int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("Close Connection Message recd \n"))); - - this->tms_->connection_closed (); - } - else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) - { - if (this->messaging_object_->process_request_message (this, - this->orb_core ()) == -1) - return -1; - } - else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) - { - TAO_Pluggable_Reply_Params params (this->orb_core ()); - if (this->messaging_object_->process_reply_message (params) == -1) - { - - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("DIOP_Transport::process_message, process_reply_message ()"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - - result = - this->tms_->dispatch_reply (params); - - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - - // @@ The above comment was found in the older versions of the - // code. The code was also written in such a way that, when - // the client thread on a call from handle_input () from the - // reactor a call would be made on the handle_client_input - // (). The implementation of handle_client_input () looked so - // flaky. It used to create a message state upon entry in to - // the function using the TMS and destroy that on exit. All - // this was fine _theoretically_ for multiple threads. But - // the flakiness was originating in the implementation of - // get_message_state () where we were creating message state - // only once and dishing it out for every thread till one of - // them destroy's it. So, it looked broken. That has been - // changed. Why?. To my knowledge, the reactor does not call - // handle_input () on two threads at the same time. So, IMHO - // that defeats the purpose of creating a message state for - // every thread. This is just my guess. If we run in to - // problems this place needs to be revisited. If someone else - // is going to take a look please contact bala@cs.wustl.edu - // for details on this-- Bala - - - - if (result == -1) - { - // Something really critical happened, we will forget about - // every reply on this connection. - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : DIOP_Transport::") - ACE_TEXT ("process_message - ") - ACE_TEXT ("dispatch reply failed\n"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - this->messaging_object_->reset (); - - // The reply dispatcher was no longer registered. - // This can happened when the request/reply - // times out. - // To throw away all registered reply handlers is - // not the right thing, as there might be just one - // old reply coming in and several valid new ones - // pending. If we would invoke <connection_closed> - // we would throw away also the valid ones. - //return 0; - } - - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - //this->tms_->destroy_message_state (message_state); - } - else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - { - return -1; - } - - return 1; -} // @@ Frank: Hopefully DIOP doesn't need this /* diff --git a/TAO/tao/Strategies/DIOP_Transport.h b/TAO/tao/Strategies/DIOP_Transport.h index e8732f433cf..e70a0decc11 100644 --- a/TAO/tao/Strategies/DIOP_Transport.h +++ b/TAO/tao/Strategies/DIOP_Transport.h @@ -62,6 +62,10 @@ public: /// Default destructor. ~TAO_DIOP_Transport (void); + /// Look for the documentation in Transport.h. + virtual int handle_input_i (TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time = 0, + int block = 0); protected: /** @name Overridden Template Methods * @@ -83,11 +87,6 @@ protected: size_t len, const ACE_Time_Value *s = 0); - /// Read and process the message from the connection. The processing - /// of the message is done by delegating the work to the underlying - /// messaging object - virtual int read_process_message (ACE_Time_Value *max_time_value = 0, - int block =0); virtual int register_handler_i (void); @@ -114,9 +113,6 @@ public: CORBA::Octet minor); private: - /// Process the message that we have read - int process_message (void); - // @@ Frank : Not needed /* /// Set the Bidirectional context info in the service context list diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp index 84f5a560fc2..1c4f044eebd 100644 --- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp +++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.cpp @@ -8,10 +8,9 @@ ACE_RCSID (Strategies, GIOP_Message_NonReactive_Base, "$Id$") -TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core, - size_t buf_size) - : TAO_GIOP_Message_Base (orb_core, buf_size), - message_handler_ (orb_core, this, buf_size) +TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core) + + : TAO_GIOP_Message_Base (orb_core) { } @@ -19,14 +18,18 @@ TAO_GIOP_Message_NonReactive_Base::TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Co int TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport, + ACE_Message_Block &block, int /*block*/, ACE_Time_Value *max_wait_time) { // Call the handler to read and do a simple parse of the header of // the message. int retval = - this->message_handler_.read_parse_message (transport, - max_wait_time); + this->read_data (transport, + max_wait_time); + + // Before we do this let us reset the + char *buf = this->input_cdr_.rd_ptr (); // Error in the message that was received @@ -61,6 +64,19 @@ TAO_GIOP_Message_NonReactive_Base::read_message (TAO_Transport *transport, return 2; } + +size_t +TAO_GIOP_Message_NonReactive_Base::read_data (TAO_Transport *transport, + ACE_Time_Value *time) +{ + + transport->recv (buf, + n, + max_wait_time); + +} + + TAO_Pluggable_Message_Type TAO_GIOP_Message_NonReactive_Base::message_type (void) { diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h index d6e8b434ecd..c7ddb348117 100644 --- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h +++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Base.h @@ -28,21 +28,23 @@ class TAO_Pluggable_Reply_Params; /** * @class TAO_GIOP_Message_NonReactive_Base * - * @brief Uses the NonReactive handler class for reading messages. + * @brief Uses the NonReactive mechanism to read and process + * messages. * - * This class uses the TAO_GIOP_Message_NonReactive_Handler class to - * read and parse messages. This class derives from - * TAO_GIOP_Message_Base. It just redirects most of the functions to - * the base class but just acts as a sort of place holder for the - * NonReactive handler class. + * Some protocols based on shared memory cannot make use of the + * reactor as other protocols based on TCP/IP. This class is a relief + * for such protocols. This effectively does the following + * - reads the GIOP header out of the transport + * - processes the header to determine the length of the message + * and other details. + * - reads the body of the message from the transport + * - passes the data to the base class for making the upcall. */ class TAO_Strategies_Export TAO_GIOP_Message_NonReactive_Base :public TAO_GIOP_Message_Base { public: - friend class TAO_GIOP_Message_NonReactive_Handler; - /// Constructor TAO_GIOP_Message_NonReactive_Base (TAO_ORB_Core *orb_core, size_t cdr_size = ACE_CDR::DEFAULT_BUFSIZE); @@ -82,9 +84,8 @@ public: private: - /// Thr message handler object that does reading and parsing of the - /// incoming messages - TAO_GIOP_Message_NonReactive_Handler message_handler_; + /// The input cdr stream in which the incoming data is stored. + TAO_InputCDR input_cdr_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h index 02b93da7d47..a8d9642a1b1 100644 --- a/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h +++ b/TAO/tao/Strategies/GIOP_Message_NonReactive_Handler.h @@ -77,8 +77,7 @@ private: /// Our Message base TAO_GIOP_Message_NonReactive_Base *mesg_base_; - /// The input cdr stream in which the incoming data is stored. - TAO_InputCDR input_cdr_; + }; #if defined (__ACE_INLINE__) diff --git a/TAO/tao/Strategies/Makefile b/TAO/tao/Strategies/Makefile index 4d2263b56e8..890b22d056a 100644 --- a/TAO/tao/Strategies/Makefile +++ b/TAO/tao/Strategies/Makefile @@ -1,4 +1,5 @@ #---------------------------------------------------------------------------- +# # $Id$ # # Makefile for TAO_Strategies @@ -45,8 +46,6 @@ CPP_SRCS += \ SHMIOP_Acceptor \ SHMIOP_Connection_Handler \ SHMIOP_Endpoint \ - GIOP_Message_NonReactive_Base \ - GIOP_Message_NonReactive_Handler \ TAO_Strategies_Internal \ uiop_endpoints \ advanced_resource \ diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp index fd7fb9db6e6..5e0b8361a75 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp @@ -10,12 +10,11 @@ #include "tao/ORB.h" #include "tao/CDR.h" #include "tao/Messaging_Policy_i.h" -#include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" #include "tao/Server_Strategy_Factory.h" #include "tao/Base_Transport_Property.h" #include "tao/Transport_Cache_Manager.h" #include "SHMIOP_Endpoint.h" +#include "tao/Resume_Handle.h" #if !defined (__ACE_INLINE__) # include "SHMIOP_Connection_Handler.inl" @@ -220,8 +219,18 @@ TAO_SHMIOP_Connection_Handler::fetch_handle (void) } int +TAO_SHMIOP_Connection_Handler::resume_handler (void) +{ + return TAO_RESUMES_CONNECTION_HANDLER; +} + +int TAO_SHMIOP_Connection_Handler::handle_output (ACE_HANDLE) { + // Instantiate the resume handle here.. This will automatically + // resume the handle once data is written.. + TAO_Resume_Handle resume_handle (this->orb_core (), + this->fetch_handle ()); return this->transport ()->handle_output (); } @@ -249,44 +258,28 @@ TAO_SHMIOP_Connection_Handler::add_transport_to_cache (void) int -TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE h) -{ - return this->handle_input_i (h); -} - - -int -TAO_SHMIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) +TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE) { + // Increase the reference count on the upcall that have passed us. this->pending_upcalls_++; - // Call the transport read the message - int result = this->transport ()->read_process_message (max_wait_time); - - // Now the message has been read - if (result == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Connection_Handler::read_message \n"))); + TAO_Resume_Handle resume_handle (this->orb_core (), + this->fetch_handle ()); - } + int retval = this->transport ()->handle_input_i (resume_handle); // The upcall is done. Bump down the reference count if (--this->pending_upcalls_ <= 0) - result = -1; - - if (result == 0 || result == -1) - { - return result; - } + retval = -1; - return 0; + return retval; } + + + // **************************************************************** #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h index cf9c2fe3549..6cbf34e5457 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h @@ -90,6 +90,9 @@ public: /// Documented in ACE_Event_Handler virtual int handle_output (ACE_HANDLE); + /// Overload for resuming handlers.. + virtual int resume_handler (void); + /// Add ourselves to Cache. int add_transport_to_cache (void); @@ -104,8 +107,6 @@ protected: /// ensure that server threads eventually exit. virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0); private: diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index 81c803e0a41..0b333e0c566 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -15,9 +15,9 @@ #include "tao/Stub.h" #include "tao/ORB_Core.h" #include "tao/debug.h" +#include "tao/Resume_Handle.h" +#include "tao/GIOP_Message_Base.h" -#include "tao/GIOP_Message_Lite.h" -#include "GIOP_Message_NonReactive_Base.h" #if !defined (__ACE_INLINE__) # include "SHMIOP_Transport.i" @@ -27,12 +27,13 @@ ACE_RCSID (Strategies, SHMIOP_Transport, "$Id$") TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean flag) + CORBA::Boolean /*flag*/) : TAO_Transport (TAO_TAG_SHMEM_PROFILE, orb_core), connection_handler_ (handler), messaging_object_ (0) { +#if 0 if (flag) { // Use the lite version of the protocol @@ -40,10 +41,11 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl TAO_GIOP_Message_Lite (orb_core)); } else +#endif /*#if 0 */ { // Use the normal GIOP object ACE_NEW (this->messaging_object_, - TAO_GIOP_Message_NonReactive_Base (orb_core)); + TAO_GIOP_Message_Base (orb_core)); } } @@ -91,42 +93,100 @@ TAO_SHMIOP_Transport::recv_i (char *buf, size_t len, const ACE_Time_Value *max_wait_time) { - return this->connection_handler_->peer ().recv (buf, - len, - max_wait_time); + ssize_t n = 0; + + int read_break = 0; + + while (!read_break) + { + n = this->connection_handler_->peer ().recv (buf, + len, + max_wait_time); + + // If we get a EWOULBLOCK we try to read again. + if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + n = 0; + continue; + } + + // If there is anything else we just drop out of the loop. + read_break = 1; + } + + if (n == 0 || n == -1) + { + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure ") + ACE_TEXT ("recv_i () \n"))); + } + } + + return n; + } + int -TAO_SHMIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, - int block) +TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) { - // Read the message of the socket - int result = this->messaging_object_->read_message (this, - block, - max_wait_time); - - if (result == -1) + // Calculate the actual length of the load that we are supposed to + // read which is equal to the <missing_data> + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.length (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); + + // .. do a read on the socket again. + ssize_t bytes = 0; + + // As this used for transports where things are available in one + // shot this looping should not create any problems. + for (size_t n = missing_data; + n != 0; + n -= bytes) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Transport::read_message, failure in read_message ()"))); + // We would have liked to use something like a recv_n () + // here. But at the time when the code was written, the MEM_Stream + // classes had poor support for recv_n (). Till a day when we + // get proper recv_n (), let us stick with this. The other + // argument that can be said against this is that, this is the + // bad layer in which this is being done ie. recv_n is + // simulated. But... + bytes = this->recv (incoming.wr_ptr (), + n, + max_wait_time); + + if (bytes == 0 || + bytes == -1) + { + return -1; + } - this->tms_->connection_closed (); - return -1; + incoming.wr_ptr (bytes); } - if (result < 2) - return result; - // Now we know that we have been able to read the complete message - // here.. We loop here to see whether we have read more than one - // message in our read. + TAO_Queued_Data pqd (&incoming); - // See we use the reactor semantics again - result = this->process_message (); + // With SHMIOP we would not have any missing data... + pqd.missing_data_ = 0; + this->messaging_object ()->get_message_data (&pqd); - return result; + // Resume the handle before processing the request + rh.resume_handle (); + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (&pqd); } @@ -220,127 +280,6 @@ TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major, return 1; } -int -TAO_SHMIOP_Transport::process_message (void) -{ - // Check whether we have messages for processing - int retval = - this->messaging_object_->more_messages (); - - // The messages are fragmented, so we go back to the reactor. - if (retval <= 0) - return retval; - - // Get the <message_type> that we have received - TAO_Pluggable_Message_Type t = - this->messaging_object_->message_type (); - - - int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("Close Connection Message recd \n"))); - - this->tms_->connection_closed (); - } - else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) - { - if (this->messaging_object_->process_request_message (this, - this->orb_core ()) == -1) - return -1; - } - else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) - { - TAO_Pluggable_Reply_Params params (this->orb_core ()); - if (this->messaging_object_->process_reply_message (params) == -1) - { - - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("SHMIOP_Transport::process_message, process_reply_message ()"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - - result = - this->tms_->dispatch_reply (params); - - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - - // @@ The above comment was found in the older versions of the - // code. The code was also written in such a way that, when - // the client thread on a call from handle_input () from the - // reactor a call would be made on the handle_client_input - // (). The implementation of handle_client_input () looked so - // flaky. It used to create a message state upon entry in to - // the function using the TMS and destroy that on exit. All - // this was fine _theoretically_ for multiple threads. But - // the flakiness was originating in the implementation of - // get_message_state () where we were creating message state - // only once and dishing it out for every thread till one of - // them destroy's it. So, it looked broken. That has been - // changed. Why?. To my knowledge, the reactor does not call - // handle_input () on two threads at the same time. So, IMHO - // that defeats the purpose of creating a message state for - // every thread. This is just my guess. If we run in to - // problems this place needs to be revisited. If someone else - // is going to take a look please contact bala@cs.wustl.edu - // for details on this-- Bala - - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : SHMIOP_Transport::") - ACE_TEXT ("process_message - ") - ACE_TEXT ("dispatch reply failed\n"))); - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - this->messaging_object_->reset (); - - // The reply dispatcher was no longer registered. - // This can happened when the request/reply - // times out. - // To throw away all registered reply handlers is - // not the right thing, as there might be just one - // old reply coming in and several valid new ones - // pending. If we would invoke <connection_closed> - // we would throw away also the valid ones. - //return 0; - } - - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - //this->tms_->destroy_message_state (message_state); - } - else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - { - return -1; - } - - return 0; -} - void TAO_SHMIOP_Transport::transition_handler_state_i (void) { diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h index 60812057b5b..61785ef6215 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.h +++ b/TAO/tao/Strategies/SHMIOP_Transport.h @@ -78,11 +78,10 @@ protected: size_t len, const ACE_Time_Value *s = 0); - /// Read and process the message from the connection. The processing - /// of the message is done by delegating the work to the underlying - /// messaging object - virtual int read_process_message (ACE_Time_Value *max_time_value = 0, - int block =0); + virtual int consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); virtual int register_handler_i (void); @@ -109,10 +108,6 @@ public: CORBA::Octet minor); private: - /// Process the message that we have read - int process_message (void); - -private: /// The connection service handler used for accessing lower layer /// communication protocols. TAO_SHMIOP_Connection_Handler *connection_handler_; diff --git a/TAO/tao/Strategies/TAO_Strategies.dsp b/TAO/tao/Strategies/TAO_Strategies.dsp index 0c054c6e68f..8056375a565 100644 --- a/TAO/tao/Strategies/TAO_Strategies.dsp +++ b/TAO/tao/Strategies/TAO_Strategies.dsp @@ -98,46 +98,10 @@ SOURCE=.\advanced_resource.cpp # End Source File
# Begin Source File
-SOURCE=.\DIOP_Acceptor.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connection_Handler.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connector.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Endpoint.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Factory.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Profile.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Transport.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\FIFO_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_NonReactive_Base.cpp
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Handler.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\LFU_Connection_Purging_Strategy.cpp
# End Source File
# Begin Source File
@@ -226,42 +190,6 @@ SOURCE=.\advanced_resource.h # End Source File
# Begin Source File
-SOURCE=.\DIOP_Acceptor.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connection_Handler.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connector.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Endpoint.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Factory.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Profile.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Transport.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Base.h
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_NonReactive_Handler.h
-# End Source File
-# Begin Source File
-
SOURCE=.\Reactor_Per_Priority.h
# End Source File
# Begin Source File
@@ -358,26 +286,6 @@ SOURCE=.\advanced_resource.i # End Source File
# Begin Source File
-SOURCE=.\DIOP_Acceptor.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Connection_Handler.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Endpoint.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Profile.i
-# End Source File
-# Begin Source File
-
-SOURCE=.\DIOP_Transport.i
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_NonReactive_Base.inl
# End Source File
# Begin Source File
diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp index 4d412f7c870..9e0e917070d 100644 --- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp @@ -16,6 +16,7 @@ #include "tao/Base_Transport_Property.h" #include "tao/GIOP_Message_Lite.h" #include "tao/Transport_Cache_Manager.h" +#include "tao/Resume_Handle.h" #if !defined (__ACE_INLINE__) # include "UIOP_Connection_Handler.inl" @@ -201,6 +202,12 @@ TAO_UIOP_Connection_Handler::fetch_handle (void) return this->get_handle (); } +int +TAO_UIOP_Connection_Handler::resume_handler (void) +{ + return TAO_RESUMES_CONNECTION_HANDLER; +} + int TAO_UIOP_Connection_Handler::add_transport_to_cache (void) @@ -224,44 +231,23 @@ TAO_UIOP_Connection_Handler::add_transport_to_cache (void) int -TAO_UIOP_Connection_Handler::handle_input (ACE_HANDLE h) -{ - return this->handle_input_i (h); -} - - -int -TAO_UIOP_Connection_Handler::handle_input_i (ACE_HANDLE, - ACE_Time_Value *max_wait_time) +TAO_UIOP_Connection_Handler::handle_input (ACE_HANDLE) { this->pending_upcalls_++; - // Call the transport read the message - int result = this->transport ()->read_process_message (max_wait_time); + TAO_Resume_Handle resume_handle (this->orb_core (), + this->fetch_handle ()); - // Now the message has been read - if (result == -1 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("UIOP_Connection_Handler::read_message \n"))); - - } + int retval = + this->transport ()->handle_input_i (resume_handle); // The upcall is done. Bump down the reference count if (--this->pending_upcalls_ <= 0) - result = -1; + retval = -1; - if (result == 0 || result == -1) - { - return result; - } - - return 0; + return retval; } - - #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Svc_Handler<ACE_LSOCK_STREAM, ACE_NULL_SYNCH>; diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.h b/TAO/tao/Strategies/UIOP_Connection_Handler.h index 3432fd3b9ae..43f3a8ec981 100644 --- a/TAO/tao/Strategies/UIOP_Connection_Handler.h +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.h @@ -106,6 +106,9 @@ public: /// Return the underlying handle virtual ACE_HANDLE fetch_handle (void); + /// Overload for resuming handlers.. + virtual int resume_handler (void); + /// Add ourselves to Cache. int add_transport_to_cache (void); @@ -116,12 +119,8 @@ protected: /// Reads a message from the <peer()>, dispatching and servicing it /// appropriately. /// handle_input() just delegates on handle_input_i() which timeouts - /// after <max_wait_time>, this is used in thread-per-connection to - /// ensure that server threads eventually exit. - + /// after <max_wait_time> virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); - virtual int handle_input_i (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Time_Value *max_wait_time = 0); private: diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index 0049317a677..1f8f6dc86d0 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -27,19 +27,19 @@ ACE_RCSID (Strategies, UIOP_Transport, "$Id$") TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler, TAO_ORB_Core *orb_core, - CORBA::Boolean flag) + CORBA::Boolean /*flag*/) : TAO_Transport (TAO_TAG_UIOP_PROFILE, orb_core) , connection_handler_ (handler) , messaging_object_ (0) { - if (flag) + /* if (flag) { // Use the lite version of the protocol ACE_NEW (this->messaging_object_, TAO_GIOP_Message_Lite (orb_core)); } - else + else*/ { // Use the normal GIOP object ACE_NEW (this->messaging_object_, @@ -82,50 +82,37 @@ TAO_UIOP_Transport::recv_i (char *buf, size_t len, const ACE_Time_Value *max_wait_time) { - return this->connection_handler_->peer ().recv (buf, - len, - max_wait_time); -} - -int -TAO_UIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time, - int block) -{ - // Read the message of the socket - int result = this->messaging_object_->read_message (this, - block, + ssize_t n = this->connection_handler_->peer ().recv (buf, + len, max_wait_time); - if (result == -1) + // Most of the errors handling is common for + // Now the message has been read + if (n == -1 && TAO_debug_level > 4) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("UIOP_Transport::read_message, failure in read_message ()"))); - - this->tms_->connection_closed (); - return -1; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p \n"), + ACE_TEXT ("TAO - read message failure ") + ACE_TEXT ("recv_i () \n"))); } - if (result < 2) - return result; - - // Now we know that we have been able to read the complete message - // here.. We loop here to see whether we have read more than one - // message in our read. - // Set the result state - result = 1; + // Error handling + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; - // See we use the reactor semantics again - while (result > 0) + return -1; + } + // @@ What are the other error handling here?? + else if (n == 0) { - result = this->process_message (); + return -1; } - return result; + return n; } - int TAO_UIOP_Transport::register_handler_i (void) { @@ -216,126 +203,6 @@ TAO_UIOP_Transport::messaging_init (CORBA::Octet major, return 1; } -int -TAO_UIOP_Transport::process_message (void) -{ - // Check whether we have messages for processing - int retval = - this->messaging_object_->more_messages (); - - if (retval <= 0) - return retval; - - // Get the <message_type> that we have received - TAO_Pluggable_Message_Type t = - this->messaging_object_->message_type (); - - - int result = 0; - if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("Close Connection Message recd \n"))); - - this->tms_->connection_closed (); - } - else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) - { - if (this->messaging_object_->process_request_message (this, - this->orb_core ()) == -1) - return -1; - } - else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) - { - TAO_Pluggable_Reply_Params params (this->orb_core ()); - if (this->messaging_object_->process_reply_message (params) == -1) - { - - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - %p\n"), - ACE_TEXT ("UIOP_Transport::process_message, process_reply_message ()"))); - - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - - result = - this->tms_->dispatch_reply (params); - - // @@ Somehow it seems dangerous to reset the state *after* - // dispatching the request, what if another threads receives - // another reply in the same connection? - // My guess is that it works as follows: - // - For the exclusive case there can be no such thread. - // - The the muxed case each thread has its own message_state. - // I'm pretty sure this comment is right. Could somebody else - // please look at it and confirm my guess? - - // @@ The above comment was found in the older versions of the - // code. The code was also written in such a way that, when - // the client thread on a call from handle_input () from the - // reactor a call would be made on the handle_client_input - // (). The implementation of handle_client_input () looked so - // flaky. It used to create a message state upon entry in to - // the function using the TMS and destroy that on exit. All - // this was fine _theoretically_ for multiple threads. But - // the flakiness was originating in the implementation of - // get_message_state () where we were creating message state - // only once and dishing it out for every thread till one of - // them destroy's it. So, it looked broken. That has been - // changed. Why?. To my knowledge, the reactor does not call - // handle_input () on two threads at the same time. So, IMHO - // that defeats the purpose of creating a message state for - // every thread. This is just my guess. If we run in to - // problems this place needs to be revisited. If someone else - // is going to take a look please contact bala@cs.wustl.edu - // for details on this-- Bala - - if (result == -1) - { - if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("TAO (%P|%t) : UIOP_Transport::") - ACE_TEXT ("process_message - ") - ACE_TEXT ("dispatch reply failed\n"))); - this->messaging_object_->reset (); - this->tms_->connection_closed (); - return -1; - } - - if (result == 0) - { - this->messaging_object_->reset (); - - // The reply dispatcher was no longer registered. - // This can happened when the request/reply - // times out. - // To throw away all registered reply handlers is - // not the right thing, as there might be just one - // old reply coming in and several valid new ones - // pending. If we would invoke <connection_closed> - // we would throw away also the valid ones. - //return 0; - } - - - // This is a NOOP for the Exclusive request case, but it actually - // destroys the stream in the muxed case. - //this->tms_->destroy_message_state (message_state); - } - else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) - { - return -1; - } - - return 1; -} - void TAO_UIOP_Transport::transition_handler_state_i (void) { diff --git a/TAO/tao/Strategies/UIOP_Transport.h b/TAO/tao/Strategies/UIOP_Transport.h index f3bb0b2b920..87e4715fae0 100644 --- a/TAO/tao/Strategies/UIOP_Transport.h +++ b/TAO/tao/Strategies/UIOP_Transport.h @@ -81,12 +81,6 @@ protected: size_t len, const ACE_Time_Value *s = 0); - /// Read and process the message from the connection. The processing - /// of the message is done by delegating the work to the underlying - /// messaging object - virtual int read_process_message (ACE_Time_Value *max_time_value = 0, - int block =0); - virtual int register_handler_i (void); /// Method to do whatever it needs to do when the connection @@ -115,11 +109,6 @@ public: private: - /// Process the message that we have read - int process_message (void); - -private: - /// The connection service handler used for accessing lower layer /// communication protocols. TAO_UIOP_Connection_Handler *connection_handler_; diff --git a/TAO/tao/Synch_Reply_Dispatcher.cpp b/TAO/tao/Synch_Reply_Dispatcher.cpp index 70c17a67629..c0f5a6df904 100644 --- a/TAO/tao/Synch_Reply_Dispatcher.cpp +++ b/TAO/tao/Synch_Reply_Dispatcher.cpp @@ -8,6 +8,7 @@ ACE_RCSID(tao, Synch_Reply_Dispatcher, "$Id$") + // Constructor. TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher ( TAO_ORB_Core *orb_core, @@ -17,8 +18,16 @@ TAO_Synch_Reply_Dispatcher::TAO_Synch_Reply_Dispatcher ( reply_received_ (0), orb_core_ (orb_core), wait_strategy_ (0), - reply_cdr_ (orb_core->create_input_cdr_data_block (ACE_CDR::DEFAULT_BUFSIZE), - 0, + buf_ (), + db_ (sizeof buf_, + ACE_Message_Block::MB_DATA, + this->buf_, + this->orb_core_->message_block_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_dblock_allocator ()), + reply_cdr_ (&db_, + ACE_Message_Block::DONT_DELETE, TAO_ENCAP_BYTE_ORDER, TAO_DEF_GIOP_MAJOR, TAO_DEF_GIOP_MINOR, @@ -61,20 +70,11 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply ( // dispatcher is used because the request must be re-sent. //this->message_state_.reset (0); - // Steal the buffer so that no copying is done. - this->reply_cdr_.exchange_data_blocks (params.input_cdr_); - - /*if (&this->message_state_ != message_state) - { - // The Transport Mux Strategy did not use our Message_State to - // receive the event, possibly because it is muxing multiple - // requests over the same connection. - - // Steal the buffer so that no copying is done. - this->message_state_.cdr.steal_from (message_state->cdr); + // Transfer the <params.input_cdr_>'s content to this->reply_cdr_ + ACE_Data_Block *db = + this->reply_cdr_.clone_from (params.input_cdr_); - // There is no need to copy the other fields! - }*/ + ACE_UNUSED_ARG (db); if (this->wait_strategy_ != 0) { @@ -91,12 +91,6 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply ( return 1; } -/*TAO_GIOP_Message_State * -TAO_Synch_Reply_Dispatcher::message_state (void) -{ - return &this->message_state_; -}*/ - void TAO_Synch_Reply_Dispatcher::dispatcher_bound (TAO_Transport *transport) { diff --git a/TAO/tao/Synch_Reply_Dispatcher.h b/TAO/tao/Synch_Reply_Dispatcher.h index 571948a9dbb..ec6aabe157b 100644 --- a/TAO/tao/Synch_Reply_Dispatcher.h +++ b/TAO/tao/Synch_Reply_Dispatcher.h @@ -66,13 +66,6 @@ protected: IOP::ServiceContextList &reply_service_info_; private: - // TAO_GIOP_Message_State message_state_; - // All the state required to receive the input... - // @@ Having members of type TAO_GIOP* indicates that we - // (Reply_despatcher) are aware of the underlying messaging - // protocol. But for the present let us close our eyes till we are - // able to iterate on a use case - Bala. - /// Flag that indicates the reply has been received. int reply_received_; @@ -83,6 +76,20 @@ private: /// appropriate). TAO_Wait_Strategy *wait_strategy_; + /* @@todo: At some point of time we are going to get to a situation + where TAO has huge stack sizes. Need to think on how we would + deal with that. One idea would be to push these things on TSS as + this is created by the thread on a per invocation basis. Post 1.2 + would be a nice time for that I guess + */ + + /// The buffer that is used to initialise the data block + char buf_[ACE_CDR::DEFAULT_BUFSIZE]; + + /// datablock that is created on teh stack to initialise the CDR + /// stream underneath. + ACE_Data_Block db_; + /// CDR stream which has the reply information that needs to be /// demarshalled by the stubs TAO_InputCDR reply_cdr_; diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index 15ae0b9c8d4..62572777b23 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -411,18 +411,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.cpp # End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.cpp
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.cpp
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_State.cpp
# End Source File
# Begin Source File
@@ -487,6 +479,10 @@ SOURCE=.\IIOPC.cpp # End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.cpp
# End Source File
# Begin Source File
@@ -711,6 +707,10 @@ SOURCE=.\Resource_Factory.cpp # End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Sequence.cpp
# End Source File
# Begin Source File
@@ -1143,18 +1143,10 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.h # End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.h
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.h
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.h
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_State.h
# End Source File
# Begin Source File
@@ -1219,6 +1211,10 @@ SOURCE=.\IIOPC.h # End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.h
# End Source File
# Begin Source File
@@ -1491,6 +1487,10 @@ SOURCE=.\Resource_Factory.h # End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.h
+# End Source File
+# Begin Source File
+
SOURCE=.\sequence.h
# End Source File
# Begin Source File
@@ -1859,19 +1859,11 @@ SOURCE=.\GIOP_Message_Generator_Parser_Impl.inl # End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Lite.i
-# End Source File
-# Begin Source File
-
SOURCE=.\GIOP_Message_Locate_Header.i
# End Source File
# Begin Source File
-SOURCE=.\GIOP_Message_Reactive_Handler.inl
-# End Source File
-# Begin Source File
-
-SOURCE=.\GIOP_Message_State.i
+SOURCE=.\GIOP_Message_State.inl
# End Source File
# Begin Source File
@@ -1923,6 +1915,10 @@ SOURCE=.\IIOPC.i # End Source File
# Begin Source File
+SOURCE=.\Incoming_Message_Queue.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Interceptor_List.inl
# End Source File
# Begin Source File
@@ -2103,6 +2099,10 @@ SOURCE=.\Reply_Dispatcher.i # End Source File
# Begin Source File
+SOURCE=.\Resume_Handle.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\sequence.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Server_Request.cpp b/TAO/tao/TAO_Server_Request.cpp index c17d1f68ce1..bf1b278f813 100644 --- a/TAO/tao/TAO_Server_Request.cpp +++ b/TAO/tao/TAO_Server_Request.cpp @@ -1,5 +1,6 @@ // $Id$ + // Implementation of the Dynamic Server Skeleton Interface (for GIOP) #include "TAO_Server_Request.h" diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index eff531ec2df..250689c9a37 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -1,6 +1,7 @@ // -*- C++ -*- // $Id$ + #include "Transport.h" #include "Exception.h" @@ -17,6 +18,7 @@ #include "Flushing_Strategy.h" #include "Transport_Cache_Manager.h" #include "debug.h" +#include "Resume_Handle.h" #include "ace/Message_Block.h" @@ -26,6 +28,7 @@ ACE_RCSID(tao, Transport, "$Id$") + TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount) : ACE_Refcountable (refcount) , refcount_lock_ (lock) @@ -67,6 +70,7 @@ TAO_Transport::TAO_Transport (CORBA::ULong tag, , bidirectional_flag_ (-1) , head_ (0) , tail_ (0) + , incoming_message_queue_ (orb_core) , current_deadline_ (ACE_Time_Value::zero) , flush_timer_id_ (-1) , transport_timer_ (this) @@ -121,7 +125,7 @@ TAO_Transport::~TAO_Transport (void) } int -TAO_Transport::handle_output () +TAO_Transport::handle_output (void) { if (TAO_debug_level > 4) { @@ -783,6 +787,674 @@ TAO_Transport::generate_request_header ( } int +TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, + ACE_Time_Value * max_wait_time, + int /*block*/) +{ + // First try to process messages of the head of the incoming queue. + int retval = this->process_queue_head (rh); + + if (retval <= 0) + { + if (retval == -1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) TAO::handle_input_i," + "error while parsing the head of the queue \n")); + + this->tms_->connection_closed (); + } + return retval; + } + + // If there are no messages then we can go ahead to read from the + // handle for further reading.. + + // The buffer on the stack which will be used to hold the input + // messages + char buf [TAO_CONNECTION_HANDLER_STACK_BUF_SIZE]; + +#if defined (ACE_HAS_PURIFY) + (void) ACE_OS::memset (buf, + '\0', + sizeof buf); +#endif /* ACE_HAS_PURIFY */ + + // Create a data block + ACE_Data_Block db (sizeof (buf), + ACE_Message_Block::MB_DATA, + buf, + this->orb_core_->message_block_buffer_allocator (), + this->orb_core_->locking_strategy (), + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_dblock_allocator ()); + + // Create a message block + ACE_Message_Block message_block (&db, + ACE_Message_Block::DONT_DELETE, + this->orb_core_->message_block_msgblock_allocator ()); + + + // Align the message block + ACE_CDR::mb_align (&message_block); + + + // Read the message into the message block that we have created on + // the stack. + ssize_t n = this->recv (message_block.rd_ptr (), + message_block.space (), + max_wait_time); + + // If there is an error return to the reactor.. + if (n <= 0) + { + if (n == -1) + this->tms_->connection_closed (); + + return n; + } + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) Read [%d] bytes \n", + n)); + } + + // Set the write pointer in the stack buffer + message_block.wr_ptr (n); + + // Parse the message and try consolidating the message if + // needed. + retval = this->parse_consolidate_messages (message_block, + rh, + max_wait_time); + + if (retval <= 0) + { + if (retval == -1 && TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport::handle_input_i " + "error while parsing and consolidating \n")); + } + return retval; + } + + // Make a node of the message block.. + TAO_Queued_Data qd (&message_block); + + // Extract the data for the node.. + this->messaging_object ()->get_message_data (&qd); + + // Resume before starting to process the request.. + rh.resume_handle (); + + // Process the message + return this->process_parsed_messages (&qd); +} + +int +TAO_Transport::parse_consolidate_messages (ACE_Message_Block &block, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Parse the incoming message for validity. The check needs to be + // performed by the messaging objects. + if (this->parse_incoming_messages (block) == -1) + return -1; + + // Check whether we have a complete message for processing + ssize_t missing_data = this->missing_data (block); + + if (missing_data < 0) + { + // If we have more than one message + return this->consolidate_extra_messages (block, + rh); + } + else if (missing_data > 0) + { + // If we have missing data then try doing a read or try queueing + // them. + return this->consolidate_message (block, + missing_data, + rh, + max_wait_time); + } + + return 1; +} + +int +TAO_Transport::parse_incoming_messages (ACE_Message_Block &block) +{ + // If we have a queue and if the last message is not complete a + // complete one, then this read will get us the remaining data. So + // do not try to parse the header if we have an incomplete message + // in the queue. + if (this->incoming_message_queue_.is_tail_complete () != 0) + { + // As it looks like a new message has been read, process the + // message. Call the messaging object to do the parsing.. + int retval = + this->messaging_object ()->parse_incoming_messages (block); + + if (retval == -1) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - error in incoming message \n"))); + + this->tms_->connection_closed (); + return -1; + } + } + + return 0; +} + + +size_t +TAO_Transport::missing_data (ACE_Message_Block &incoming) +{ + // If we have a incomplete message in the queue then find out how + // much of data is required to get a complete message + if (this->incoming_message_queue_.is_tail_complete () == 0) + { + return this->incoming_message_queue_.missing_data_tail (); + } + + return this->messaging_object ()->missing_data (incoming); +} + + +int +TAO_Transport::consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + // Check whether the last message in the queue is complete.. + if (this->incoming_message_queue_.is_tail_complete () == 0) + return this->consolidate_message_queue (incoming, + missing_data, + rh, + max_wait_time); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n", + this->id ())); + } + + // Calculate the actual length of the load that we are supposed to + // read which is equal to the <missing_data> + length of the buffer + // that we have.. + size_t payload = missing_data + incoming.length (); + + // Grow the buffer to the size of the message + ACE_CDR::grow (&incoming, + payload); + + // .. do a read on the socket again. + ssize_t n = this->recv (incoming.wr_ptr (), + missing_data, + max_wait_time); + + // If we got an EWOULDBLOCK or some other error.. + if (n <= 0) + { + if (n == -1) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Trasport::consolidate_message," + "error while trying to consolidate \n")); + } + this->tms_->connection_closed (); + } + + return n; + } + + // Move the write pointer + incoming.wr_ptr (n); + + // ..Decrement + missing_data -= n; + + if (missing_data > 0) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" + "insufficient read, queueing up the message \n", + this->id ())); + } + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data (); + + // Add the missing data to the queue + qd->missing_data_ = missing_data; + + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = incoming.duplicate (); + + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + return 0; + } + + // Check to see if we have messages in queue. AT this point we + // cannot have have semi-complete messages in the queue as they + // would have been taken care before + if (this->incoming_message_queue_.queue_length ()) + { + // If we have messages in the queue, put the <incoming> in the + // queue + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message \n" + " queueing up the message \n", + this->id ())); + } + + // Get an instance of TAO_Queued_Data + TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data (); + + // Duplicate the data block before putting it in the queue. + qd->msg_block_ = incoming.duplicate (); + + // Get the rest of the messaging data + this->messaging_object ()->get_message_data (qd); + + // Add it to the tail of the queue.. + this->incoming_message_queue_.enqueue_tail (qd); + + // Process one on the head of the queue and return + return this->process_queue_head (rh); + } + + + // We dont have any missing data. Just make a queued_data node with + // the existing message block and send it to the higher layers of + // the ORB. + TAO_Queued_Data pqd (&incoming); + pqd.missing_data_ = missing_data; + this->messaging_object ()->get_message_data (&pqd); + + // Resume the handle before processing the request + rh.resume_handle (); + + // Now we have a full message in our buffer. Just go ahead and + // process that + return this->process_parsed_messages (&pqd); +} + +int +TAO_Transport::consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_message_queue \n", + this->id ())); + } + + // If the queue did not have a complete message put this piece of + // message in the queue. We kow it did not have a complete + // message. That is why we are here. + size_t n = this->incoming_message_queue_.copy_tail (incoming); + + // Update the missing data... + missing_data = this->incoming_message_queue_.missing_data_tail (); + + // Move the read pointer of the <incoming> message block to the end + // of the copied message and process the remaining portion... + incoming.rd_ptr (n); + + // If we have some more information left in the message block.. + if (incoming.length ()) + { + // We may have to parse & consolidate. This part of the message + // doesn't seem to be part of the last message in the queue (as + // the copy () hasn't taken away this message). + int retval = this->parse_consolidate_messages (incoming, + rh, + max_wait_time); + + // If there is an error return + if (retval == -1) + { + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Error while consolidating... \n", + "TAO (%P|%t) - .. part of the read message \n")); + } + return retval; + } + + // parse_consolidate_messages () would have processed one of the + // messages, so we better return as we dont want to starve other + // threads. + return 0; + } + + // If we still have some missing data.. + if (missing_data > 0) + { + // Get the last message from the Queue + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_tail (); + + // Try to do a read again. If we have some luck it would be + // great.. + ssize_t n = this->recv (qd->msg_block_->wr_ptr (), + missing_data, + max_wait_time); + + // Error... + if (n <= 0) + return n; + + // Move the write pointer + qd->msg_block_->wr_ptr (n); + + // Decrement the missing data + qd->missing_data_ -= n; + + // Now put the TAO_Queued_Data back in the queue + this->incoming_message_queue_.enqueue_tail (qd); + } + + // Process a message in the head of the queue if we have one.. + return this->process_queue_head (rh); +} + + +int +TAO_Transport::consolidate_extra_messages (ACE_Message_Block + &incoming, + TAO_Resume_Handle &rh) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n", + this->id ())); + } + + // Pick the tail of the queue + TAO_Queued_Data *tail = + this->incoming_message_queue_.dequeue_tail (); + + if (tail) + { + // If we have a node in the tail, checek to see whether it needs + // consolidation. If so, just consolidate it. + if (this->messaging_object ()->consolidate_node (tail, + incoming) == -1) + return -1; + + // .. put the tail back in queue.. + this->incoming_message_queue_.enqueue_tail (tail); + } + + int retval = 1; + + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::consolidate_extra_messages \n" + "..............extracting extra messages \n", + this->id ())); + } + + // Extract messages.. + while (retval == 1) + { + TAO_Queued_Data *q_data = 0; + + retval = + this->messaging_object ()->extract_next_message (incoming, + q_data); + if (q_data) + this->incoming_message_queue_.enqueue_tail (q_data); + } + + // In case of error return.. + if (retval == -1) + { + return retval; + } + + return this->process_queue_head (rh); +} + +int +TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd) +{ + // Get the <message_type> that we have received + TAO_Pluggable_Message_Type t = qd->msg_type_; + + int result = 0; + if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("Close Connection Message recd \n"))); + + // Close the TMS + this->tms_->connection_closed (); + + // Return a "-1" so that the next stage can take care of + // closing connection and the necessary memory management. + return -1; + } + else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST) + { + if (this->messaging_object ()->process_request_message ( + this, + qd) == -1) + { + // Close the TMS + this->tms_->connection_closed (); + + // Return a "-1" so that the next stage can take care of + // closing connection and the necessary memory management. + return -1; + } + } + else if (t == TAO_PLUGGABLE_MESSAGE_REPLY) + { + // @@todo: Maybe the input_cdr can be constructed from the + // message_block + TAO_Pluggable_Reply_Params params (this->orb_core ()); + + if (this->messaging_object ()->process_reply_message (params, + qd) == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - %p\n"), + ACE_TEXT ("IIOP_Transport::process_message, ") + ACE_TEXT ("process_reply_message ()"))); + + this->messaging_object ()->reset (); + this->tms_->connection_closed (); + return -1; + } + + result = this->tms ()->dispatch_reply (params); + + // @@ Somehow it seems dangerous to reset the state *after* + // dispatching the request, what if another threads receives + // another reply in the same connection? + // My guess is that it works as follows: + // - For the exclusive case there can be no such thread. + // - The the muxed case each thread has its own message_state. + // I'm pretty sure this comment is right. Could somebody else + // please look at it and confirm my guess? + + // @@ The above comment was found in the older versions of the + // code. The code was also written in such a way that, when + // the client thread on a call from handle_input () from the + // reactor a call would be made on the handle_client_input + // (). The implementation of handle_client_input () looked so + // flaky. It used to create a message state upon entry in to + // the function using the TMS and destroy that on exit. All + // this was fine _theoretically_ for multiple threads. But + // the flakiness was originating in the implementation of + // get_message_state () where we were creating message state + // only once and dishing it out for every thread till one of + // them destroy's it. So, it looked broken. That has been + // changed. Why?. To my knowledge, the reactor does not call + // handle_input () on two threads at the same time. So, IMHO + // that defeats the purpose of creating a message state for + // every thread. This is just my guess. If we run in to + // problems this place needs to be revisited. If someone else + // is going to take a look please contact bala@cs.wustl.edu + // for details on this-- Bala + + if (result == -1) + { + // Something really critical happened, we will forget about + // every reply on this connection. + if (TAO_debug_level > 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) : IIOP_Transport::") + ACE_TEXT ("process_message - ") + ACE_TEXT ("dispatch reply failed\n"))); + + this->messaging_object ()->reset (); + this->tms_->connection_closed (); + return -1; + } + + if (result == 0) + { + + this->messaging_object ()->reset (); + + // The reply dispatcher was no longer registered. + // This can happened when the request/reply + // times out. + // To throw away all registered reply handlers is + // not the right thing, as there might be just one + // old reply coming in and several valid new ones + // pending. If we would invoke <connection_closed> + // we would throw away also the valid ones. + //return 0; + } + + + // This is a NOOP for the Exclusive request case, but it actually + // destroys the stream in the muxed case. + //this->tms_->destroy_message_state (message_state); + } + else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR) + { + return -1; + } + + // If not, just return back.. + return 0; +} + +int +TAO_Transport::process_queue_head (TAO_Resume_Handle &rh) +{ + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::process_queue_head \n", + this->id ())); + } + + // See if the message in the head of the queue is complete... + if (this->incoming_message_queue_.is_head_complete () == 1) + { + // Get the message on the head of the queue.. + TAO_Queued_Data *qd = + this->incoming_message_queue_.dequeue_head (); + + // Now that we have pulled out out one message out of the queue, + // check whether we have one more message in the queue... + if (this->incoming_message_queue_.is_head_complete () == 1) + { + // Get the event handler.. + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + + // Get the reactor associated with the event handler + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::notify to Reactor\n", + this->id ())); + } + + // Let the class know that it doesn't need to resume the + // handle.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED); + + // Send a notification to the reactor... + int retval = reactor->notify (eh, + ACE_Event_Handler::READ_MASK); + + if (retval < 0 && TAO_debug_level > 2) + { + // @@todo: need to think about what is the action that + // we can take when we get here. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport::process_queue_head ") + ACE_TEXT ("notify to the reactor failed.. \n"))); + } + + } + else + { + // As we are ready to process the last message just resume + // the handle. Set the flag incase someone had reset the flag.. + rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE); + rh.resume_handle (); + } + + // Process the message... + if (this->process_parsed_messages (qd) == -1) + return -1; + + // Delete the Queued_Data.. + TAO_Queued_Data::release (qd); + + return 0; + } + + return 1; +} + + +int TAO_Transport::queue_is_empty (void) { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index 54f8539ae1d..99c0c4512e9 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -19,15 +19,18 @@ #include "ace/pre.h" #include "corbafwd.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + #include "Exception.h" #include "Transport_Descriptor_Interface.h" #include "Transport_Cache_Manager.h" #include "Transport_Timer.h" #include "ace/Strategies.h" - -#if !defined (ACE_LACKS_PRAGMA_ONCE) -# pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "Incoming_Message_Queue.h" class TAO_ORB_Core; class TAO_Target_Specification; @@ -38,6 +41,7 @@ class TAO_Connection_Handler; class TAO_Pluggable_Messaging; class TAO_Queued_Message; +class TAO_Resume_Handle; class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable { @@ -72,7 +76,7 @@ protected: * * <H3>The outgoing data path:</H3> * - * One of the responsabilities of the TAO_Transport class is to send + * One of the responsibilities of the TAO_Transport class is to send * out GIOP messages as efficiently as possible. In most cases * messages are put out in FIFO order, the transport object will put * out the message using a single system call and return control to @@ -129,15 +133,71 @@ protected: * - A per-transport 'send strategy' to choose between blocking on * write, blocking on the reactor or blockin on leader/follower. * - A per-message 'waiting object' - * - A per-meessage timeout + * - A per-message timeout * * The Transport object provides a single method to send messages * (send_message ()). * * <H3>The incoming data path:</H3> * - * @todo Document the incoming data path design forces. + * One of the main responsibilities of the transport is to read and + * process the incoming GIOP message as quickly and efficiently as + * possible. There are other forces that needs to be given due + * consideration. They are + * - Multiple threads should be able to tarverse along the same data + * path but should not be able to read from the same handle at the + * same time ie. the handle should not be shared between threads at + * any instant. + * - Reads on the handle could give one or more messages. + * - Minimise locking and copying overhead when trying to attack the + * above. + * + * <H3> Parsing messages (GIOP) & processing the message:</H3> + * + * The messages should be checked for validity and the right + * information should be sent to the higher layer for processing. The + * process of doing a sanity check and preparing the messages for the + * higher layers of the ORB are done by the messaging protocol. + * + * <H3> Design forces and Challenges </H3> + * + * To keep things as efficient as possible for medium sized requests, + * it would be good to minimise data copying and locking along the + * incoming path ie. from the time of reading the data from the handle + * to the application. We achieve this by creating a buffer on stack + * and reading the data from the handle into the buffer. We then pass + * the same data block (the buffer is encapsulated into a data block) + * to the higher layers of the ORB. The problems stem from the + * following + * (a) Data is bigger than the buffer that we have on stack + * (b) Transports like TCP do not guarentee availability of the whole + * chunk of data in one shot. Data could trickle in byte by byte. + * (c) Single read gives multiple messages + * + * We solve the problems as follows + * + * (a) First do a read with the buffer on stack. Query the underlying + * messaging object whether the message has any incomplete + * portion. If so, we just grow the buffer for the missing size + * and read the rest of the message. We free the handle and then + * send the message to the higher layers of the ORB for + * processing. * + * (b) If we block (ie. if we receive a EWOULDBLOCK) while trying to + * do the above (ie. trying to read after growing the buffer + * size) we put the message in a queue and return back to the + * reactor. The reactor would call us back when the handle + * becomes read ready. + * + * (c) If we get multiple messages (possible if the client connected + * to the server sends oneways or AMI requests), we parse and + * split the messages. Every message is put in the queue. Once + * the messages are queued, the thread picks up one message to + * send to the higher layers of the ORB. Before doing that, if + * it finds more messages, it sends a notify to the reactor + * without resuming the handle. The next thread picks up a + * message from the queue and processes that. Once the queue + * is drained the last thread resumes the handle. * * <B>See Also:</B> * @@ -451,6 +511,30 @@ public: TAO_Target_Specification &spec, TAO_OutputCDR &msg); + /// Callback to read incoming data + /** + * The ACE_Event_Handler adapter invokes this method as part of its + * handle_input() operation. + * + * @todo: the method name is confusing! Calling it handle_input() + * would probably make things easier to understand and follow! + * + * Once a complete message is read the Transport class delegates on + * the Messaging layer to invoke the right upcall (on the server) or + * the TAO_Reply_Dispatcher (on the client side). + * + * @param max_wait_time In some cases the I/O is synchronous, e.g. a + * thread-per-connection server or when Wait_On_Read is enabled. In + * those cases a maximum read time can be specified. + * + * @param block Is deprecated and ignored. + * + */ + virtual int handle_input_i (TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time = 0, + int block = 0); + + /// Prepare the waiting and demuxing strategy to receive a reply for /// a new request. /** @@ -501,29 +585,6 @@ public: int is_synchronous = 1, ACE_Time_Value *max_time_wait = 0) = 0; - /// Callback to read incoming data - /** - * The ACE_Event_Handler adapter invokes this method as part of its - * handle_input() operation. - * - * @todo: the method name is confusing! Calling it handle_input() - * would probably make things easier to understand and follow! - * - * Once a complete message is read the Transport class delegates on - * the Messaging layer to invoke the right upcall (on the server) or - * the TAO_Reply_Dispatcher (on the client side). - * - * @param max_wait_time In some cases the I/O is synchronous, e.g. a - * thread-per-connection server or when Wait_On_Read is enabled. In - * those cases a maximum read time can be specified. - * - * @param block Is deprecated and ignored. - * - */ - // @@ lockme - virtual int read_process_message (ACE_Time_Value *max_wait_time = 0, - int block = 0) = 0; - protected: /// Register the handler with the reactor. /** @@ -548,6 +609,53 @@ protected: */ virtual void transition_handler_state_i (void) = 0; + + /// Called by the handle_input_i (). This method is used to parse + /// message read by the handle_input_i () call. It also decides + /// whether the message needs consolidation before processing. + int parse_consolidate_messages (ACE_Message_Block &bl, + TAO_Resume_Handle &rh, + ACE_Time_Value *time = 0); + + + /// Method does parsing of the message if we have a fresh message in + /// the <message_block> or just returns if we have read part of the + /// previously stored message. + int parse_incoming_messages (ACE_Message_Block &message_block); + + /// Return if we have any missing data in the queue of messages + /// or determine if we have more information left out in the + /// presently read message to make it complete. + size_t missing_data (ACE_Message_Block &message_block); + + /// Consolidate the currently read message or consolidate the last + /// message in the queue. The consolidation of the last message in + /// the queue is done by calling consolidate_message_queue (). + virtual int consolidate_message (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); + + /// First consolidate the message queue. If the message is still not + /// complete, try to read from the handle again to make it + /// complete. If these dont help put the message back in the queue + /// and try to check the queue if we have message to process. (the + /// thread needs to do some work anyway :-)) + int consolidate_message_queue (ACE_Message_Block &incoming, + ssize_t missing_data, + TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time); + + /// Called by parse_consolidate_message () if we have more messages + /// in one read. Queue up the messages and try to process one of + /// them, atleast at the head of them. + int consolidate_extra_messages (ACE_Message_Block &incoming, + TAO_Resume_Handle &rh); + + /// Process the message by sending it to the higher layers of the + /// ORB. + int process_parsed_messages (TAO_Queued_Data *qd); + public: /// Method for the connection handler to signify that it /// is being closed and destroyed. @@ -700,6 +808,11 @@ private: /// Print out error messages if the event handler is not valid void report_invalid_event_handler (const char *caller); + /// Process the message that is in the head of the incoming queue. + /// If there are more messages in the queue, this method sends a + /// notify () to the reactor to send a next thread along. + int process_queue_head (TAO_Resume_Handle &rh); + /// Prohibited ACE_UNIMPLEMENTED_FUNC (TAO_Transport (const TAO_Transport&)) ACE_UNIMPLEMENTED_FUNC (void operator= (const TAO_Transport&)) @@ -747,6 +860,9 @@ protected: TAO_Queued_Message *head_; TAO_Queued_Message *tail_; + /// Queue of the incoming messages.. + TAO_Incoming_Message_Queue incoming_message_queue_; + /// The queue will start draining no later than <queing_deadline_> /// *if* the deadline is ACE_Time_Value current_deadline_; diff --git a/TAO/tao/Transport_Mux_Strategy.h b/TAO/tao/Transport_Mux_Strategy.h index 447f40ce344..5937ff16e9a 100644 --- a/TAO/tao/Transport_Mux_Strategy.h +++ b/TAO/tao/Transport_Mux_Strategy.h @@ -68,16 +68,6 @@ public: /// allocated for that request. virtual int dispatch_reply (TAO_Pluggable_Reply_Params ¶ms) = 0; - // = "Factory methods" to obtain the CDR stream, in the Muxed case - // the factory simply allocates a new one, in the Exclusive case - // the factory returns a pointer to the pre-allocated CDR. - - // virtual TAO_GIOP_Message_State *get_message_state (void) = 0; - // Get a CDR stream. - - // virtual void destroy_message_state (TAO_GIOP_Message_State *) = 0; - // Destroy a CDR stream. - /// Request has been just sent, but the reply is not received. Idle /// the transport now. virtual int idle_after_send (void) = 0; diff --git a/TAO/tao/Wait_On_Read.cpp b/TAO/tao/Wait_On_Read.cpp index 3320298e49d..29e983a90db 100644 --- a/TAO/tao/Wait_On_Read.cpp +++ b/TAO/tao/Wait_On_Read.cpp @@ -2,6 +2,7 @@ #include "tao/Wait_On_Read.h" #include "Transport.h" +#include "Resume_Handle.h" ACE_RCSID(tao, Wait_On_Read, "$Id$") @@ -26,10 +27,13 @@ TAO_Wait_On_Read::wait (ACE_Time_Value * max_wait_time, // Do the same sort of looping that is done in other wait // strategies. int retval = 0; + TAO_Resume_Handle rh; while (1) { retval = - this->transport_->read_process_message (max_wait_time, 1); + this->transport_->handle_input_i (rh, + max_wait_time, + 1); // If we got our reply, no need to run the loop any // further. |