summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TAO/ChangeLog23
-rw-r--r--TAO/tao/Incoming_Message_Queue.h2
-rw-r--r--TAO/tao/Synch_Refcountable.cpp22
-rw-r--r--TAO/tao/Synch_Refcountable.h57
-rw-r--r--TAO/tao/Synch_Refcountable.inl21
-rw-r--r--TAO/tao/TAO.dsp12
-rw-r--r--TAO/tao/TAO_Static.dsp12
-rw-r--r--TAO/tao/Transport.cpp1538
-rw-r--r--TAO/tao/Transport.h144
-rw-r--r--TAO/tao/Transport.inl11
10 files changed, 998 insertions, 844 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 4e41d89997d..bff36d486fe 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,26 @@
+Sun Jul 07 21:56:23 2002 Balachandran Natarajan <bala@cs.wustl.edu>
+
+ * tao/Transport.h:
+ * tao/Transport.cpp:
+ * tao/Transport.inl: Did a few cosmetic changes.
+
+ - Removed the TAO_Synch_Refcountable class and moved that to a
+ new file.
+ - Rearranged the methods in cpp and inl files, so that the
+ methods specific for the input path and output path are grouped
+ together. Just did this to improve readability.
+ - Added a new method by name transport_cache_manager () which
+ returns the reference to transport cache manager from the ORB.
+
+ * tao/Synch_Refcountable.{h,cpp,inl}: Added this new file which
+ contains the class TAO_Synch_Refcountable.
+
+ * tao/TAO.dsp:
+ * tao/TAO_Static.dsp: Added the new files to the project
+ workspace.
+
+ * tao/Incoming_Mesage_Queue.h: Fixed an erroneous comment.
+
Sun Jul 7 18:53:54 2002 Jeff Parsons <parsons@cs.wustl.edu>
* TAO_IDL/fe/fe_tmplinst.cpp:
diff --git a/TAO/tao/Incoming_Message_Queue.h b/TAO/tao/Incoming_Message_Queue.h
index 23a11f021f9..52d3ea50d34 100644
--- a/TAO/tao/Incoming_Message_Queue.h
+++ b/TAO/tao/Incoming_Message_Queue.h
@@ -97,7 +97,7 @@ private:
private:
- /// A circular linked listof messages that await processing
+ /// A linked listof messages that await processing
TAO_Queued_Data *queued_data_;
/// The size of the queue
diff --git a/TAO/tao/Synch_Refcountable.cpp b/TAO/tao/Synch_Refcountable.cpp
new file mode 100644
index 00000000000..4bd6b1fc0ab
--- /dev/null
+++ b/TAO/tao/Synch_Refcountable.cpp
@@ -0,0 +1,22 @@
+#include "Synch_Refcountable.h"
+
+
+#if !defined (__ACE_INLINE__)
+# include "Synch_Refcountable.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID (tao,
+ Synch_Refcountable,
+ "$Id$")
+
+TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock,
+ int refcount)
+ : ACE_Refcountable (refcount)
+ , refcount_lock_ (lock)
+{
+}
+
+TAO_Synch_Refcountable::~TAO_Synch_Refcountable (void)
+{
+ delete this->refcount_lock_;
+}
diff --git a/TAO/tao/Synch_Refcountable.h b/TAO/tao/Synch_Refcountable.h
new file mode 100644
index 00000000000..534873d255f
--- /dev/null
+++ b/TAO/tao/Synch_Refcountable.h
@@ -0,0 +1,57 @@
+// This may look like C, but it's really
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Synch_Refcountable.h
+ *
+ * $Id$
+ *
+ * Definition for a synchronised refcountable interface.
+ *
+ * @author Fred Kuhns <fredk@cs.wustl.edu>
+ */
+//=============================================================================
+#ifndef TAO_SYNCH_REFCOUNTABLE_H
+#define TAO_SYNCH_REFCOUNTABLE_H
+#include "ace/pre.h"
+
+#include "ace/Refcountable.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "TAO_Export.h"
+#include "ace/Synch.h"
+
+class ACE_Lock;
+
+/**
+ * @class TAO_Synch_Refcountable
+ *
+ * @brief Definition for a synchronised refcountable interface.
+ */
+class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable
+{
+public:
+ virtual ~TAO_Synch_Refcountable (void);
+
+ int increment (void);
+ int decrement (void);
+
+ int refcount (void) const;
+
+protected:
+ TAO_Synch_Refcountable (ACE_Lock *lock, int refcount);
+
+ ACE_Lock *refcount_lock_;
+};
+
+
+#if defined (__ACE_INLINE__)
+# include "Synch_Refcountable.inl"
+#endif /* __ACE_INLINE__ */
+
+#include "ace/post.h"
+#endif /*TAO_SYNCH_REFCOUNTABLE*/
diff --git a/TAO/tao/Synch_Refcountable.inl b/TAO/tao/Synch_Refcountable.inl
new file mode 100644
index 00000000000..f933f1fd5ff
--- /dev/null
+++ b/TAO/tao/Synch_Refcountable.inl
@@ -0,0 +1,21 @@
+// -*- C++ -*-
+//$Id$
+ACE_INLINE int
+TAO_Synch_Refcountable::increment (void)
+{
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0);
+ return ACE_Refcountable::increment ();
+}
+
+ACE_INLINE int
+TAO_Synch_Refcountable::decrement (void)
+{
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0);
+ return ACE_Refcountable::decrement ();
+}
+
+ACE_INLINE int
+TAO_Synch_Refcountable::refcount (void) const
+{
+ return ACE_Refcountable::refcount ();
+}
diff --git a/TAO/tao/TAO.dsp b/TAO/tao/TAO.dsp
index 3f364f0a467..5c879afd067 100644
--- a/TAO/tao/TAO.dsp
+++ b/TAO/tao/TAO.dsp
@@ -831,6 +831,10 @@ SOURCE=.\Synch_Queued_Message.cpp
# End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -1659,6 +1663,10 @@ SOURCE=.\Synch_Queued_Message.h
# End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -2263,6 +2271,10 @@ SOURCE=.\Sync_Strategies.i
# End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Tagged_Components.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/TAO_Static.dsp b/TAO/tao/TAO_Static.dsp
index 21d51b8e0d2..538caf636e8 100644
--- a/TAO/tao/TAO_Static.dsp
+++ b/TAO/tao/TAO_Static.dsp
@@ -763,6 +763,10 @@ SOURCE=.\Synch_Queued_Message.cpp
# End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.cpp
# End Source File
# Begin Source File
@@ -1643,6 +1647,10 @@ SOURCE=.\Synch_Queued_Message.h
# End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.h
+# End Source File
+# Begin Source File
+
SOURCE=.\Synch_Reply_Dispatcher.h
# End Source File
# Begin Source File
@@ -2203,6 +2211,10 @@ SOURCE=.\Sync_Strategies.i
# End Source File
# Begin Source File
+SOURCE=.\Synch_Refcountable.inl
+# End Source File
+# Begin Source File
+
SOURCE=.\Tagged_Components.i
# End Source File
# Begin Source File
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index e3b0a611cf1..68ab8203f9a 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -31,38 +31,68 @@ ACE_RCSID (tao,
Transport,
"$Id$")
-
-TAO_Synch_Refcountable::TAO_Synch_Refcountable (ACE_Lock *lock, int refcount)
- : ACE_Refcountable (refcount)
- , refcount_lock_ (lock)
+/*
+ * Static function in file scope
+ */
+static void
+dump_iov (iovec *iov, int iovcnt, int id,
+ size_t current_transfer,
+ const char *location)
{
-}
+ ACE_Log_Msg::instance ()->acquire ();
-TAO_Synch_Refcountable::~TAO_Synch_Refcountable (void)
-{
- delete this->refcount_lock_;
-}
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::%s, "
+ "sending %d buffers\n",
+ id, location, iovcnt));
+ for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
+ {
+ size_t iov_len = iov[i].iov_len;
-int
-TAO_Synch_Refcountable::increment (void)
-{
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0);
- return ACE_Refcountable::increment ();
-}
+ // Possibly a partially sent iovec entry.
+ if (current_transfer < iov_len)
+ iov_len = current_transfer;
-int
-TAO_Synch_Refcountable::decrement (void)
-{
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->refcount_lock_, 0);
- return ACE_Refcountable::decrement ();
-}
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::%s, "
+ "buffer %d/%d has %d bytes\n",
+ id, location,
+ i, iovcnt,
+ iov_len));
-int
-TAO_Synch_Refcountable::refcount (void) const
-{
- return ACE_Refcountable::refcount ();
+ size_t len;
+ for (size_t offset = 0; offset < iov_len; offset += len)
+ {
+ ACE_TCHAR header[1024];
+ ACE_OS::sprintf (header,
+ "TAO - Transport[%d]::%s ("
+ ACE_SIZE_T_FORMAT_SPECIFIER "/"
+ ACE_SIZE_T_FORMAT_SPECIFIER ")\n",
+ id, location, offset, iov_len);
+
+ len = iov_len - offset;
+ if (len > 512)
+ len = 512;
+ ACE_HEX_DUMP ((LM_DEBUG,
+ ACE_static_cast(char*,iov[i].iov_base) + offset,
+ len,
+ header));
+ }
+ current_transfer -= iov_len;
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::%s, "
+ "end of data\n",
+ id, location));
+
+ ACE_Log_Msg::instance ()->release ();
}
+/*
+ * Definitions for methods declared in the transport class
+ *
+ */
+
// Constructor.
TAO_Transport::TAO_Transport (CORBA::ULong tag,
TAO_ORB_Core *orb_core)
@@ -116,39 +146,49 @@ TAO_Transport::~TAO_Transport (void)
// outside the TAO_Transport.
if (this->cache_map_entry_ != 0)
{
- this->orb_core_->lane_resources ().transport_cache ().purge_entry (
- this->cache_map_entry_);
+ this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
}
}
-int
-TAO_Transport::handle_output (void)
+
+/*
+ *
+ * Public utility methods that are called by other classes.
+ *
+ */
+/*static*/ TAO_Transport*
+TAO_Transport::_duplicate (TAO_Transport* transport)
{
- if (TAO_debug_level > 3)
+ if (transport != 0)
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_output\n",
- this->id ()));
+ transport->increment ();
}
+ return transport;
+}
- // The flushing strategy (potentially via the Reactor) wants to send
- // more data, first check if there is a current message that needs
- // more sending...
- int retval = this->drain_queue ();
-
- if (TAO_debug_level > 3)
+/*static*/ void
+TAO_Transport::release (TAO_Transport* transport)
+{
+ if (transport != 0)
{
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::handle_output, "
- "drain_queue returns %d/%d\n",
- this->id (),
- retval, errno));
- }
+ int count = transport->decrement ();
- // Any errors are returned directly to the Reactor
- return retval;
+ if (count == 0)
+ {
+ delete transport;
+ }
+ else if (count < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Transport[%d]::release, "
+ "reference count is less than zero: %d\n",
+ transport->id (), count));
+ ACE_OS::abort ();
+ }
+ }
}
+
void
TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered,
TAO_EventHandlerSet &unregistered)
@@ -157,7 +197,6 @@ TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered,
guard,
*this->handler_lock_));
ACE_Event_Handler *eh = this->event_handler_i ();
- // TAO_Connection_Handler *ch = ACE_reinterpret_cast (TAO_Connection_Handler *, eh);
if (eh != 0)
{
@@ -172,60 +211,216 @@ TAO_Transport::provide_handle (ACE_Handle_Set &reactor_registered,
}
}
-static void
-dump_iov (iovec *iov, int iovcnt, int id,
- size_t current_transfer,
- const char *location)
+
+int
+TAO_Transport::register_handler (void)
{
- ACE_Log_Msg::instance ()->acquire ();
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->handler_lock_,
+ -1));
+ if (this->check_event_handler_i ("Transport::register_handler") == -1)
+ return -1;
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::%s, "
- "sending %d buffers\n",
- id, location, iovcnt));
- for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
+ return this->register_handler_i ();
+}
+
+
+ssize_t
+TAO_Transport::send (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *timeout)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->handler_lock_,
+ -1));
+
+ if (this->check_event_handler_i ("Transport::send") == -1)
+ return -1;
+
+ // now call the template method
+ return this->send_i (iov, iovcnt, bytes_transferred, timeout);
+}
+
+
+ssize_t
+TAO_Transport::recv (char *buffer,
+ size_t len,
+ const ACE_Time_Value *timeout)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
+ guard,
+ *this->handler_lock_,
+ -1));
+
+ if (this->check_event_handler_i ("Transport::recv") == -1)
+ return -1;
+
+ // now call the template method
+ return this->recv_i (buffer, len, timeout);
+}
+
+
+int
+TAO_Transport::idle_after_send (void)
+{
+ return this->tms ()->idle_after_send ();
+}
+
+int
+TAO_Transport::idle_after_reply (void)
+{
+ return this->tms ()->idle_after_reply ();
+}
+
+int
+TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
+{
+ ACE_NOTSUP_RETURN (-1);
+}
+
+void
+TAO_Transport::close_connection (void)
+{
+ ACE_Event_Handler * eh = this->invalidate_event_handler ();
+ this->close_connection_shared (1, eh);
+}
+
+int
+TAO_Transport::generate_locate_request (
+ TAO_Target_Specification &spec,
+ TAO_Operation_Details &opdetails,
+ TAO_OutputCDR &output)
+{
+ if (this->messaging_object ()->generate_locate_request_header (opdetails,
+ spec,
+ output) == -1)
{
- size_t iov_len = iov[i].iov_len;
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::generate_locate_request, "
+ "error while marshalling the LocateRequest header\n",
+ this->id ()));
+
+ return -1;
+ }
+
+ return 0;
+}
+
+
+int
+TAO_Transport::generate_request_header (
+ TAO_Operation_Details &opdetails,
+ TAO_Target_Specification &spec,
+ TAO_OutputCDR &output)
+{
+ if (this->messaging_object ()->generate_request_header (opdetails,
+ spec,
+ output) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) - Transport[%d]::generate_request_header, "
+ "error while marshalling the Request header\n",
+ this->id()));
+
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/*
+ * NOTE: Some of these calls looks like ideal fodder for
+ * inlining. But, please note that the calls made within the method
+ * are not inlined. This would increase closure cost on the
+ * compiler.
+ */
+void
+TAO_Transport::connection_handler_closing (void)
+{
+ // The connection has closed, we must invalidate the handler to
+ // ensure that any attempt to use this transport results in an
+ // error. Basically all the other methods in the Transport
+ // cooperate via check_event_handler_i()
+
+ (void) this->invalidate_event_handler ();
+ this->send_connection_closed_notifications ();
+
+ // Can't hold the lock while we release, b/c the release could
+ // invoke the destructor! This should be the last thing we do here
+ TAO_Transport::release (this);
+}
+
+int
+TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
+{
+ // First purge our entry
+ this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
+
+ // Then add ourselves to the cache
+ return this->transport_cache_manager ().cache_transport (desc,
+ this);
+}
+
+void
+TAO_Transport::purge_entry (void)
+{
+ if (this->cache_map_entry_ != 0)
+ {
+ (void) this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
+ }
+}
+
+int
+TAO_Transport::make_idle (void)
+{
+ if (this->cache_map_entry_ != 0)
+ {
+ return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
+ }
+
+ return -1;
+}
- // Possibly a partially sent iovec entry.
- if (current_transfer < iov_len)
- iov_len = current_transfer;
+/*
+ *
+ * Methods called and used in the output path of the ORB.
+ *
+ */
+int
+TAO_Transport::handle_output (void)
+{
+ if (TAO_debug_level > 3)
+ {
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::%s, "
- "buffer %d/%d has %d bytes\n",
- id, location,
- i, iovcnt,
- iov_len));
+ "TAO (%P|%t) - Transport[%d]::handle_output\n",
+ this->id ()));
+ }
- size_t len;
- for (size_t offset = 0; offset < iov_len; offset += len)
- {
- ACE_TCHAR header[1024];
- ACE_OS::sprintf (header,
- "TAO - Transport[%d]::%s ("
- ACE_SIZE_T_FORMAT_SPECIFIER "/"
- ACE_SIZE_T_FORMAT_SPECIFIER ")\n",
- id, location, offset, iov_len);
+ // The flushing strategy (potentially via the Reactor) wants to send
+ // more data, first check if there is a current message that needs
+ // more sending...
+ int retval = this->drain_queue ();
- len = iov_len - offset;
- if (len > 512)
- len = 512;
- ACE_HEX_DUMP ((LM_DEBUG,
- ACE_static_cast(char*,iov[i].iov_base) + offset,
- len,
- header));
- }
- current_transfer -= iov_len;
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::handle_output, "
+ "drain_queue returns %d/%d\n",
+ this->id (),
+ retval, errno));
}
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::%s, "
- "end of data\n",
- id, location));
- ACE_Log_Msg::instance ()->release ();
+ // Any errors are returned directly to the Reactor
+ return retval;
}
+
int
TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
size_t &bytes_transferred,
@@ -412,238 +607,582 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
return 1;
}
+
+
+
+void
+TAO_Transport::close_connection_i (void)
+{
+ ACE_Event_Handler * eh = this->invalidate_event_handler_i ();
+ this->close_connection_shared (1, eh);
+}
+
+void
+TAO_Transport::close_connection_no_purge (void)
+{
+ ACE_Event_Handler * eh = this->invalidate_event_handler ();
+ this->close_connection_shared (0, eh);
+}
+
+
+void
+TAO_Transport::close_connection_shared (int disable_purge,
+ ACE_Event_Handler * eh)
+{
+ // Purge the entry
+ if (!disable_purge && this->cache_map_entry_ != 0)
+ {
+ this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
+ }
+
+ if (eh == 0)
+ {
+ // The connection was already closed
+ return;
+ }
+
+ // We first try to remove the handler from the reactor. After that
+ // we destroy the handler using handle_close (). The remove handler
+ // is necessary because if the handle_closed is called directly, the
+ // reactor would be left with a dangling pointer.
+ if (this->ws_->is_registered ())
+ {
+ this->orb_core_->reactor ()->remove_handler (
+ eh,
+ ACE_Event_Handler::ALL_EVENTS_MASK |
+ ACE_Event_Handler::DONT_CALL);
+ }
+
+ (void) eh->handle_close (ACE_INVALID_HANDLE,
+ ACE_Event_Handler::ALL_EVENTS_MASK);
+
+ this->send_connection_closed_notifications ();
+}
+
+
+
+
+
+
int
-TAO_Transport::idle_after_send (void)
+TAO_Transport::queue_is_empty_i (void)
{
- return this->tms ()->idle_after_send ();
+ return (this->head_ == 0);
}
+
+
+
int
-TAO_Transport::idle_after_reply (void)
+TAO_Transport::schedule_output_i (void)
{
- return this->tms ()->idle_after_reply ();
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh == 0)
+ return -1;
+
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor == 0)
+ return -1;
+
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::schedule_output\n",
+ this->id ()));
+ }
+
+ return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
}
int
-TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
+TAO_Transport::cancel_output_i (void)
{
- ACE_NOTSUP_RETURN (-1);
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh == 0)
+ return -1;
+
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor == 0)
+ return -1;
+
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::cancel_output\n",
+ this->id ()));
+ }
+
+ return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
}
-void
-TAO_Transport::connection_handler_closing (void)
+int
+TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
+ const void *act)
{
- // The connection has closed, we must invalidate the handler to
- // ensure that any attempt to use this transport results in an
- // error. Basically all the other methods in the Transport
- // cooperate via check_event_handler_i()
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, "
+ "timer expired\n",
+ this->id ()));
+ }
- (void) this->invalidate_event_handler ();
- this->send_connection_closed_notifications ();
+ /// This is the only legal ACT in the current configuration....
+ if (act != &this->current_deadline_)
+ return -1;
- // Can't hold the lock while we release, b/c the release could
- // invoke the destructor! This should be the last thing we do here
- TAO_Transport::release (this);
+ if (this->flush_timer_pending ())
+ {
+ // The timer is always a oneshot timer, so mark is as not
+ // pending.
+ this->reset_flush_timer ();
+
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+ (void) flushing_strategy->schedule_output (this);
+ }
+ return 0;
}
-TAO_Transport*
-TAO_Transport::_duplicate (TAO_Transport* transport)
+int
+TAO_Transport::drain_queue (void)
{
- if (transport != 0)
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
+
+ int retval = this->drain_queue_i ();
+
+ if (retval == 1)
{
- transport->increment ();
+ // ... there is no current message or it was completely
+ // sent, cancel output...
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
+
+ flushing_strategy->cancel_output (this);
+
+ return 0;
}
- return transport;
+
+ return retval;
}
-void
-TAO_Transport::release (TAO_Transport* transport)
+int
+TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
{
- if (transport != 0)
+ if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1)
+ return -1;
+
+ size_t byte_count = 0;
+
+ // ... send the message ...
+ ssize_t retval =
+ this->send_i (iov, iovcnt, byte_count);
+
+ if (TAO_debug_level == 5)
{
- int count = transport->decrement ();
+ dump_iov (iov, iovcnt, this->id (),
+ byte_count, "drain_queue_helper");
+ }
- if (count == 0)
+ // ... now we need to update the queue, removing elements
+ // that have been sent, and updating the last element if it
+ // was only partially sent ...
+ this->cleanup_queue (byte_count);
+ iovcnt = 0;
+
+ if (retval == 0)
+ {
+ if (TAO_debug_level > 4)
{
- delete transport;
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "send() returns 0",
+ this->id ()));
}
- else if (count < 0)
+ return -1;
+ }
+ else if (retval == -1)
+ {
+ if (TAO_debug_level > 4)
{
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport[%d]::release, "
- "reference count is less than zero: %d\n",
- transport->id (), count));
- ACE_OS::abort ();
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "error during %p\n",
+ this->id (), "send_i()"));
}
+ if (errno == EWOULDBLOCK)
+ return 0;
+ return -1;
}
+
+ // ... start over, how do we guarantee progress? Because if
+ // no bytes are sent send() can only return 0 or -1
+ ACE_ASSERT (byte_count != 0);
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
+ "byte_count = %d, head_is_empty = %d\n",
+ this->id(), byte_count, (this->head_ == 0)));
+ }
+ return 1;
}
int
-TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
+TAO_Transport::drain_queue_i (void)
{
- // First purge our entry
- this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
+ // This is the vector used to send data, it must be declared outside
+ // the loop because after the loop there may still be data to be
+ // sent
+ int iovcnt = 0;
+ iovec iov[IOV_MAX];
- // Then add ourselves to the cache
- return this->transport_cache_manager ().cache_transport (desc,
- this);
+ // We loop over all the elements in the queue ...
+ TAO_Queued_Message *i = this->head_;
+ while (i != 0)
+ {
+ // ... each element fills the iovector ...
+ i->fill_iov (IOV_MAX, iovcnt, iov);
+
+ // ... the vector is full, no choice but to send some data out.
+ // We need to loop because a single message can span multiple
+ // IOV_MAX elements ...
+ if (iovcnt == IOV_MAX)
+ {
+ int retval =
+ this->drain_queue_helper (iovcnt, iov);
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
+ "helper retval = %d\n",
+ this->id (), retval));
+ }
+ if (retval != 1)
+ return retval;
+
+ i = this->head_;
+ continue;
+ }
+ // ... notice that this line is only reached if there is still
+ // room in the iovector ...
+ i = i->next ();
+ }
+
+
+ if (iovcnt != 0)
+ {
+ int retval =
+ this->drain_queue_helper (iovcnt, iov);
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
+ "helper retval = %d\n",
+ this->id (), retval));
+ }
+ if (retval != 1)
+ return retval;
+ }
+
+ if (this->head_ == 0)
+ {
+ if (this->flush_timer_pending ())
+ {
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh != 0)
+ {
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor != 0)
+ {
+ (void) reactor->cancel_timer (this->flush_timer_id_);
+ }
+ }
+ this->reset_flush_timer ();
+ }
+ return 1;
+ }
+
+ return 0;
}
void
-TAO_Transport::purge_entry (void)
+TAO_Transport::cleanup_queue (size_t byte_count)
{
- if (this->cache_map_entry_ != 0)
+ while (this->head_ != 0 && byte_count > 0)
{
- (void) this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
+ TAO_Queued_Message *i = this->head_;
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
+ "byte_count = %d\n",
+ this->id (), byte_count));
+ }
+
+ // Update the state of the first message
+ i->bytes_transferred (byte_count);
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
+ "after transfer, bc = %d, all_sent = %d, ml = %d\n",
+ this->id (), byte_count, i->all_data_sent (),
+ i->message_length ()));
+ }
+
+ // ... if all the data was sent the message must be removed from
+ // the queue...
+ if (i->all_data_sent ())
+ {
+ i->remove_from_list (this->head_, this->tail_);
+ i->destroy ();
+ }
}
}
int
-TAO_Transport::make_idle (void)
+TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
+ int &must_flush)
{
- if (this->cache_map_entry_ != 0)
+ // First let's compute the size of the queue:
+ size_t msg_count = 0;
+ size_t total_bytes = 0;
+ for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
{
- return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
+ msg_count++;
+ total_bytes += i->message_length ();
}
- return -1;
-}
+ int set_timer;
+ ACE_Time_Value new_deadline;
-void
-TAO_Transport::close_connection (void)
-{
- ACE_Event_Handler * eh = this->invalidate_event_handler ();
- this->close_connection_shared (1, eh);
+ int constraints_reached =
+ stub->sync_strategy ().buffering_constraints_reached (stub,
+ msg_count,
+ total_bytes,
+ must_flush,
+ this->current_deadline_,
+ set_timer,
+ new_deadline);
+
+ // ... set the new timer, also cancel any previous timers ...
+ if (set_timer)
+ {
+ ACE_Event_Handler *eh = this->event_handler_i ();
+ if (eh != 0)
+ {
+ ACE_Reactor *reactor = eh->reactor ();
+ if (reactor != 0)
+ {
+ this->current_deadline_ = new_deadline;
+ ACE_Time_Value delay =
+ new_deadline - ACE_OS::gettimeofday ();
+
+ if (this->flush_timer_pending ())
+ {
+ (void) reactor->cancel_timer (this->flush_timer_id_);
+ }
+ this->flush_timer_id_ =
+ reactor->schedule_timer (&this->transport_timer_,
+ &this->current_deadline_,
+ delay);
+ }
+ }
+ }
+
+ return constraints_reached;
}
void
-TAO_Transport::close_connection_i (void)
+TAO_Transport::report_invalid_event_handler (const char *caller)
{
- ACE_Event_Handler * eh = this->invalidate_event_handler_i ();
- this->close_connection_shared (1, eh);
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) - Transport[%d]::report_invalid_event_handler"
+ "(%s) no longer associated with handler [tag=%d]\n",
+ this->id (), caller, this->tag_));
+ }
}
-void
-TAO_Transport::close_connection_no_purge (void)
+ACE_Event_Handler *
+TAO_Transport::invalidate_event_handler (void)
{
- ACE_Event_Handler * eh = this->invalidate_event_handler ();
- this->close_connection_shared (0, eh);
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0));
+
+ return this->invalidate_event_handler_i ();
}
void
-TAO_Transport::close_connection_shared (int disable_purge,
- ACE_Event_Handler * eh)
+TAO_Transport::send_connection_closed_notifications (void)
{
- // Purge the entry
- if (!disable_purge && this->cache_map_entry_ != 0)
+ while (this->head_ != 0)
{
- this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
- }
+ TAO_Queued_Message *i = this->head_;
- if (eh == 0)
- {
- // The connection was already closed
- return;
+ // @@ This is a good point to insert a flag to indicate that a
+ // CloseConnection message was successfully received.
+ i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED);
+
+ this->head_ = i->next ();
+
+ i->destroy ();
}
- // We first try to remove the handler from the reactor. After that
- // we destroy the handler using handle_close (). The remove handler
- // is necessary because if the handle_closed is called directly, the
- // reactor would be left with a dangling pointer.
- if (this->ws_->is_registered ())
+ this->tms ()->connection_closed ();
+ this->messaging_object ()->reset ();
+}
+
+int
+TAO_Transport::send_message_shared_i (TAO_Stub *stub,
+ int is_synchronous,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
+{
+ if (is_synchronous)
{
- this->orb_core_->reactor ()->remove_handler (
- eh,
- ACE_Event_Handler::ALL_EVENTS_MASK |
- ACE_Event_Handler::DONT_CALL);
+ return this->send_synchronous_message_i (message_block,
+ max_wait_time);
}
- (void) eh->handle_close (ACE_INVALID_HANDLE,
- ACE_Event_Handler::ALL_EVENTS_MASK);
+ // Let's figure out if the message should be queued without trying
+ // to send first:
+ int try_sending_first = 1;
- this->send_connection_closed_notifications ();
-}
+ int queue_empty = (this->head_ == 0);
-ssize_t
-TAO_Transport::send (iovec *iov, int iovcnt,
- size_t &bytes_transferred,
- const ACE_Time_Value *timeout)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->handler_lock_,
- -1));
+ if (!queue_empty)
+ try_sending_first = 0;
+ else if (stub->sync_strategy ().must_queue (queue_empty))
+ try_sending_first = 0;
- if (this->check_event_handler_i ("Transport::send") == -1)
- return -1;
+ ssize_t n;
- // now call the template method
- return this->send_i (iov, iovcnt, bytes_transferred, timeout);
-}
+ TAO_Flushing_Strategy *flushing_strategy =
+ this->orb_core ()->flushing_strategy ();
-ssize_t
-TAO_Transport::recv (char *buffer,
- size_t len,
- const ACE_Time_Value *timeout)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->handler_lock_,
- -1));
+ if (try_sending_first)
+ {
+ size_t byte_count = 0;
+ // ... in this case we must try to send the message first ...
- if (this->check_event_handler_i ("Transport::recv") == -1)
- return -1;
+ size_t total_length = message_block->total_length ();
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ "trying to send the message (ml = %d)\n",
+ this->id (), total_length));
+ }
- // now call the template method
- return this->recv_i (buffer, len, timeout);
-}
+ // @@ I don't think we want to hold the mutex here, however if
+ // we release it we need to recheck the status of the transport
+ // after we return... once I understand the final form for this
+ // code I will re-visit this decision
+ n = this->send_message_block_chain_i (message_block,
+ byte_count,
+ max_wait_time);
+ if (n == -1)
+ {
+ // ... if this is just an EWOULDBLOCK we must schedule the
+ // message for later, if it is ETIME we still have to send
+ // the complete message, because cutting off the message at
+ // this point will destroy the synchronization with the
+ // server ...
+ if (errno != EWOULDBLOCK && errno != ETIME)
+ {
+ return -1;
+ }
+ }
+ // ... let's figure out if the complete message was sent ...
+ if (total_length == byte_count)
+ {
+ // Done, just return. Notice that there are no allocations
+ // or copies up to this point (though some fancy calling
+ // back and forth).
+ // This is the common case for the critical path, it should
+ // be fast.
+ return 0;
+ }
-int
-TAO_Transport::generate_locate_request (
- TAO_Target_Specification &spec,
- TAO_Operation_Details &opdetails,
- TAO_OutputCDR &output)
-{
- if (this->messaging_object ()->generate_locate_request_header (opdetails,
- spec,
- output) == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::generate_locate_request, "
- "error while marshalling the LocateRequest header\n",
- this->id ()));
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ "partial send %d / %d bytes\n",
+ this->id (), byte_count, total_length));
+ }
- return -1;
+ // ... part of the data was sent, need to figure out what piece
+ // of the message block chain must be queued ...
+ while (message_block != 0 && message_block->length () == 0)
+ message_block = message_block->cont ();
+
+ // ... at least some portion of the message block chain should
+ // remain ...
+ ACE_ASSERT (message_block != 0);
}
- return 0;
-}
+ // ... either the message must be queued or we need to queue it
+ // because it was not completely sent out ...
+ if (TAO_debug_level > 6)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport[%d]::send_message_i, "
+ "message is queued\n",
+ this->id ()));
+ }
-int
-TAO_Transport::generate_request_header (
- TAO_Operation_Details &opdetails,
- TAO_Target_Specification &spec,
- TAO_OutputCDR &output)
-{
- if (this->messaging_object ()->generate_request_header (opdetails,
- spec,
- output) == -1)
+ TAO_Queued_Message *queued_message = 0;
+ ACE_NEW_RETURN (queued_message,
+ TAO_Asynch_Queued_Message (message_block),
+ -1);
+ queued_message->push_back (this->head_, this->tail_);
+
+ // ... if the queue is full we need to activate the output on the
+ // queue ...
+ int must_flush = 0;
+ int constraints_reached =
+ this->check_buffering_constraints_i (stub,
+ must_flush);
+
+ // ... but we also want to activate it if the message was partially
+ // sent.... Plus, when we use the blocking flushing strategy the
+ // queue is flushed as a side-effect of 'schedule_output()'
+
+ if (constraints_reached || try_sending_first)
{
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) - Transport[%d]::generate_request_header, "
- "error while marshalling the Request header\n",
- this->id()));
+ (void) flushing_strategy->schedule_output (this);
+ }
- return -1;
+ if (must_flush)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+
+ (void) flushing_strategy->flush_transport (this);
}
return 0;
}
+
+
+/*
+ *
+ * All the methods relevant to the incoming data path of the ORB are
+ * defined below
+ *
+ */
int
TAO_Transport::handle_input_i (TAO_Resume_Handle &rh,
ACE_Time_Value * max_wait_time,
@@ -1327,8 +1866,6 @@ TAO_Queued_Data *
TAO_Transport::make_queued_data (ACE_Message_Block &incoming)
{
// Get an instance of TAO_Queued_Data
- // @@ todo: Are we using the right allocator? May be not. We need to
- // see how to have a general purpose allocator.
TAO_Queued_Data *qd =
TAO_Queued_Data::get_queued_data (
this->orb_core_->transport_message_buffer_allocator ());
@@ -1499,535 +2036,6 @@ TAO_Transport::notify_reactor (void)
return 1;
}
-int
-TAO_Transport::queue_is_empty (void)
-{
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
- return this->queue_is_empty_i ();
-}
-
-int
-TAO_Transport::queue_is_empty_i (void)
-{
- return (this->head_ == 0);
-}
-
-int
-TAO_Transport::register_handler (void)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock,
- guard,
- *this->handler_lock_,
- -1));
- if (this->check_event_handler_i ("Transport::register_handler") == -1)
- return -1;
-
- return this->register_handler_i ();
-}
-
-
-int
-TAO_Transport::schedule_output_i (void)
-{
- ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
-
- if (TAO_debug_level > 3)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::schedule_output\n",
- this->id ()));
- }
-
- return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
-}
-
-int
-TAO_Transport::cancel_output_i (void)
-{
- ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh == 0)
- return -1;
-
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor == 0)
- return -1;
-
- if (TAO_debug_level > 3)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::cancel_output\n",
- this->id ()));
- }
-
- return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
-}
-
-int
-TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
- const void *act)
-{
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - TAO_Transport[%d]::handle_timeout, "
- "timer expired\n",
- this->id ()));
- }
-
- /// This is the only legal ACT in the current configuration....
- if (act != &this->current_deadline_)
- return -1;
-
- if (this->flush_timer_pending ())
- {
- // The timer is always a oneshot timer, so mark is as not
- // pending.
- this->reset_flush_timer ();
-
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
- }
- return 0;
-}
-
-int
-TAO_Transport::drain_queue (void)
-{
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
-
- int retval = this->drain_queue_i ();
-
- if (retval == 1)
- {
- // ... there is no current message or it was completely
- // sent, cancel output...
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
-
- flushing_strategy->cancel_output (this);
-
- return 0;
- }
-
- return retval;
-}
-
-int
-TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[])
-{
- if (this->check_event_handler_i ("Transport::drain_queue_helper") == -1)
- return -1;
-
- size_t byte_count = 0;
-
- // ... send the message ...
- ssize_t retval =
- this->send_i (iov, iovcnt, byte_count);
-
- if (TAO_debug_level == 5)
- {
- dump_iov (iov, iovcnt, this->id (),
- byte_count, "drain_queue_helper");
- }
-
- // ... now we need to update the queue, removing elements
- // that have been sent, and updating the last element if it
- // was only partially sent ...
- this->cleanup_queue (byte_count);
- iovcnt = 0;
-
- if (retval == 0)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
- "send() returns 0",
- this->id ()));
- }
- return -1;
- }
- else if (retval == -1)
- {
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
- "error during %p\n",
- this->id (), "send_i()"));
- }
- if (errno == EWOULDBLOCK)
- return 0;
- return -1;
- }
-
- // ... start over, how do we guarantee progress? Because if
- // no bytes are sent send() can only return 0 or -1
- ACE_ASSERT (byte_count != 0);
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_helper, "
- "byte_count = %d, head_is_empty = %d\n",
- this->id(), byte_count, (this->head_ == 0)));
- }
- return 1;
-}
-
-int
-TAO_Transport::drain_queue_i (void)
-{
- // This is the vector used to send data, it must be declared outside
- // the loop because after the loop there may still be data to be
- // sent
- int iovcnt = 0;
- iovec iov[IOV_MAX];
-
- // We loop over all the elements in the queue ...
- TAO_Queued_Message *i = this->head_;
- while (i != 0)
- {
- // ... each element fills the iovector ...
- i->fill_iov (IOV_MAX, iovcnt, iov);
-
- // ... the vector is full, no choice but to send some data out.
- // We need to loop because a single message can span multiple
- // IOV_MAX elements ...
- if (iovcnt == IOV_MAX)
- {
- int retval =
- this->drain_queue_helper (iovcnt, iov);
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
- "helper retval = %d\n",
- this->id (), retval));
- }
- if (retval != 1)
- return retval;
-
- i = this->head_;
- continue;
- }
- // ... notice that this line is only reached if there is still
- // room in the iovector ...
- i = i->next ();
- }
-
-
- if (iovcnt != 0)
- {
- int retval =
- this->drain_queue_helper (iovcnt, iov);
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::drain_queue_i, "
- "helper retval = %d\n",
- this->id (), retval));
- }
- if (retval != 1)
- return retval;
- }
-
- if (this->head_ == 0)
- {
- if (this->flush_timer_pending ())
- {
- ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- }
- this->reset_flush_timer ();
- }
- return 1;
- }
-
- return 0;
-}
-
-void
-TAO_Transport::cleanup_queue (size_t byte_count)
-{
- while (this->head_ != 0 && byte_count > 0)
- {
- TAO_Queued_Message *i = this->head_;
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
- "byte_count = %d\n",
- this->id (), byte_count));
- }
-
- // Update the state of the first message
- i->bytes_transferred (byte_count);
-
- if (TAO_debug_level > 4)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::cleanup_queue, "
- "after transfer, bc = %d, all_sent = %d, ml = %d\n",
- this->id (), byte_count, i->all_data_sent (),
- i->message_length ()));
- }
-
- // ... if all the data was sent the message must be removed from
- // the queue...
- if (i->all_data_sent ())
- {
- i->remove_from_list (this->head_, this->tail_);
- i->destroy ();
- }
- }
-}
-
-int
-TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub,
- int &must_flush)
-{
- // First let's compute the size of the queue:
- size_t msg_count = 0;
- size_t total_bytes = 0;
- for (TAO_Queued_Message *i = this->head_; i != 0; i = i->next ())
- {
- msg_count++;
- total_bytes += i->message_length ();
- }
-
- int set_timer;
- ACE_Time_Value new_deadline;
-
- int constraints_reached =
- stub->sync_strategy ().buffering_constraints_reached (stub,
- msg_count,
- total_bytes,
- must_flush,
- this->current_deadline_,
- set_timer,
- new_deadline);
-
- // ... set the new timer, also cancel any previous timers ...
- if (set_timer)
- {
- ACE_Event_Handler *eh = this->event_handler_i ();
- if (eh != 0)
- {
- ACE_Reactor *reactor = eh->reactor ();
- if (reactor != 0)
- {
- this->current_deadline_ = new_deadline;
- ACE_Time_Value delay =
- new_deadline - ACE_OS::gettimeofday ();
-
- if (this->flush_timer_pending ())
- {
- (void) reactor->cancel_timer (this->flush_timer_id_);
- }
- this->flush_timer_id_ =
- reactor->schedule_timer (&this->transport_timer_,
- &this->current_deadline_,
- delay);
- }
- }
- }
-
- return constraints_reached;
-}
-
-void
-TAO_Transport::report_invalid_event_handler (const char *caller)
-{
- if (TAO_debug_level > 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) - Transport[%d]::report_invalid_event_handler"
- "(%s) no longer associated with handler [tag=%d]\n",
- this->id (), caller, this->tag_));
- }
-}
-
-ACE_Event_Handler *
-TAO_Transport::invalidate_event_handler (void)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, guard, *this->handler_lock_, 0));
-
- return this->invalidate_event_handler_i ();
-}
-
-void
-TAO_Transport::send_connection_closed_notifications (void)
-{
- while (this->head_ != 0)
- {
- TAO_Queued_Message *i = this->head_;
-
- // @@ This is a good point to insert a flag to indicate that a
- // CloseConnection message was successfully received.
- i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED);
-
- this->head_ = i->next ();
-
- i->destroy ();
- }
-
- this->tms ()->connection_closed ();
- this->messaging_object ()->reset ();
-}
-
-int
-TAO_Transport::send_message_shared_i (TAO_Stub *stub,
- int is_synchronous,
- const ACE_Message_Block *message_block,
- ACE_Time_Value *max_wait_time)
-{
- if (is_synchronous)
- {
- return this->send_synchronous_message_i (message_block,
- max_wait_time);
- }
-
- // Let's figure out if the message should be queued without trying
- // to send first:
- int try_sending_first = 1;
-
- int queue_empty = (this->head_ == 0);
-
- if (!queue_empty)
- try_sending_first = 0;
- else if (stub->sync_strategy ().must_queue (queue_empty))
- try_sending_first = 0;
-
- ssize_t n;
-
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
-
- if (try_sending_first)
- {
- size_t byte_count = 0;
- // ... in this case we must try to send the message first ...
-
- size_t total_length = message_block->total_length ();
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- "trying to send the message (ml = %d)\n",
- this->id (), total_length));
- }
-
- // @@ I don't think we want to hold the mutex here, however if
- // we release it we need to recheck the status of the transport
- // after we return... once I understand the final form for this
- // code I will re-visit this decision
- n = this->send_message_block_chain_i (message_block,
- byte_count,
- max_wait_time);
- if (n == -1)
- {
- // ... if this is just an EWOULDBLOCK we must schedule the
- // message for later, if it is ETIME we still have to send
- // the complete message, because cutting off the message at
- // this point will destroy the synchronization with the
- // server ...
- if (errno != EWOULDBLOCK && errno != ETIME)
- {
- return -1;
- }
- }
-
- // ... let's figure out if the complete message was sent ...
- if (total_length == byte_count)
- {
- // Done, just return. Notice that there are no allocations
- // or copies up to this point (though some fancy calling
- // back and forth).
- // This is the common case for the critical path, it should
- // be fast.
- return 0;
- }
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- "partial send %d / %d bytes\n",
- this->id (), byte_count, total_length));
- }
-
- // ... part of the data was sent, need to figure out what piece
- // of the message block chain must be queued ...
- while (message_block != 0 && message_block->length () == 0)
- message_block = message_block->cont ();
-
- // ... at least some portion of the message block chain should
- // remain ...
- ACE_ASSERT (message_block != 0);
- }
-
- // ... either the message must be queued or we need to queue it
- // because it was not completely sent out ...
-
- if (TAO_debug_level > 6)
- {
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport[%d]::send_message_i, "
- "message is queued\n",
- this->id ()));
- }
-
- TAO_Queued_Message *queued_message = 0;
- ACE_NEW_RETURN (queued_message,
- TAO_Asynch_Queued_Message (message_block),
- -1);
- queued_message->push_back (this->head_, this->tail_);
-
- // ... if the queue is full we need to activate the output on the
- // queue ...
- int must_flush = 0;
- int constraints_reached =
- this->check_buffering_constraints_i (stub,
- must_flush);
-
- // ... but we also want to activate it if the message was partially
- // sent.... Plus, when we use the blocking flushing strategy the
- // queue is flushed as a side-effect of 'schedule_output()'
-
- if (constraints_reached || try_sending_first)
- {
- (void) flushing_strategy->schedule_output (this);
- }
-
- if (must_flush)
- {
- typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
- TAO_REVERSE_LOCK reverse (*this->handler_lock_);
- ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
-
- (void) flushing_strategy->flush_transport (this);
- }
-
- return 0;
-}
-
TAO_Transport_Cache_Manager &
TAO_Transport::transport_cache_manager (void)
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index d973f2bc835..4cfc9320f98 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -13,14 +13,12 @@
*/
//=============================================================================
-
#ifndef TAO_TRANSPORT_H
#define TAO_TRANSPORT_H
#include "ace/pre.h"
#include "corbafwd.h"
-
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
@@ -29,8 +27,8 @@
#include "Transport_Descriptor_Interface.h"
#include "Transport_Cache_Manager.h"
#include "Transport_Timer.h"
-#include "ace/Refcountable.h"
#include "Incoming_Message_Queue.h"
+#include "Synch_Refcountable.h"
class TAO_ORB_Core;
class TAO_Target_Specification;
@@ -43,21 +41,7 @@ class TAO_Pluggable_Messaging;
class TAO_Queued_Message;
class TAO_Resume_Handle;
-class TAO_Export TAO_Synch_Refcountable : private ACE_Refcountable
-{
-public:
- virtual ~TAO_Synch_Refcountable (void);
-
- int increment (void);
- int decrement (void);
-
- int refcount (void) const;
-protected:
- TAO_Synch_Refcountable (ACE_Lock *lock, int refcount);
-
- ACE_Lock *refcount_lock_;
-};
/**
* @class TAO_Transport
@@ -235,6 +219,10 @@ public:
/// destructor
virtual ~TAO_Transport (void);
+ // Maintain reference counting with these
+ static TAO_Transport* _duplicate (TAO_Transport* transport);
+ static void release (TAO_Transport* transport);
+
/// Return the protocol tag.
/**
* The OMG assigns unique tags (a 32-bit unsigned number) to each
@@ -280,34 +268,30 @@ public:
void cache_map_entry (TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
TAO_Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry (void);
- /// Return the identifier for this transport instance.
+ /// Set and Get the identifier for this transport instance.
/**
* If not set, this will return an integer representation of
* the <code>this</code> pointer for the instance on which
* it's called.
*/
int id (void) const;
- /// Set the identifier for this transport instance.
void id (int id);
- /// Return the order for the purging strategy.
+ /// Get and Set the purging order. The purging strategy uses the set
+ /// version to set the purging order.
unsigned long purging_order (void) const;
-
- /// Allow the purging strategy to set the order.
void purging_order(unsigned long value);
+ /// Check if there are messages pending in the queue
/**
- * Initialising the messaging object. This would be used by the
- * connector side. On the acceptor side the connection handler
- * would take care of the messaging objects.
+ * @return 1 if the queue is empty
*/
- virtual int messaging_init (CORBA::Octet major,
- CORBA::Octet minor) = 0;
+ int queue_is_empty (void);
/// Fill in a handle_set with any associated handler's reactor handle.
/**
* Called by the cache when the cache is closing in order to fill
- * in a handle_set in a lock-safe manner.
+ * in a handle_set in a thread-safe manner.
*
* @param reactor_registered the ACE_Handle_Set into which the
* transport should place any handle registered with the reactor
@@ -319,58 +303,22 @@ public:
void provide_handle (ACE_Handle_Set &reactor_registered,
TAO_EventHandlerSet &unregistered);
- /// Extracts the list of listen points from the <cdr> stream. The
- /// list would have the protocol specific details of the
- /// ListenPoints
- virtual int tear_listen_point_list (TAO_InputCDR &cdr);
/// Remove all messages from the outgoing queue.
/**
* @todo: shouldn't this be automated?
*/
- void dequeue_all (void);
-
- /// Check if there are messages pending in the queue
- /**
- * @return 1 if the queue is empty
- */
- int queue_is_empty (void);
+ // void dequeue_all (void);
- /// Register the handler with the reactor.
/**
- * This method is used by the Wait_On_Reactor strategy. The
- * transport must register its event handler with the ORB's Reactor.
- *
- * @todo: I think this method is pretty much useless, the
- * connections are *always* registered with the Reactor, except in
- * thread-per-connection mode. In that case putting the connection
- * in the Reactor would produce unpredictable results anyway.
+ * Register the handler with the reactor. This method is used by the
+ * Wait_On_Reactor strategy. The transport must register its event
+ * handler with the ORB's Reactor.
*/
- // @@ lockme
int register_handler (void);
- /**
- * @name Control connection lifecycle
- *
- * These methods are routed through the TMS object. The TMS
- * strategies implement them correctly.
- */
- //@{
-
- /// Request has been just sent, but the reply is not received. Idle
- /// the transport now.
- virtual int idle_after_send (void);
- /// Request is sent and the reply is received. Idle the transport
- /// now.
- virtual int idle_after_reply (void);
-
- /// Call the implementation method after obtaining the lock.
- virtual void close_connection (void);
-
- //@}
-
- /// Write the complete Message_Block chain to the connection.
+ /// Write the complete Message_Block chain to the connection.
/**
* This method serializes on handler_lock_, guaranteeing that only
* thread can execute it on the same instance concurrently.
@@ -428,6 +376,48 @@ public:
+ /**
+ * @name Control connection lifecycle
+ *
+ * These methods are routed through the TMS object. The TMS
+ * strategies implement them correctly.
+ */
+ //@{
+
+ /// Request has been just sent, but the reply is not received. Idle
+ /// the transport now.
+ virtual int idle_after_send (void);
+
+ /// Request is sent and the reply is received. Idle the transport
+ /// now.
+ virtual int idle_after_reply (void);
+
+ /// Call the implementation method after obtaining the lock.
+ virtual void close_connection (void);
+
+ //@}
+
+ /** @name Template methods
+ *
+ * The Transport class uses the Template Method Pattern to implement
+ * the protocol specific functionality.
+ * Implementors of a pluggable protocol should override the
+ * following methods with the semantics documented below.
+ */
+ /**
+ * Initialising the messaging object. This would be used by the
+ * connector side. On the acceptor side the connection handler
+ * would take care of the messaging objects.
+ */
+ virtual int messaging_init (CORBA::Octet major,
+ CORBA::Octet minor) = 0;
+
+
+
+ /// Extracts the list of listen points from the <cdr> stream. The
+ /// list would have the protocol specific details of the
+ /// ListenPoints
+ virtual int tear_listen_point_list (TAO_InputCDR &cdr);
protected:
/** @name Template methods
@@ -537,6 +527,13 @@ public:
TAO_Target_Specification &spec,
TAO_OutputCDR &msg);
+ /// recache ourselves in the cache
+ int recache_transport (TAO_Transport_Descriptor_Interface* desc);
+
+ /// Method for the connection handler to signify that it
+ /// is being closed and destroyed.
+ virtual void connection_handler_closing (void);
+
/// Callback to read incoming data
/**
* The ACE_Event_Handler adapter invokes this method as part of its
@@ -681,16 +678,7 @@ protected:
TAO_Queued_Data *make_queued_data (ACE_Message_Block &incoming);
public:
- /// Method for the connection handler to signify that it
- /// is being closed and destroyed.
- virtual void connection_handler_closing (void);
- // Maintain reference counting with these
- static TAO_Transport* _duplicate (TAO_Transport* transport);
- static void release (TAO_Transport* transport);
-
- /// recache ourselves in the cache
- int recache_transport (TAO_Transport_Descriptor_Interface* desc);
diff --git a/TAO/tao/Transport.inl b/TAO/tao/Transport.inl
index fc277e14eb4..236d4d93b39 100644
--- a/TAO/tao/Transport.inl
+++ b/TAO/tao/Transport.inl
@@ -80,6 +80,17 @@ TAO_Transport::id (int id)
this->id_ = id;
}
+ACE_INLINE int
+TAO_Transport::queue_is_empty (void)
+{
+ ACE_GUARD_RETURN (ACE_Lock,
+ ace_mon,
+ *this->handler_lock_,
+ -1);
+ return this->queue_is_empty_i ();
+}
+
+
ACE_INLINE int
TAO_Transport::flush_timer_pending (void) const