summaryrefslogtreecommitdiff
path: root/TAO/tao/Synch_Invocation.cpp
diff options
context:
space:
mode:
authorstanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2013-02-05 21:11:03 +0000
committerstanleyk <stanleyk@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2013-02-05 21:11:03 +0000
commit5e030faf84086ab02059fcbcc3faed224bd57b95 (patch)
tree3a62df45ac6ccf599fb07cf6a03d672456ce2e3d /TAO/tao/Synch_Invocation.cpp
parent9d296f7fa51116ff7040ecb2ad18612cd94b5fd1 (diff)
downloadATCD-5e030faf84086ab02059fcbcc3faed224bd57b95.tar.gz
Merge in OCI_Reliability_Enhancements branch.
Diffstat (limited to 'TAO/tao/Synch_Invocation.cpp')
-rw-r--r--TAO/tao/Synch_Invocation.cpp340
1 files changed, 220 insertions, 120 deletions
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp
index 40064362ca6..71ef977e300 100644
--- a/TAO/tao/Synch_Invocation.cpp
+++ b/TAO/tao/Synch_Invocation.cpp
@@ -1,6 +1,7 @@
// $Id$
#include "tao/Synch_Invocation.h"
+#include "tao/Invocation_Retry_State.h"
#include "tao/Profile_Transport_Resolver.h"
#include "tao/Profile.h"
#include "tao/Synch_Reply_Dispatcher.h"
@@ -42,9 +43,16 @@ namespace TAO
resolver,
detail,
response_expected)
+ , retry_state_ (0)
{
}
+ void
+ Synch_Twoway_Invocation::set_retry_state (Invocation_Retry_State *retry_state)
+ {
+ this->retry_state_ = retry_state;
+ }
+
Invocation_Status
Synch_Twoway_Invocation::remote_twoway (ACE_Time_Value *max_wait_time)
{
@@ -79,10 +87,25 @@ namespace TAO
if (!transport)
{
- // Way back, we failed to find a profile we could connect to.
- // We've come this far only so we reach the interception points
- // in case they can fix things. Time to bail....
- throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
+ if (this->retry_state_ &&
+ this->retry_state_->forward_on_exception_increment(FOE_TRANSIENT))
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("remote_twoway retrying on TRANSIENT ")
+ ACE_TEXT ("exception\n")));
+ this->retry_state_->next_profile_retry (*this->stub ());
+ return TAO_INVOKE_RESTART;
+ }
+ else
+ {
+ // Way back, we failed to find a profile we could connect to.
+ // We've come this far only so we reach the interception points
+ // in case they can fix things. Time to bail....
+ throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
+ }
+
}
ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon,
@@ -259,8 +282,8 @@ namespace TAO
if (TAO_debug_level > 0 && max_wait_time)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, "
- "timeout after recv is <%u> status <%d>\n",
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, ")
+ ACE_TEXT ("timeout after recv is <%u> status <%d>\n"),
max_wait_time->msec (),
reply_error));
}
@@ -273,8 +296,9 @@ namespace TAO
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Twoway_Invocation::wait_for_reply, "
- "recovering after an error\n"));
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("wait_for_reply, ")
+ ACE_TEXT ("recovering after an error\n")));
}
// You the smarty, don't try to moving the unbind_dispatcher
@@ -312,6 +336,19 @@ namespace TAO
(void) bd.unbind_dispatcher ();
this->resolver_.transport ()->close_connection ();
+ if (this->retry_state_ &&
+ this->resolver_.transport ()->connection_closed_on_read() &&
+ this->retry_state_->forward_on_reply_closed_increment ())
+ {
+ if (TAO_debug_level > 4)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("wait_for_reply, forward profile on ")
+ ACE_TEXT ("connection closed\n")));
+ this->retry_state_->next_profile_retry (*this->stub ());
+ return TAO_INVOKE_RESTART;
+ }
+
try
{
return
@@ -322,7 +359,11 @@ namespace TAO
}
catch (const ::CORBA::Exception&)
{
- this->resolver_.stub ()->reset_profiles ();
+ if (this->retry_state_ == 0 ||
+ !this->retry_state_->forward_on_exception_limit_used ())
+ {
+ this->resolver_.stub ()->reset_profiles ();
+ }
throw;
}
}
@@ -386,8 +427,9 @@ namespace TAO
// permanent condition not given
if (TAO_debug_level > 3)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Twoway_Invocation::"
- "check_reply_status: unexpected LOCATION_FORWARD_PERM reply\n"));
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("check_reply_status: unexpected ")
+ ACE_TEXT ("LOCATION_FORWARD_PERM reply\n")));
throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
}
@@ -439,8 +481,8 @@ namespace TAO
if (TAO_debug_level > 3)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::location_forward ")
- ACE_TEXT ("being handled\n")));
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("location_forward being handled\n")));
}
CORBA::Object_var fwd;
@@ -469,8 +511,8 @@ namespace TAO
if (TAO_debug_level > 3)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Twoway_Invocation::"
- "handle_user_exception\n"));
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("handle_user_exception\n")));
// Pull the exception from the stream.
CORBA::String_var buf;
@@ -513,8 +555,8 @@ namespace TAO
if (TAO_debug_level > 3)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Twoway_Invocation::"
- "handle_system_exception\n"));
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("handle_system_exception\n")));
CORBA::String_var type_id;
@@ -533,40 +575,80 @@ namespace TAO
throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_MAYBE);
}
+ bool retry_on_exception = false;
bool do_forward = false;
- int foe_kind = this->stub ()->orb_core ()->orb_params ()->forward_once_exception();
-
- if ((CORBA::CompletionStatus) completion != CORBA::COMPLETED_YES
- && (((foe_kind & TAO::FOE_TRANSIENT) == 0
- && ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0) ||
- ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/OBJ_ADAPTER:1.0") == 0 ||
- ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/NO_RESPONSE:1.0") == 0 ||
- ((foe_kind & TAO::FOE_COMM_FAILURE) == 0
- && ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0) ||
- (this->stub ()->orb_core ()->orb_params ()->forward_invocation_on_object_not_exist ()
- && ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0) ||
- (do_forward = ! this->stub ()->forwarded_on_exception ()
- && ((((foe_kind & TAO::FOE_OBJECT_NOT_EXIST) == TAO::FOE_OBJECT_NOT_EXIST)
- && (ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0)) ||
- (((foe_kind & TAO::FOE_COMM_FAILURE) == TAO::FOE_COMM_FAILURE)
- && (ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0)) ||
- (((foe_kind & TAO::FOE_TRANSIENT) == TAO::FOE_TRANSIENT)
- && (ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0)) ||
- (((foe_kind & TAO::FOE_INV_OBJREF) == TAO::FOE_INV_OBJREF)
- && (ACE_OS_String::strcmp (type_id.in (),
- "IDL:omg.org/CORBA/INV_OBJREF:1.0") == 0))))))
+
+ const TAO_ORB_Parameters *orb_params = this->stub ()->orb_core ()->orb_params ();
+
+ if (this->retry_state_ &&
+ this->retry_state_->forward_on_exception_limit_used () &&
+ (CORBA::CompletionStatus) completion == CORBA::COMPLETED_NO)
+ {
+ if ((ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0 &&
+ this->retry_state_->forward_on_exception_increment (TAO::FOE_TRANSIENT)) ||
+ (ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0 &&
+ this->retry_state_->forward_on_exception_increment (TAO::FOE_COMM_FAILURE)) ||
+ (ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0 &&
+ this->retry_state_->forward_on_exception_increment (TAO::FOE_OBJECT_NOT_EXIST)) ||
+ (ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/INV_OBJREF:1.0") == 0 &&
+ this->retry_state_->forward_on_exception_increment (TAO::FOE_INV_OBJREF))
+ )
+ {
+ retry_on_exception = true;
+ this->retry_state_->sleep_at_starting_profile (*this->stub ());
+ }
+ }
+ else
+ {
+ int foe_kind = orb_params->forward_once_exception();
+
+ retry_on_exception =
+ (CORBA::CompletionStatus) completion != CORBA::COMPLETED_YES
+ && (((foe_kind & TAO::FOE_TRANSIENT) == 0
+ && ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0) ||
+ ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/OBJ_ADAPTER:1.0") == 0 ||
+ ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/NO_RESPONSE:1.0") == 0 ||
+ ((foe_kind & TAO::FOE_COMM_FAILURE) == 0
+ && ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0) ||
+ (orb_params->forward_invocation_on_object_not_exist ()
+ && ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0) ||
+ (do_forward = ! this->stub ()->forwarded_on_exception ()
+ && ((((foe_kind & TAO::FOE_OBJECT_NOT_EXIST) == TAO::FOE_OBJECT_NOT_EXIST)
+ && (ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/OBJECT_NOT_EXIST:1.0") == 0)) ||
+ (((foe_kind & TAO::FOE_COMM_FAILURE) == TAO::FOE_COMM_FAILURE)
+ && (ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/COMM_FAILURE:1.0") == 0)) ||
+ (((foe_kind & TAO::FOE_TRANSIENT) == TAO::FOE_TRANSIENT)
+ && (ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/TRANSIENT:1.0") == 0)) ||
+ (((foe_kind & TAO::FOE_INV_OBJREF) == TAO::FOE_INV_OBJREF)
+ && (ACE_OS_String::strcmp (type_id.in (),
+ "IDL:omg.org/CORBA/INV_OBJREF:1.0") == 0)))));
+ }
+
+ if (retry_on_exception)
{
// If we are here then possibly we'll need a restart.
mon.set_status (TAO_INVOKE_RESTART);
+ if (TAO_debug_level > 4)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("handle_system_exception, profile forwarding ")
+ ACE_TEXT ("on exception "),
+ type_id.in (),
+ ACE_TEXT ("\n")));
+
if (do_forward)
this->stub ()->forwarded_on_exception (true);
@@ -625,8 +707,8 @@ namespace TAO
if (TAO_debug_level > 4)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Twoway_Invocation::"
- "handle_system_exception, about to raise\n"));
+ ACE_TEXT ("TAO (%P|%t) - Synch_Twoway_Invocation::")
+ ACE_TEXT ("handle_system_exception, about to raise\n")));
mon.set_status (TAO_INVOKE_SYSTEM_EXCEPTION);
@@ -676,80 +758,98 @@ namespace TAO
if (!transport)
{
- // Way back, we failed to find a profile we could connect to.
- // We've come this far only so we reach the interception points
- // in case they can fix things. Time to bail....
- throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
- }
+ if (this->retry_state_ &&
+ this->retry_state_->forward_on_exception_limit_used ())
+ {
+ if (this->retry_state_->forward_on_exception_increment(FOE_TRANSIENT))
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("TAO (%P|%t) - Synch_Oneway_Invocation::")
+ ACE_TEXT ("remote_oneway retrying on TRANSIENT ")
+ ACE_TEXT ("exception\n")));
+ this->retry_state_->next_profile_retry (*this->stub ());
+ return TAO_INVOKE_RESTART;
+ }
+ }
+ else
+ {
+ // Way back, we failed to find a profile we could connect to.
+ // We've come this far only so we reach the interception points
+ // in case they can fix things. Time to bail....
+ throw CORBA::TRANSIENT (CORBA::OMGVMCID | 2, CORBA::COMPLETED_NO);
+ }
- {
- ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, transport->output_cdr_lock (),
- TAO_INVOKE_FAILURE);
+ }
- TAO_OutputCDR &cdr = transport->out_stream ();
+ {
+ ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, transport->output_cdr_lock (),
+ TAO_INVOKE_FAILURE);
- cdr.message_attributes (this->details_.request_id (),
- this->resolver_.stub (),
- TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST),
- max_wait_time);
+ TAO_OutputCDR &cdr = transport->out_stream ();
- this->write_header (cdr);
+ cdr.message_attributes (this->details_.request_id (),
+ this->resolver_.stub (),
+ TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST),
+ max_wait_time);
- this->marshal_data (cdr);
+ this->write_header (cdr);
- countdown.update ();
+ this->marshal_data (cdr);
- if (transport->is_connected ())
- {
- // We have a connected transport so we can send the message
- s = this->send_message (cdr,
- TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST),
- max_wait_time);
+ countdown.update ();
- if (transport->wait_strategy ()->non_blocking () == 0 &&
- transport->orb_core ()->client_factory ()->use_cleanup_options ())
- {
- if (!transport->wait_strategy ()->is_registered())
- {
- ACE_Event_Handler * const eh =
- transport->event_handler_i ();
-
- ACE_Reactor * const r =
- transport->orb_core ()->reactor ();
-
- if (r->register_handler (eh, ACE_Event_Handler::READ_MASK) == -1)
- {
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Synch_Oneway_Invocation::"
- "remote_oneway transport[%d] registration with"
- "reactor returned an error\n",
- transport->id ()));
- }
- else
- {
- // Only set this flag when registration succeeds
- transport->wait_strategy ()->is_registered(true);
- }
- }
- }
+ if (transport->is_connected ())
+ {
+ // We have a connected transport so we can send the message
+ s = this->send_message (cdr,
+ TAO_Message_Semantics (TAO_Message_Semantics::TAO_ONEWAY_REQUEST),
+ max_wait_time);
- }
- else
- {
- if (TAO_debug_level > 4)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Synch_Oneway_Invocation::"
- "remote_oneway, queueing message\n"));
+ if (transport->wait_strategy ()->non_blocking () == 0 &&
+ transport->orb_core ()->client_factory ()->use_cleanup_options ())
+ {
+ if (!transport->wait_strategy ()->is_registered())
+ {
+ ACE_Event_Handler * const eh =
+ transport->event_handler_i ();
+
+ ACE_Reactor * const r =
+ transport->orb_core ()->reactor ();
+
+ if (r->register_handler (eh, ACE_Event_Handler::READ_MASK) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - Synch_Oneway_Invocation::")
+ ACE_TEXT ("remote_oneway transport[%d] registration with")
+ ACE_TEXT ("reactor returned an error\n"),
+ transport->id ()));
+ }
+ else
+ {
+ // Only set this flag when registration succeeds
+ transport->wait_strategy ()->is_registered (true);
+ }
+ }
+ }
- if (transport->format_queue_message (cdr,
- max_wait_time,
- this->resolver_.stub()) != 0)
- {
- s = TAO_INVOKE_FAILURE;
- }
- }
- }
+ }
+ else
+ {
+ if (TAO_debug_level > 4)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Synch_Oneway_Invocation::")
+ ACE_TEXT ("remote_oneway, queueing message\n")));
+
+ if (transport->format_queue_message (cdr,
+ max_wait_time,
+ this->resolver_.stub()) != 0)
+ {
+ s = TAO_INVOKE_FAILURE;
+ }
+ }
+ }
#if TAO_HAS_INTERCEPTORS == 1
s = this->receive_other_interception ();
@@ -763,7 +863,7 @@ namespace TAO
status == PortableInterceptor::TRANSPORT_RETRY)
s = TAO_INVOKE_RESTART;
else if (status == PortableInterceptor::SYSTEM_EXCEPTION
- || status == PortableInterceptor::USER_EXCEPTION)
+ || status == PortableInterceptor::USER_EXCEPTION)
throw;
}
catch (...)
@@ -771,14 +871,14 @@ namespace TAO
// Notify interceptors of non-CORBA exception, and propagate
// that exception to the caller.
- PortableInterceptor::ReplyStatus const st =
- this->handle_all_exception ();
+ PortableInterceptor::ReplyStatus const st =
+ this->handle_all_exception ();
- if (st == PortableInterceptor::LOCATION_FORWARD ||
- st == PortableInterceptor::TRANSPORT_RETRY)
- s = TAO_INVOKE_RESTART;
- else
- throw;
+ if (st == PortableInterceptor::LOCATION_FORWARD ||
+ st == PortableInterceptor::TRANSPORT_RETRY)
+ s = TAO_INVOKE_RESTART;
+ else
+ throw;
}
#endif /* TAO_HAS_INTERCEPTORS */