diff options
-rw-r--r-- | TAO/ChangeLog | 23 | ||||
-rw-r--r-- | TAO/tao/Incoming_Message_Queue.h | 2 | ||||
-rw-r--r-- | TAO/tao/Synch_Refcountable.cpp | 22 | ||||
-rw-r--r-- | TAO/tao/Synch_Refcountable.h | 57 | ||||
-rw-r--r-- | TAO/tao/Synch_Refcountable.inl | 21 | ||||
-rw-r--r-- | TAO/tao/TAO.dsp | 12 | ||||
-rw-r--r-- | TAO/tao/TAO_Static.dsp | 12 | ||||
-rw-r--r-- | TAO/tao/Transport.cpp | 1538 | ||||
-rw-r--r-- | TAO/tao/Transport.h | 144 | ||||
-rw-r--r-- | TAO/tao/Transport.inl | 11 |
10 files changed, 998 insertions, 844 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 4e41d89997d..bff36d486fe 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,26 @@ +Sun Jul 07 21:56:23 2002 Balachandran Natarajan <bala@cs.wustl.edu> + + * tao/Transport.h: + * tao/Transport.cpp: + * tao/Transport.inl: Did a few cosmetic changes. + + - Removed the TAO_Synch_Refcountable class and moved that to a + new file. + - Rearranged the methods in cpp and inl files, so that the + methods specific for the input path and output path are grouped + together. Just did this to improve readability. + - Added a new method by name transport_cache_manager () which + returns the reference to transport cache manager from the ORB. + + * tao/Synch_Refcountable.{h,cpp,inl}: Added this new file which + contains the class TAO_Synch_Refcountable. + + * tao/TAO.dsp: + * tao/TAO_Static.dsp: Added the new files to the project + workspace. + + * tao/Incoming_Mesage_Queue.h: Fixed an erroneous comment. + Sun Jul 7 18:53:54 2002 Jeff Parsons <parsons@cs.wustl.edu> * TAO_IDL/fe/fe_tmplinst.cpp: diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h index 23a11f021f9..52d3ea50d34 100644 --- a/TAO/tao/Incoming_Message_Queue.h +++ b/TAO/tao/Incoming_Message_Queue.h @@ -97,7 +97,7 @@ private: private: - /// A circular linked listof messages that await processing + /// A linked listof messages that await processing TAO_Queued_Data *queued_data_; /// The size of the queue diff --git a/TAO/tao/Synch_Refcountable.cpp b/TAO/tao/Synch_Refcountable.cpp new file mode 100644 index 00000000000..4bd6b1fc0ab --- /dev/null +++ b/TAO/tao/Synch_Refcountable.cpp @@ -0,0 +1,22 @@ +#include "Synch_Refcountable.h" + + +#if !defined (__ACE_INLINE__) +# include "Synch_Refcountable.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID (tao, + Synch_Refcountable, + "$Id$") + +TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, + int refcount) + : ACE_Refcountable (refcount) + , refcount_lock_ (lock) +{ +} + +TAO_Synch_Refcountable::~TAO_Synch_Refcountable (void) +{ + delete this->refcount_lock_; +} diff --git a/TAO/tao/Synch_Refcountable.h b/TAO/tao/Synch_Refcountable.h new file mode 100644 index 00000000000..534873d255f --- /dev/null +++ b/TAO/tao/Synch_Refcountable.h @@ -0,0 +1,57 @@ +// This may look like C, but it's really +// -*- C++ -*- + +//============================================================================= +/** + * @file Synch_Refcountable.h + * + * $Id$ + * + * Definition for a synchronised refcountable interface. + * + * @author Fred Kuhns <fredk@cs.wustl.edu> + */ +//============================================================================= +#ifndef TAO_SYNCH_REFCOUNTABLE_H +#define TAO_SYNCH_REFCOUNTABLE_H +#include "ace/pre.h" + +#include "ace/Refcountable.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "TAO_Export.h" +#include "ace/Synch.h" + +class ACE_Lock; + +/** + * @class TAO_Synch_Refcountable + * + * @brief Definition for a synchronised refcountable interface. + */ +class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable +{ +public: + virtual ~TAO_Synch_Refcountable (void); + + int increment (void); + int decrement (void); + + int refcount (void) const; + +protected: + TAO_Synch_Refcountable (ACE_Lock *lock, int refcount); + + ACE_Lock *refcount_lock_; +}; + + +#if defined (__ACE_INLINE__) +# include "Synch_Refcountable.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /*TAO_SYNCH_REFCOUNTABLE*/ diff --git a/TAO/tao/Synch_Refcountable.inl b/TAO/tao/Synch_Refcountable.inl new file mode 100644 index 00000000000..f933f1fd5ff --- /dev/null +++ b/TAO/tao/Synch_Refcountable.inl @@ -0,0 +1,21 @@ +// -*- C++ -*- +//$Id$ +ACE_INLINE int +TAO_Synch_Refcountable::increment (void) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0); + return ACE_Refcountable::increment (); +} + +ACE_INLINE int +TAO_Synch_Refcountable::decrement (void) +{ + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0); + return ACE_Refcountable::decrement (); +} + +ACE_INLINE int +TAO_Synch_Refcountable::refcount (void) const +{ + return ACE_Refcountable::refcount (); +} diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp index 3f364f0a467..5c879afd067 100644 --- a/TAO/tao/TAO.dsp +++ b/TAO/tao/TAO.dsp @@ -831,6 +831,10 @@ SOURCE=.\Synch_Queued_Message.cpp # End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -1659,6 +1663,10 @@ SOURCE=.\Synch_Queued_Message.h # End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -2263,6 +2271,10 @@ SOURCE=.\Sync_Strategies.i # End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Tagged_Components.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp index 21d51b8e0d2..538caf636e8 100644 --- a/TAO/tao/TAO_Static.dsp +++ b/TAO/tao/TAO_Static.dsp @@ -763,6 +763,10 @@ SOURCE=.\Synch_Queued_Message.cpp # End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -1643,6 +1647,10 @@ SOURCE=.\Synch_Queued_Message.h # End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -2203,6 +2211,10 @@ SOURCE=.\Sync_Strategies.i # End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Tagged_Components.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index e3b0a611cf1..68ab8203f9a 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -31,38 +31,68 @@ ACE_RCSID (tao, Transport, "$Id$") - -TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount) - : ACE_Refcountable (refcount) - , refcount_lock_ (lock) +/* + * Static function in file scope + */ +static void +dump_iov (iovec *iov, int iovcnt, int id, + size_t current_transfer, + const char *location) { -} + ACE_Log_Msg::instance ()->acquire (); -TAO_Synch_Refcountable::~TAO_Synch_Refcountable (void) -{ - delete this->refcount_lock_; -} + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::%s, " + "sending %d buffers\n", + id, location, iovcnt)); + for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) + { + size_t iov_len = iov[i].iov_len; -int -TAO_Synch_Refcountable::increment (void) -{ - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0); - return ACE_Refcountable::increment (); -} + // Possibly a partially sent iovec entry. + if (current_transfer < iov_len) + iov_len = current_transfer; -int -TAO_Synch_Refcountable::decrement (void) -{ - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0); - return ACE_Refcountable::decrement (); -} + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::%s, " + "buffer %d/%d has %d bytes\n", + id, location, + i, iovcnt, + iov_len)); -int -TAO_Synch_Refcountable::refcount (void) const -{ - return ACE_Refcountable::refcount (); + size_t len; + for (size_t offset = 0; offset < iov_len; offset += len) + { + ACE_TCHAR header[1024]; + ACE_OS::sprintf (header, + "TAO - Transport[%d]::%s (" + ACE_SIZE_T_FORMAT_SPECIFIER "/" + ACE_SIZE_T_FORMAT_SPECIFIER ")\n", + id, location, offset, iov_len); + + len = iov_len - offset; + if (len > 512) + len = 512; + ACE_HEX_DUMP ((LM_DEBUG, + ACE_static_cast(char*,iov[i].iov_base) + offset, + len, + header)); + } + current_transfer -= iov_len; + } + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::%s, " + "end of data\n", + id, location)); + + ACE_Log_Msg::instance ()->release (); } +/* + * Definitions for methods declared in the transport class + * + */ + // Constructor. TAO_Transport::TAO_Transport (CORBA::ULong tag, TAO_ORB_Core *orb_core) @@ -116,39 +146,49 @@ TAO_Transport::~TAO_Transport (void) // outside the TAO_Transport. if (this->cache_map_entry_ != 0) { - this->orb_core_->lane_resources ().transport_cache ().purge_entry ( - this->cache_map_entry_); + this->transport_cache_manager ().purge_entry (this->cache_map_entry_); } } -int -TAO_Transport::handle_output (void) + +/* + * + * Public utility methods that are called by other classes. + * + */ +/*static*/ TAO_Transport* +TAO_Transport::_duplicate (TAO_Transport* transport) { - if (TAO_debug_level > 3) + if (transport != 0) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_output\n", - this->id ())); + transport->increment (); } + return transport; +} - // The flushing strategy (potentially via the Reactor) wants to send - // more data, first check if there is a current message that needs - // more sending... - int retval = this->drain_queue (); - - if (TAO_debug_level > 3) +/*static*/ void +TAO_Transport::release (TAO_Transport* transport) +{ + if (transport != 0) { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::handle_output, " - "drain_queue returns %d/%d\n", - this->id (), - retval, errno)); - } + int count = transport->decrement (); - // Any errors are returned directly to the Reactor - return retval; + if (count == 0) + { + delete transport; + } + else if (count < 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Transport[%d]::release, " + "reference count is less than zero: %d\n", + transport->id (), count)); + ACE_OS::abort (); + } + } } + void TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered, TAO_EventHandlerSet &unregistered) @@ -157,7 +197,6 @@ TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered, guard, *this->handler_lock_)); ACE_Event_Handler *eh = this->event_handler_i (); - // TAO_Connection_Handler *ch = ACE_reinterpret_cast (TAO_Connection_Handler *, eh); if (eh != 0) { @@ -172,60 +211,216 @@ TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered, } } -static void -dump_iov (iovec *iov, int iovcnt, int id, - size_t current_transfer, - const char *location) + +int +TAO_Transport::register_handler (void) { - ACE_Log_Msg::instance ()->acquire (); + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->handler_lock_, + -1)); + if (this->check_event_handler_i ("Transport::register_handler") == -1) + return -1; - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::%s, " - "sending %d buffers\n", - id, location, iovcnt)); - for (int i = 0; i != iovcnt && 0 < current_transfer; ++i) + return this->register_handler_i (); +} + + +ssize_t +TAO_Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->handler_lock_, + -1)); + + if (this->check_event_handler_i ("Transport::send") == -1) + return -1; + + // now call the template method + return this->send_i (iov, iovcnt, bytes_transferred, timeout); +} + + +ssize_t +TAO_Transport::recv (char *buffer, + size_t len, + const ACE_Time_Value *timeout) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, + guard, + *this->handler_lock_, + -1)); + + if (this->check_event_handler_i ("Transport::recv") == -1) + return -1; + + // now call the template method + return this->recv_i (buffer, len, timeout); +} + + +int +TAO_Transport::idle_after_send (void) +{ + return this->tms ()->idle_after_send (); +} + +int +TAO_Transport::idle_after_reply (void) +{ + return this->tms ()->idle_after_reply (); +} + +int +TAO_Transport::tear_listen_point_list (TAO_InputCDR &) +{ + ACE_NOTSUP_RETURN (-1); +} + +void +TAO_Transport::close_connection (void) +{ + ACE_Event_Handler * eh = this->invalidate_event_handler (); + this->close_connection_shared (1, eh); +} + +int +TAO_Transport::generate_locate_request ( + TAO_Target_Specification &spec, + TAO_Operation_Details &opdetails, + TAO_OutputCDR &output) +{ + if (this->messaging_object ()->generate_locate_request_header (opdetails, + spec, + output) == -1) { - size_t iov_len = iov[i].iov_len; + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::generate_locate_request, " + "error while marshalling the LocateRequest header\n", + this->id ())); + + return -1; + } + + return 0; +} + + +int +TAO_Transport::generate_request_header ( + TAO_Operation_Details &opdetails, + TAO_Target_Specification &spec, + TAO_OutputCDR &output) +{ + if (this->messaging_object ()->generate_request_header (opdetails, + spec, + output) == -1) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) - Transport[%d]::generate_request_header, " + "error while marshalling the Request header\n", + this->id())); + + return -1; + } + + return 0; +} + + +/* + * NOTE: Some of these calls looks like ideal fodder for + * inlining. But, please note that the calls made within the method + * are not inlined. This would increase closure cost on the + * compiler. + */ +void +TAO_Transport::connection_handler_closing (void) +{ + // The connection has closed, we must invalidate the handler to + // ensure that any attempt to use this transport results in an + // error. Basically all the other methods in the Transport + // cooperate via check_event_handler_i() + + (void) this->invalidate_event_handler (); + this->send_connection_closed_notifications (); + + // Can't hold the lock while we release, b/c the release could + // invoke the destructor! This should be the last thing we do here + TAO_Transport::release (this); +} + +int +TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc) +{ + // First purge our entry + this->transport_cache_manager ().purge_entry (this->cache_map_entry_); + + // Then add ourselves to the cache + return this->transport_cache_manager ().cache_transport (desc, + this); +} + +void +TAO_Transport::purge_entry (void) +{ + if (this->cache_map_entry_ != 0) + { + (void) this->transport_cache_manager ().purge_entry (this->cache_map_entry_); + } +} + +int +TAO_Transport::make_idle (void) +{ + if (this->cache_map_entry_ != 0) + { + return this->transport_cache_manager ().make_idle (this->cache_map_entry_); + } + + return -1; +} - // Possibly a partially sent iovec entry. - if (current_transfer < iov_len) - iov_len = current_transfer; +/* + * + * Methods called and used in the output path of the ORB. + * + */ +int +TAO_Transport::handle_output (void) +{ + if (TAO_debug_level > 3) + { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::%s, " - "buffer %d/%d has %d bytes\n", - id, location, - i, iovcnt, - iov_len)); + "TAO (%P|%t) - Transport[%d]::handle_output\n", + this->id ())); + } - size_t len; - for (size_t offset = 0; offset < iov_len; offset += len) - { - ACE_TCHAR header[1024]; - ACE_OS::sprintf (header, - "TAO - Transport[%d]::%s (" - ACE_SIZE_T_FORMAT_SPECIFIER "/" - ACE_SIZE_T_FORMAT_SPECIFIER ")\n", - id, location, offset, iov_len); + // The flushing strategy (potentially via the Reactor) wants to send + // more data, first check if there is a current message that needs + // more sending... + int retval = this->drain_queue (); - len = iov_len - offset; - if (len > 512) - len = 512; - ACE_HEX_DUMP ((LM_DEBUG, - ACE_static_cast(char*,iov[i].iov_base) + offset, - len, - header)); - } - current_transfer -= iov_len; + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::handle_output, " + "drain_queue returns %d/%d\n", + this->id (), + retval, errno)); } - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::%s, " - "end of data\n", - id, location)); - ACE_Log_Msg::instance ()->release (); + // Any errors are returned directly to the Reactor + return retval; } + int TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb, size_t &bytes_transferred, @@ -412,238 +607,582 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, return 1; } + + + +void +TAO_Transport::close_connection_i (void) +{ + ACE_Event_Handler * eh = this->invalidate_event_handler_i (); + this->close_connection_shared (1, eh); +} + +void +TAO_Transport::close_connection_no_purge (void) +{ + ACE_Event_Handler * eh = this->invalidate_event_handler (); + this->close_connection_shared (0, eh); +} + + +void +TAO_Transport::close_connection_shared (int disable_purge, + ACE_Event_Handler * eh) +{ + // Purge the entry + if (!disable_purge && this->cache_map_entry_ != 0) + { + this->transport_cache_manager ().purge_entry (this->cache_map_entry_); + } + + if (eh == 0) + { + // The connection was already closed + return; + } + + // We first try to remove the handler from the reactor. After that + // we destroy the handler using handle_close (). The remove handler + // is necessary because if the handle_closed is called directly, the + // reactor would be left with a dangling pointer. + if (this->ws_->is_registered ()) + { + this->orb_core_->reactor ()->remove_handler ( + eh, + ACE_Event_Handler::ALL_EVENTS_MASK | + ACE_Event_Handler::DONT_CALL); + } + + (void) eh->handle_close (ACE_INVALID_HANDLE, + ACE_Event_Handler::ALL_EVENTS_MASK); + + this->send_connection_closed_notifications (); +} + + + + + + int -TAO_Transport::idle_after_send (void) +TAO_Transport::queue_is_empty_i (void) { - return this->tms ()->idle_after_send (); + return (this->head_ == 0); } + + + int -TAO_Transport::idle_after_reply (void) +TAO_Transport::schedule_output_i (void) { - return this->tms ()->idle_after_reply (); + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::schedule_output\n", + this->id ())); + } + + return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); } int -TAO_Transport::tear_listen_point_list (TAO_InputCDR &) +TAO_Transport::cancel_output_i (void) { - ACE_NOTSUP_RETURN (-1); + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh == 0) + return -1; + + ACE_Reactor *reactor = eh->reactor (); + if (reactor == 0) + return -1; + + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::cancel_output\n", + this->id ())); + } + + return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); } -void -TAO_Transport::connection_handler_closing (void) +int +TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, + const void *act) { - // The connection has closed, we must invalidate the handler to - // ensure that any attempt to use this transport results in an - // error. Basically all the other methods in the Transport - // cooperate via check_event_handler_i() + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, " + "timer expired\n", + this->id ())); + } - (void) this->invalidate_event_handler (); - this->send_connection_closed_notifications (); + /// This is the only legal ACT in the current configuration.... + if (act != &this->current_deadline_) + return -1; - // Can't hold the lock while we release, b/c the release could - // invoke the destructor! This should be the last thing we do here - TAO_Transport::release (this); + if (this->flush_timer_pending ()) + { + // The timer is always a oneshot timer, so mark is as not + // pending. + this->reset_flush_timer (); + + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + (void) flushing_strategy->schedule_output (this); + } + return 0; } -TAO_Transport* -TAO_Transport::_duplicate (TAO_Transport* transport) +int +TAO_Transport::drain_queue (void) { - if (transport != 0) + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + + int retval = this->drain_queue_i (); + + if (retval == 1) { - transport->increment (); + // ... there is no current message or it was completely + // sent, cancel output... + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); + + flushing_strategy->cancel_output (this); + + return 0; } - return transport; + + return retval; } -void -TAO_Transport::release (TAO_Transport* transport) +int +TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) { - if (transport != 0) + if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1) + return -1; + + size_t byte_count = 0; + + // ... send the message ... + ssize_t retval = + this->send_i (iov, iovcnt, byte_count); + + if (TAO_debug_level == 5) { - int count = transport->decrement (); + dump_iov (iov, iovcnt, this->id (), + byte_count, "drain_queue_helper"); + } - if (count == 0) + // ... now we need to update the queue, removing elements + // that have been sent, and updating the last element if it + // was only partially sent ... + this->cleanup_queue (byte_count); + iovcnt = 0; + + if (retval == 0) + { + if (TAO_debug_level > 4) { - delete transport; + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "send() returns 0", + this->id ())); } - else if (count < 0) + return -1; + } + else if (retval == -1) + { + if (TAO_debug_level > 4) { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport[%d]::release, " - "reference count is less than zero: %d\n", - transport->id (), count)); - ACE_OS::abort (); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "error during %p\n", + this->id (), "send_i()")); } + if (errno == EWOULDBLOCK) + return 0; + return -1; } + + // ... start over, how do we guarantee progress? Because if + // no bytes are sent send() can only return 0 or -1 + ACE_ASSERT (byte_count != 0); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " + "byte_count = %d, head_is_empty = %d\n", + this->id(), byte_count, (this->head_ == 0))); + } + return 1; } int -TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc) +TAO_Transport::drain_queue_i (void) { - // First purge our entry - this->transport_cache_manager ().purge_entry (this->cache_map_entry_); + // This is the vector used to send data, it must be declared outside + // the loop because after the loop there may still be data to be + // sent + int iovcnt = 0; + iovec iov[IOV_MAX]; - // Then add ourselves to the cache - return this->transport_cache_manager ().cache_transport (desc, - this); + // We loop over all the elements in the queue ... + TAO_Queued_Message *i = this->head_; + while (i != 0) + { + // ... each element fills the iovector ... + i->fill_iov (IOV_MAX, iovcnt, iov); + + // ... the vector is full, no choice but to send some data out. + // We need to loop because a single message can span multiple + // IOV_MAX elements ... + if (iovcnt == IOV_MAX) + { + int retval = + this->drain_queue_helper (iovcnt, iov); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::drain_queue_i, " + "helper retval = %d\n", + this->id (), retval)); + } + if (retval != 1) + return retval; + + i = this->head_; + continue; + } + // ... notice that this line is only reached if there is still + // room in the iovector ... + i = i->next (); + } + + + if (iovcnt != 0) + { + int retval = + this->drain_queue_helper (iovcnt, iov); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::drain_queue_i, " + "helper retval = %d\n", + this->id (), retval)); + } + if (retval != 1) + return retval; + } + + if (this->head_ == 0) + { + if (this->flush_timer_pending ()) + { + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + } + this->reset_flush_timer (); + } + return 1; + } + + return 0; } void -TAO_Transport::purge_entry (void) +TAO_Transport::cleanup_queue (size_t byte_count) { - if (this->cache_map_entry_ != 0) + while (this->head_ != 0 && byte_count > 0) { - (void) this->transport_cache_manager ().purge_entry (this->cache_map_entry_); + TAO_Queued_Message *i = this->head_; + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::cleanup_queue, " + "byte_count = %d\n", + this->id (), byte_count)); + } + + // Update the state of the first message + i->bytes_transferred (byte_count); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::cleanup_queue, " + "after transfer, bc = %d, all_sent = %d, ml = %d\n", + this->id (), byte_count, i->all_data_sent (), + i->message_length ())); + } + + // ... if all the data was sent the message must be removed from + // the queue... + if (i->all_data_sent ()) + { + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + } } } int -TAO_Transport::make_idle (void) +TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, + int &must_flush) { - if (this->cache_map_entry_ != 0) + // First let's compute the size of the queue: + size_t msg_count = 0; + size_t total_bytes = 0; + for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) { - return this->transport_cache_manager ().make_idle (this->cache_map_entry_); + msg_count++; + total_bytes += i->message_length (); } - return -1; -} + int set_timer; + ACE_Time_Value new_deadline; -void -TAO_Transport::close_connection (void) -{ - ACE_Event_Handler * eh = this->invalidate_event_handler (); - this->close_connection_shared (1, eh); + int constraints_reached = + stub->sync_strategy ().buffering_constraints_reached (stub, + msg_count, + total_bytes, + must_flush, + this->current_deadline_, + set_timer, + new_deadline); + + // ... set the new timer, also cancel any previous timers ... + if (set_timer) + { + ACE_Event_Handler *eh = this->event_handler_i (); + if (eh != 0) + { + ACE_Reactor *reactor = eh->reactor (); + if (reactor != 0) + { + this->current_deadline_ = new_deadline; + ACE_Time_Value delay = + new_deadline - ACE_OS::gettimeofday (); + + if (this->flush_timer_pending ()) + { + (void) reactor->cancel_timer (this->flush_timer_id_); + } + this->flush_timer_id_ = + reactor->schedule_timer (&this->transport_timer_, + &this->current_deadline_, + delay); + } + } + } + + return constraints_reached; } void -TAO_Transport::close_connection_i (void) +TAO_Transport::report_invalid_event_handler (const char *caller) { - ACE_Event_Handler * eh = this->invalidate_event_handler_i (); - this->close_connection_shared (1, eh); + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) - Transport[%d]::report_invalid_event_handler" + "(%s) no longer associated with handler [tag=%d]\n", + this->id (), caller, this->tag_)); + } } -void -TAO_Transport::close_connection_no_purge (void) +ACE_Event_Handler * +TAO_Transport::invalidate_event_handler (void) { - ACE_Event_Handler * eh = this->invalidate_event_handler (); - this->close_connection_shared (0, eh); + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0)); + + return this->invalidate_event_handler_i (); } void -TAO_Transport::close_connection_shared (int disable_purge, - ACE_Event_Handler * eh) +TAO_Transport::send_connection_closed_notifications (void) { - // Purge the entry - if (!disable_purge && this->cache_map_entry_ != 0) + while (this->head_ != 0) { - this->transport_cache_manager ().purge_entry (this->cache_map_entry_); - } + TAO_Queued_Message *i = this->head_; - if (eh == 0) - { - // The connection was already closed - return; + // @@ This is a good point to insert a flag to indicate that a + // CloseConnection message was successfully received. + i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); + + this->head_ = i->next (); + + i->destroy (); } - // We first try to remove the handler from the reactor. After that - // we destroy the handler using handle_close (). The remove handler - // is necessary because if the handle_closed is called directly, the - // reactor would be left with a dangling pointer. - if (this->ws_->is_registered ()) + this->tms ()->connection_closed (); + this->messaging_object ()->reset (); +} + +int +TAO_Transport::send_message_shared_i (TAO_Stub *stub, + int is_synchronous, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + if (is_synchronous) { - this->orb_core_->reactor ()->remove_handler ( - eh, - ACE_Event_Handler::ALL_EVENTS_MASK | - ACE_Event_Handler::DONT_CALL); + return this->send_synchronous_message_i (message_block, + max_wait_time); } - (void) eh->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::ALL_EVENTS_MASK); + // Let's figure out if the message should be queued without trying + // to send first: + int try_sending_first = 1; - this->send_connection_closed_notifications (); -} + int queue_empty = (this->head_ == 0); -ssize_t -TAO_Transport::send (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *timeout) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); + if (!queue_empty) + try_sending_first = 0; + else if (stub->sync_strategy ().must_queue (queue_empty)) + try_sending_first = 0; - if (this->check_event_handler_i ("Transport::send") == -1) - return -1; + ssize_t n; - // now call the template method - return this->send_i (iov, iovcnt, bytes_transferred, timeout); -} + TAO_Flushing_Strategy *flushing_strategy = + this->orb_core ()->flushing_strategy (); -ssize_t -TAO_Transport::recv (char *buffer, - size_t len, - const ACE_Time_Value *timeout) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); + if (try_sending_first) + { + size_t byte_count = 0; + // ... in this case we must try to send the message first ... - if (this->check_event_handler_i ("Transport::recv") == -1) - return -1; + size_t total_length = message_block->total_length (); + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "trying to send the message (ml = %d)\n", + this->id (), total_length)); + } - // now call the template method - return this->recv_i (buffer, len, timeout); -} + // @@ I don't think we want to hold the mutex here, however if + // we release it we need to recheck the status of the transport + // after we return... once I understand the final form for this + // code I will re-visit this decision + n = this->send_message_block_chain_i (message_block, + byte_count, + max_wait_time); + if (n == -1) + { + // ... if this is just an EWOULDBLOCK we must schedule the + // message for later, if it is ETIME we still have to send + // the complete message, because cutting off the message at + // this point will destroy the synchronization with the + // server ... + if (errno != EWOULDBLOCK && errno != ETIME) + { + return -1; + } + } + // ... let's figure out if the complete message was sent ... + if (total_length == byte_count) + { + // Done, just return. Notice that there are no allocations + // or copies up to this point (though some fancy calling + // back and forth). + // This is the common case for the critical path, it should + // be fast. + return 0; + } -int -TAO_Transport::generate_locate_request ( - TAO_Target_Specification &spec, - TAO_Operation_Details &opdetails, - TAO_OutputCDR &output) -{ - if (this->messaging_object ()->generate_locate_request_header (opdetails, - spec, - output) == -1) - { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::generate_locate_request, " - "error while marshalling the LocateRequest header\n", - this->id ())); + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "partial send %d / %d bytes\n", + this->id (), byte_count, total_length)); + } - return -1; + // ... part of the data was sent, need to figure out what piece + // of the message block chain must be queued ... + while (message_block != 0 && message_block->length () == 0) + message_block = message_block->cont (); + + // ... at least some portion of the message block chain should + // remain ... + ACE_ASSERT (message_block != 0); } - return 0; -} + // ... either the message must be queued or we need to queue it + // because it was not completely sent out ... + if (TAO_debug_level > 6) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport[%d]::send_message_i, " + "message is queued\n", + this->id ())); + } -int -TAO_Transport::generate_request_header ( - TAO_Operation_Details &opdetails, - TAO_Target_Specification &spec, - TAO_OutputCDR &output) -{ - if (this->messaging_object ()->generate_request_header (opdetails, - spec, - output) == -1) + TAO_Queued_Message *queued_message = 0; + ACE_NEW_RETURN (queued_message, + TAO_Asynch_Queued_Message (message_block), + -1); + queued_message->push_back (this->head_, this->tail_); + + // ... if the queue is full we need to activate the output on the + // queue ... + int must_flush = 0; + int constraints_reached = + this->check_buffering_constraints_i (stub, + must_flush); + + // ... but we also want to activate it if the message was partially + // sent.... Plus, when we use the blocking flushing strategy the + // queue is flushed as a side-effect of 'schedule_output()' + + if (constraints_reached || try_sending_first) { - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) - Transport[%d]::generate_request_header, " - "error while marshalling the Request header\n", - this->id())); + (void) flushing_strategy->schedule_output (this); + } - return -1; + if (must_flush) + { + typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + + (void) flushing_strategy->flush_transport (this); } return 0; } + + +/* + * + * All the methods relevant to the incoming data path of the ORB are + * defined below + * + */ int TAO_Transport::handle_input_i (TAO_Resume_Handle &rh, ACE_Time_Value * max_wait_time, @@ -1327,8 +1866,6 @@ TAO_Queued_Data * TAO_Transport::make_queued_data (ACE_Message_Block &incoming) { // Get an instance of TAO_Queued_Data - // @@ todo: Are we using the right allocator? May be not. We need to - // see how to have a general purpose allocator. TAO_Queued_Data *qd = TAO_Queued_Data::get_queued_data ( this->orb_core_->transport_message_buffer_allocator ()); @@ -1499,535 +2036,6 @@ TAO_Transport::notify_reactor (void) return 1; } -int -TAO_Transport::queue_is_empty (void) -{ - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - return this->queue_is_empty_i (); -} - -int -TAO_Transport::queue_is_empty_i (void) -{ - return (this->head_ == 0); -} - -int -TAO_Transport::register_handler (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, - guard, - *this->handler_lock_, - -1)); - if (this->check_event_handler_i ("Transport::register_handler") == -1) - return -1; - - return this->register_handler_i (); -} - - -int -TAO_Transport::schedule_output_i (void) -{ - ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; - - if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::schedule_output\n", - this->id ())); - } - - return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK); -} - -int -TAO_Transport::cancel_output_i (void) -{ - ACE_Event_Handler *eh = this->event_handler_i (); - if (eh == 0) - return -1; - - ACE_Reactor *reactor = eh->reactor (); - if (reactor == 0) - return -1; - - if (TAO_debug_level > 3) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::cancel_output\n", - this->id ())); - } - - return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK); -} - -int -TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, - const void *act) -{ - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, " - "timer expired\n", - this->id ())); - } - - /// This is the only legal ACT in the current configuration.... - if (act != &this->current_deadline_) - return -1; - - if (this->flush_timer_pending ()) - { - // The timer is always a oneshot timer, so mark is as not - // pending. - this->reset_flush_timer (); - - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - (void) flushing_strategy->schedule_output (this); - } - return 0; -} - -int -TAO_Transport::drain_queue (void) -{ - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - - int retval = this->drain_queue_i (); - - if (retval == 1) - { - // ... there is no current message or it was completely - // sent, cancel output... - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - - flushing_strategy->cancel_output (this); - - return 0; - } - - return retval; -} - -int -TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[]) -{ - if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1) - return -1; - - size_t byte_count = 0; - - // ... send the message ... - ssize_t retval = - this->send_i (iov, iovcnt, byte_count); - - if (TAO_debug_level == 5) - { - dump_iov (iov, iovcnt, this->id (), - byte_count, "drain_queue_helper"); - } - - // ... now we need to update the queue, removing elements - // that have been sent, and updating the last element if it - // was only partially sent ... - this->cleanup_queue (byte_count); - iovcnt = 0; - - if (retval == 0) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " - "send() returns 0", - this->id ())); - } - return -1; - } - else if (retval == -1) - { - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " - "error during %p\n", - this->id (), "send_i()")); - } - if (errno == EWOULDBLOCK) - return 0; - return -1; - } - - // ... start over, how do we guarantee progress? Because if - // no bytes are sent send() can only return 0 or -1 - ACE_ASSERT (byte_count != 0); - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::drain_queue_helper, " - "byte_count = %d, head_is_empty = %d\n", - this->id(), byte_count, (this->head_ == 0))); - } - return 1; -} - -int -TAO_Transport::drain_queue_i (void) -{ - // This is the vector used to send data, it must be declared outside - // the loop because after the loop there may still be data to be - // sent - int iovcnt = 0; - iovec iov[IOV_MAX]; - - // We loop over all the elements in the queue ... - TAO_Queued_Message *i = this->head_; - while (i != 0) - { - // ... each element fills the iovector ... - i->fill_iov (IOV_MAX, iovcnt, iov); - - // ... the vector is full, no choice but to send some data out. - // We need to loop because a single message can span multiple - // IOV_MAX elements ... - if (iovcnt == IOV_MAX) - { - int retval = - this->drain_queue_helper (iovcnt, iov); - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::drain_queue_i, " - "helper retval = %d\n", - this->id (), retval)); - } - if (retval != 1) - return retval; - - i = this->head_; - continue; - } - // ... notice that this line is only reached if there is still - // room in the iovector ... - i = i->next (); - } - - - if (iovcnt != 0) - { - int retval = - this->drain_queue_helper (iovcnt, iov); - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::drain_queue_i, " - "helper retval = %d\n", - this->id (), retval)); - } - if (retval != 1) - return retval; - } - - if (this->head_ == 0) - { - if (this->flush_timer_pending ()) - { - ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - } - this->reset_flush_timer (); - } - return 1; - } - - return 0; -} - -void -TAO_Transport::cleanup_queue (size_t byte_count) -{ - while (this->head_ != 0 && byte_count > 0) - { - TAO_Queued_Message *i = this->head_; - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::cleanup_queue, " - "byte_count = %d\n", - this->id (), byte_count)); - } - - // Update the state of the first message - i->bytes_transferred (byte_count); - - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::cleanup_queue, " - "after transfer, bc = %d, all_sent = %d, ml = %d\n", - this->id (), byte_count, i->all_data_sent (), - i->message_length ())); - } - - // ... if all the data was sent the message must be removed from - // the queue... - if (i->all_data_sent ()) - { - i->remove_from_list (this->head_, this->tail_); - i->destroy (); - } - } -} - -int -TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, - int &must_flush) -{ - // First let's compute the size of the queue: - size_t msg_count = 0; - size_t total_bytes = 0; - for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ()) - { - msg_count++; - total_bytes += i->message_length (); - } - - int set_timer; - ACE_Time_Value new_deadline; - - int constraints_reached = - stub->sync_strategy ().buffering_constraints_reached (stub, - msg_count, - total_bytes, - must_flush, - this->current_deadline_, - set_timer, - new_deadline); - - // ... set the new timer, also cancel any previous timers ... - if (set_timer) - { - ACE_Event_Handler *eh = this->event_handler_i (); - if (eh != 0) - { - ACE_Reactor *reactor = eh->reactor (); - if (reactor != 0) - { - this->current_deadline_ = new_deadline; - ACE_Time_Value delay = - new_deadline - ACE_OS::gettimeofday (); - - if (this->flush_timer_pending ()) - { - (void) reactor->cancel_timer (this->flush_timer_id_); - } - this->flush_timer_id_ = - reactor->schedule_timer (&this->transport_timer_, - &this->current_deadline_, - delay); - } - } - } - - return constraints_reached; -} - -void -TAO_Transport::report_invalid_event_handler (const char *caller) -{ - if (TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) - Transport[%d]::report_invalid_event_handler" - "(%s) no longer associated with handler [tag=%d]\n", - this->id (), caller, this->tag_)); - } -} - -ACE_Event_Handler * -TAO_Transport::invalidate_event_handler (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0)); - - return this->invalidate_event_handler_i (); -} - -void -TAO_Transport::send_connection_closed_notifications (void) -{ - while (this->head_ != 0) - { - TAO_Queued_Message *i = this->head_; - - // @@ This is a good point to insert a flag to indicate that a - // CloseConnection message was successfully received. - i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); - - this->head_ = i->next (); - - i->destroy (); - } - - this->tms ()->connection_closed (); - this->messaging_object ()->reset (); -} - -int -TAO_Transport::send_message_shared_i (TAO_Stub *stub, - int is_synchronous, - const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time) -{ - if (is_synchronous) - { - return this->send_synchronous_message_i (message_block, - max_wait_time); - } - - // Let's figure out if the message should be queued without trying - // to send first: - int try_sending_first = 1; - - int queue_empty = (this->head_ == 0); - - if (!queue_empty) - try_sending_first = 0; - else if (stub->sync_strategy ().must_queue (queue_empty)) - try_sending_first = 0; - - ssize_t n; - - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - - if (try_sending_first) - { - size_t byte_count = 0; - // ... in this case we must try to send the message first ... - - size_t total_length = message_block->total_length (); - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - "trying to send the message (ml = %d)\n", - this->id (), total_length)); - } - - // @@ I don't think we want to hold the mutex here, however if - // we release it we need to recheck the status of the transport - // after we return... once I understand the final form for this - // code I will re-visit this decision - n = this->send_message_block_chain_i (message_block, - byte_count, - max_wait_time); - if (n == -1) - { - // ... if this is just an EWOULDBLOCK we must schedule the - // message for later, if it is ETIME we still have to send - // the complete message, because cutting off the message at - // this point will destroy the synchronization with the - // server ... - if (errno != EWOULDBLOCK && errno != ETIME) - { - return -1; - } - } - - // ... let's figure out if the complete message was sent ... - if (total_length == byte_count) - { - // Done, just return. Notice that there are no allocations - // or copies up to this point (though some fancy calling - // back and forth). - // This is the common case for the critical path, it should - // be fast. - return 0; - } - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - "partial send %d / %d bytes\n", - this->id (), byte_count, total_length)); - } - - // ... part of the data was sent, need to figure out what piece - // of the message block chain must be queued ... - while (message_block != 0 && message_block->length () == 0) - message_block = message_block->cont (); - - // ... at least some portion of the message block chain should - // remain ... - ACE_ASSERT (message_block != 0); - } - - // ... either the message must be queued or we need to queue it - // because it was not completely sent out ... - - if (TAO_debug_level > 6) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport[%d]::send_message_i, " - "message is queued\n", - this->id ())); - } - - TAO_Queued_Message *queued_message = 0; - ACE_NEW_RETURN (queued_message, - TAO_Asynch_Queued_Message (message_block), - -1); - queued_message->push_back (this->head_, this->tail_); - - // ... if the queue is full we need to activate the output on the - // queue ... - int must_flush = 0; - int constraints_reached = - this->check_buffering_constraints_i (stub, - must_flush); - - // ... but we also want to activate it if the message was partially - // sent.... Plus, when we use the blocking flushing strategy the - // queue is flushed as a side-effect of 'schedule_output()' - - if (constraints_reached || try_sending_first) - { - (void) flushing_strategy->schedule_output (this); - } - - if (must_flush) - { - typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK; - TAO_REVERSE_LOCK reverse (*this->handler_lock_); - ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); - - (void) flushing_strategy->flush_transport (this); - } - - return 0; -} - TAO_Transport_Cache_Manager & TAO_Transport::transport_cache_manager (void) diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index d973f2bc835..4cfc9320f98 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -13,14 +13,12 @@ */ //============================================================================= - #ifndef TAO_TRANSPORT_H #define TAO_TRANSPORT_H #include "ace/pre.h" #include "corbafwd.h" - #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -29,8 +27,8 @@ #include "Transport_Descriptor_Interface.h" #include "Transport_Cache_Manager.h" #include "Transport_Timer.h" -#include "ace/Refcountable.h" #include "Incoming_Message_Queue.h" +#include "Synch_Refcountable.h" class TAO_ORB_Core; class TAO_Target_Specification; @@ -43,21 +41,7 @@ class TAO_Pluggable_Messaging; class TAO_Queued_Message; class TAO_Resume_Handle; -class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable -{ -public: - virtual ~TAO_Synch_Refcountable (void); - - int increment (void); - int decrement (void); - - int refcount (void) const; -protected: - TAO_Synch_Refcountable (ACE_Lock *lock, int refcount); - - ACE_Lock *refcount_lock_; -}; /** * @class TAO_Transport @@ -235,6 +219,10 @@ public: /// destructor virtual ~TAO_Transport (void); + // Maintain reference counting with these + static TAO_Transport* _duplicate (TAO_Transport* transport); + static void release (TAO_Transport* transport); + /// Return the protocol tag. /** * The OMG assigns unique tags (a 32-bit unsigned number) to each @@ -280,34 +268,30 @@ public: void cache_map_entry (TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry); TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void); - /// Return the identifier for this transport instance. + /// Set and Get the identifier for this transport instance. /** * If not set, this will return an integer representation of * the <code>this</code> pointer for the instance on which * it's called. */ int id (void) const; - /// Set the identifier for this transport instance. void id (int id); - /// Return the order for the purging strategy. + /// Get and Set the purging order. The purging strategy uses the set + /// version to set the purging order. unsigned long purging_order (void) const; - - /// Allow the purging strategy to set the order. void purging_order(unsigned long value); + /// Check if there are messages pending in the queue /** - * Initialising the messaging object. This would be used by the - * connector side. On the acceptor side the connection handler - * would take care of the messaging objects. + * @return 1 if the queue is empty */ - virtual int messaging_init (CORBA::Octet major, - CORBA::Octet minor) = 0; + int queue_is_empty (void); /// Fill in a handle_set with any associated handler's reactor handle. /** * Called by the cache when the cache is closing in order to fill - * in a handle_set in a lock-safe manner. + * in a handle_set in a thread-safe manner. * * @param reactor_registered the ACE_Handle_Set into which the * transport should place any handle registered with the reactor @@ -319,58 +303,22 @@ public: void provide_handle (ACE_Handle_Set &reactor_registered, TAO_EventHandlerSet &unregistered); - /// Extracts the list of listen points from the <cdr> stream. The - /// list would have the protocol specific details of the - /// ListenPoints - virtual int tear_listen_point_list (TAO_InputCDR &cdr); /// Remove all messages from the outgoing queue. /** * @todo: shouldn't this be automated? */ - void dequeue_all (void); - - /// Check if there are messages pending in the queue - /** - * @return 1 if the queue is empty - */ - int queue_is_empty (void); + // void dequeue_all (void); - /// Register the handler with the reactor. /** - * This method is used by the Wait_On_Reactor strategy. The - * transport must register its event handler with the ORB's Reactor. - * - * @todo: I think this method is pretty much useless, the - * connections are *always* registered with the Reactor, except in - * thread-per-connection mode. In that case putting the connection - * in the Reactor would produce unpredictable results anyway. + * Register the handler with the reactor. This method is used by the + * Wait_On_Reactor strategy. The transport must register its event + * handler with the ORB's Reactor. */ - // @@ lockme int register_handler (void); - /** - * @name Control connection lifecycle - * - * These methods are routed through the TMS object. The TMS - * strategies implement them correctly. - */ - //@{ - - /// Request has been just sent, but the reply is not received. Idle - /// the transport now. - virtual int idle_after_send (void); - /// Request is sent and the reply is received. Idle the transport - /// now. - virtual int idle_after_reply (void); - - /// Call the implementation method after obtaining the lock. - virtual void close_connection (void); - - //@} - - /// Write the complete Message_Block chain to the connection. + /// Write the complete Message_Block chain to the connection. /** * This method serializes on handler_lock_, guaranteeing that only * thread can execute it on the same instance concurrently. @@ -428,6 +376,48 @@ public: + /** + * @name Control connection lifecycle + * + * These methods are routed through the TMS object. The TMS + * strategies implement them correctly. + */ + //@{ + + /// Request has been just sent, but the reply is not received. Idle + /// the transport now. + virtual int idle_after_send (void); + + /// Request is sent and the reply is received. Idle the transport + /// now. + virtual int idle_after_reply (void); + + /// Call the implementation method after obtaining the lock. + virtual void close_connection (void); + + //@} + + /** @name Template methods + * + * The Transport class uses the Template Method Pattern to implement + * the protocol specific functionality. + * Implementors of a pluggable protocol should override the + * following methods with the semantics documented below. + */ + /** + * Initialising the messaging object. This would be used by the + * connector side. On the acceptor side the connection handler + * would take care of the messaging objects. + */ + virtual int messaging_init (CORBA::Octet major, + CORBA::Octet minor) = 0; + + + + /// Extracts the list of listen points from the <cdr> stream. The + /// list would have the protocol specific details of the + /// ListenPoints + virtual int tear_listen_point_list (TAO_InputCDR &cdr); protected: /** @name Template methods @@ -537,6 +527,13 @@ public: TAO_Target_Specification &spec, TAO_OutputCDR &msg); + /// recache ourselves in the cache + int recache_transport (TAO_Transport_Descriptor_Interface* desc); + + /// Method for the connection handler to signify that it + /// is being closed and destroyed. + virtual void connection_handler_closing (void); + /// Callback to read incoming data /** * The ACE_Event_Handler adapter invokes this method as part of its @@ -681,16 +678,7 @@ protected: TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming); public: - /// Method for the connection handler to signify that it - /// is being closed and destroyed. - virtual void connection_handler_closing (void); - // Maintain reference counting with these - static TAO_Transport* _duplicate (TAO_Transport* transport); - static void release (TAO_Transport* transport); - - /// recache ourselves in the cache - int recache_transport (TAO_Transport_Descriptor_Interface* desc); diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl index fc277e14eb4..236d4d93b39 100644 --- a/TAO/tao/Transport.inl +++ b/TAO/tao/Transport.inl @@ -80,6 +80,17 @@ TAO_Transport::id (int id) this->id_ = id; } +ACE_INLINE int +TAO_Transport::queue_is_empty (void) +{ + ACE_GUARD_RETURN (ACE_Lock, + ace_mon, + *this->handler_lock_, + -1); + return this->queue_is_empty_i (); +} + + ACE_INLINE int TAO_Transport::flush_timer_pending (void) const |