summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhil Mesnier <mesnier_p@ociweb.com>2013-05-29 21:42:08 +0000
committerPhil Mesnier <mesnier_p@ociweb.com>2013-05-29 21:42:08 +0000
commit6ea1bdd1685be6ab58ce8a32c7a715050684663a (patch)
tree068694ee6335e8dd2dbb7005008568dcc595771d
parent5833fce190ec23aa49256ec4a9d49869d13ab796 (diff)
downloadATCD-6ea1bdd1685be6ab58ce8a32c7a715050684663a.tar.gz
Wed May 29 21:32:51 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
-rw-r--r--TAO/ChangeLog25
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp63
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h4
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp2
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp171
-rw-r--r--TAO/orbsvcs/ImplRepo_Service/LiveCheck.h42
6 files changed, 245 insertions, 62 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog
index 216e6d3994f..cfe5e92c764 100644
--- a/TAO/ChangeLog
+++ b/TAO/ChangeLog
@@ -1,3 +1,28 @@
+Wed May 29 21:32:51 UTC 2013 Phil Mesnier <mesnier_p@ociweb.com>
+
+ * orbsvcs/ImplRepo_Service/AsyncListManager.h:
+ * orbsvcs/ImplRepo_Service/AsyncListManager.cpp:
+
+ Add in the "jacorb" prefix for JacORB based server identifiers.
+ Also add the testing of initial status to avoid waiting for non-
+ pingable servers. Add some higher verbosity level debug output.
+
+ * orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp:
+
+ Minor code style fix.
+
+ * orbsvcs/ImplRepo_Service/LiveCheck.h:
+ * orbsvcs/ImplRepo_Service/LiveCheck.cpp:
+
+ Support the initial setting of of state when a listener is added.
+
+ Prevent redundant timeout registrations by deferring a timer schedule
+ if one is currently being processed. This could happen if a ping
+ required a non-blocking connect, allowing a new request to be received.
+ Subsequent pings may still be requested, but only the next closest
+ time will be schheduled after a current handle timeout is completed.
+
+
Wed May 29 19:27:35 UTC 2013 Johnny Willemsen <jwillemsen@remedy.nl>
* orbsvcs/tests/Trading/Offer_Exporter.cpp:
diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp
index ad4bf4347eb..3682be1f289 100644
--- a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.cpp
@@ -161,8 +161,15 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many)
it.advance ();
Server_Info_Ptr info = entry->int_id_;
-
- this->server_list_[i].server = info->name.c_str ();
+ if (info->jacorb_server)
+ {
+ ACE_CString jacorb_name (ACE_TEXT ("JACORB:") + info->name);
+ this->server_list_[i].server = jacorb_name.c_str ();
+ }
+ else
+ {
+ this->server_list_[i].server = info->name.c_str ();
+ }
this->server_list_[i].startup.command_line = info->cmdline.c_str ();
this->server_list_[i].startup.environment = info->env_vars;
this->server_list_[i].startup.working_directory = info->dir.c_str ();
@@ -187,7 +194,10 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many)
}
else
{
- this->waiters_++;
+ if (!evaluate_status (i,l->status()))
+ {
+ this->waiters_++;
+ }
}
}
}
@@ -197,7 +207,7 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many)
ORBSVCS_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) AsyncListManager::list_i, %d waiters")
ACE_TEXT (" out of %d regsitered servers\n"),
- this->waiters_, (this->pinger_ != 0)));
+ this->waiters_, len));
}
if (this->waiters_ == 0)
@@ -206,10 +216,10 @@ AsyncListManager::list_i (CORBA::ULong start, CORBA::ULong how_many)
}
}
-
-void
-AsyncListManager::ping_replied (CORBA::ULong index, LiveStatus status)
+bool
+AsyncListManager::evaluate_status (CORBA::ULong index, LiveStatus status)
{
+ bool is_final = true;
switch (status)
{
case LS_ALIVE:
@@ -226,16 +236,26 @@ AsyncListManager::ping_replied (CORBA::ULong index, LiveStatus status)
ImplementationRepository::ACTIVE_NO;
break;
default:
- if (ImR_Locator_i::debug() > 4)
- {
- ORBSVCS_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) AsyncListManager::ping_replied, index = %d ")
- ACE_TEXT ("status = %d\n")));
- }
+ is_final = false;
+ }
+ return is_final;
+}
+
+void
+AsyncListManager::ping_replied (CORBA::ULong index, LiveStatus status)
+{
+ if (evaluate_status (index, status))
+ {
+ this->waiters_--;
+ this->final_state();
return;
}
- this->waiters_--;
- this->final_state();
+ if (ImR_Locator_i::debug() > 4)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) AsyncListManager::ping_replied, index = %d ")
+ ACE_TEXT ("status = %d\n")));
+ }
}
AsyncListManager *
@@ -272,7 +292,8 @@ ListLiveListener::ListLiveListener (const char *server,
owner_ (owner->_add_ref ()),
pinger_ (pinger),
status_ (LS_UNKNOWN),
- index_ (index)
+ index_ (index),
+ started_ (false)
{
}
@@ -284,9 +305,16 @@ bool
ListLiveListener::start (void)
{
bool rtn = this->pinger_.add_poll_listener (this);
+ this->started_ = true;
return rtn;
}
+LiveStatus
+ListLiveListener::status (void)
+{
+ return this->status_;
+}
+
bool
ListLiveListener::status_changed (LiveStatus status)
{
@@ -297,7 +325,8 @@ ListLiveListener::status_changed (LiveStatus status)
}
else
{
- this->owner_->ping_replied (this->index_, status);
+ if (this->started_)
+ this->owner_->ping_replied (this->index_, status);
}
return true;
}
diff --git a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h
index 9aa5799b86e..b31ebe984ea 100644
--- a/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h
+++ b/TAO/orbsvcs/ImplRepo_Service/AsyncListManager.h
@@ -53,6 +53,7 @@ class AsyncListManager
CORBA::ULong list (ImplementationRepository::AMH_ServerInformationIteratorResponseHandler_ptr _tao_rh,
CORBA::ULong start, CORBA::ULong count);
+ bool evaluate_status (CORBA::ULong index, LiveStatus status);
void ping_replied (CORBA::ULong index, LiveStatus status);
AsyncListManager *_add_ref (void);
@@ -90,7 +91,7 @@ class ListLiveListener : public LiveListener
virtual ~ListLiveListener (void);
bool start (void);
-
+ LiveStatus status (void);
bool status_changed (LiveStatus status);
private:
@@ -98,6 +99,7 @@ class ListLiveListener : public LiveListener
LiveCheck &pinger_;
LiveStatus status_;
CORBA::ULong index_;
+ bool started_;
};
diff --git a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
index bf24cc46296..0ce84d357d8 100644
--- a/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/ImR_Locator_i.cpp
@@ -554,7 +554,7 @@ ImR_Locator_i::activate_server_i (UpdateableServerInfo& info,
ACE_NEW (aam_raw, AsyncAccessManager (*info, manual_start, *this));
aam = aam_raw;
this->aam_set_.insert_tail (aam);
- }
+ }
else
{
aam = this->find_aam (info->name.c_str());
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
index e93571dcb0d..cd226dd123c 100644
--- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.cpp
@@ -149,13 +149,25 @@ LiveEntry::reset_status (void)
this->repings_ = 0;
this->next_check_ = ACE_High_Res_Timer::gettimeofday_hr();
}
+ if (ImR_Locator_i::debug () > 2)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) LiveEntry::reset_status this = %x, ")
+ ACE_TEXT ("server = %C status = %s\n"),
+ this, this->server_.c_str(),
+ status_name (this->liveliness_)));
+ }
+
}
LiveStatus
LiveEntry::status (void) const
{
if (!this->may_ping_)
- return LS_ALIVE;
+ {
+ return LS_ALIVE;
+ }
+
if ( this->liveliness_ == LS_ALIVE &&
this->owner_->ping_interval() != ACE_Time_Value::zero )
@@ -243,7 +255,7 @@ LiveEntry::validate_ping (bool &want_reping, ACE_Time_Value& next)
this->liveliness_ == LS_DEAD ||
this->listeners_.size () == 0)
{
- if (ImR_Locator_i::debug () > 5)
+ if (ImR_Locator_i::debug () > 4)
{
ORBSVCS_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) LiveEntry::validate_ping, status ")
@@ -300,6 +312,14 @@ LiveEntry::validate_ping (bool &want_reping, ACE_Time_Value& next)
}
ACE_Time_Value next (ms / 1000, (ms % 1000) * 1000);
this->next_check_ = now + next;
+ if (ImR_Locator_i::debug () > 4)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) LiveEntry::validate_ping, ")
+ ACE_TEXT ("transient, reping in %d ms, ")
+ ACE_TEXT ("server %C\n"),
+ ms, this->server_.c_str()));
+ }
}
else
{
@@ -337,6 +357,10 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa)
CORBA::Object_var obj = poa->id_to_reference (oid.in());
ImplementationRepository::AMI_ServerObjectHandler_var cb =
ImplementationRepository::AMI_ServerObjectHandler::_narrow (obj.in());
+ {
+ ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
+ this->liveliness_ = LS_PING_AWAY;
+ }
try
{
this->ref_->sendc_ping (cb.in());
@@ -346,8 +370,6 @@ LiveEntry::do_ping (PortableServer::POA_ptr poa)
ACE_TEXT ("(%P|%t) LiveEntry::do_ping, ")
ACE_TEXT ("sendc_ping returned OK\n")));
}
- ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
- this->liveliness_ = LS_PING_AWAY;
}
catch (CORBA::Exception &ex)
{
@@ -422,10 +444,74 @@ PingReceiver::ping_excep (Messaging::ExceptionHolder * excep_holder)
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
+LC_TimeoutGuard::LC_TimeoutGuard (LiveCheck *owner, int token)
+ :owner_ (owner),
+ token_ (token),
+ blocked_ (owner->handle_timeout_busy_ == 0)
+{
+ if (!blocked_)
+ {
+ --owner_->handle_timeout_busy_;
+ }
+}
+
+LC_TimeoutGuard::~LC_TimeoutGuard (void)
+{
+ if (blocked_)
+ {
+ return;
+ }
+
+ ++owner_->handle_timeout_busy_;
+ if (owner_->want_timeout_)
+ {
+ ACE_Time_Value delay = ACE_Time_Value::zero;
+ if (owner_->deferred_timeout_ != ACE_Time_Value::zero)
+ {
+ ACE_Time_Value now (ACE_High_Res_Timer::gettimeofday_hr());
+ if (owner_->deferred_timeout_ > now)
+ delay = owner_->deferred_timeout_ - now;
+ }
+ ++owner_->token_;
+ if (ImR_Locator_i::debug () > 2)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) LC_TimeoutGuard(%d)::dtor,")
+ ACE_TEXT ("scheduling new timeout(%d), delay = %d,%d\n"),
+ this->token_, owner_->token_, delay.sec(), delay.usec()));
+ }
+ owner_->reactor()->schedule_timer (owner_,
+ reinterpret_cast<void *>(owner_->token_),
+ delay);
+ owner_->want_timeout_ = false;
+ }
+ else
+ {
+ if (ImR_Locator_i::debug () > 3)
+ {
+ ORBSVCS_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) LC_TimeoutGuard(%d)::dtor,")
+ ACE_TEXT ("no pending timeouts requested\n"),
+ this->token_));
+ }
+ }
+}
+
+bool LC_TimeoutGuard::blocked (void)
+{
+ return this->blocked_;
+}
+
+//---------------------------------------------------------------------------
+//---------------------------------------------------------------------------
+
LiveCheck::LiveCheck ()
:ping_interval_(),
running_ (false),
- token_ (100)
+ token_ (100),
+ handle_timeout_busy_ (1),
+ want_timeout_ (false),
+ deferred_timeout_ (0,0)
{
}
@@ -485,15 +571,17 @@ LiveCheck::handle_timeout (const ACE_Time_Value &,
if (!this->running_)
return -1;
- bool want_reping = false;
- ACE_Time_Value next;
+ LC_TimeoutGuard tg (this, static_cast<int>(token));
+ if (tg.blocked ())
+ return 0;
+
LiveEntryMap::iterator le_end = this->entry_map_.end();
for (LiveEntryMap::iterator le = this->entry_map_.begin();
le != le_end;
++le)
{
LiveEntry *entry = le->item ();
- if (entry->validate_ping (want_reping, next))
+ if (entry->validate_ping (this->want_timeout_, this->deferred_timeout_))
{
entry->do_ping (poa_.in ());
if (ImR_Locator_i::debug () > 2)
@@ -524,7 +612,7 @@ LiveCheck::handle_timeout (const ACE_Time_Value &,
LiveEntry *entry = *pe;
if (entry != 0)
{
- if (entry->validate_ping (want_reping, next))
+ if (entry->validate_ping (this->want_timeout_, this->deferred_timeout_))
{
entry->do_ping (poa_.in ());
}
@@ -536,22 +624,6 @@ LiveCheck::handle_timeout (const ACE_Time_Value &,
}
}
-
- if (want_reping)
- {
- ACE_Time_Value now (ACE_High_Res_Timer::gettimeofday_hr());
- ACE_Time_Value delay = next - now;
- ++this->token_;
- if (ImR_Locator_i::debug () > 2)
- {
- ORBSVCS_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) LiveCheck::handle_timeout(%d),")
- ACE_TEXT (" want reping(%d), delay = %d,%d\n"),
- token, this->token_, delay.sec(), delay.usec()));
- }
- this->reactor()->schedule_timer (this, reinterpret_cast<void *>(this->token_), delay);
- }
-
return 0;
}
@@ -614,10 +686,18 @@ LiveCheck::add_per_client_listener (LiveListener *l,
{
entry->add_listener (l);
- ++this->token_;
- this->reactor()->schedule_timer (this,
- reinterpret_cast<void *>(this->token_),
- ACE_Time_Value::zero);
+ if (this->handle_timeout_busy_ > 0)
+ {
+ ++this->token_;
+ this->reactor()->schedule_timer (this,
+ reinterpret_cast<void *>(this->token_),
+ ACE_Time_Value::zero);
+ }
+ else
+ {
+ this->want_timeout_ = true;
+ this->deferred_timeout_ = ACE_Time_Value::zero;
+ }
return true;
}
return false;
@@ -639,6 +719,7 @@ LiveCheck::add_poll_listener (LiveListener *l)
entry->add_listener (l);
entry->reset_status ();
+ l->status_changed (entry->status());
return this->schedule_ping (entry);
}
@@ -674,34 +755,38 @@ LiveCheck::schedule_ping (LiveEntry *entry)
ACE_Time_Value now (ACE_High_Res_Timer::gettimeofday_hr());
ACE_Time_Value next = entry->next_check ();
- ++this->token_;
- if (next <= now)
+
+ if (this->handle_timeout_busy_ > 0)
{
+ ACE_Time_Value delay = ACE_Time_Value::zero;
+ if (next > now)
+ {
+ delay = next - now;
+ }
+ ++this->token_;
if (ImR_Locator_i::debug () > 2)
{
ORBSVCS_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) LiveCheck::Schdedule_ping(%d),")
- ACE_TEXT (" immediate\n"),
- this->token_));
+ ACE_TEXT ("(%P|%t) LiveCheck::schedule_ping (%d),")
+ ACE_TEXT (" delay = %d,%d\n"),
+ this->token_, delay.sec(), delay.usec()));
}
-
this->reactor()->schedule_timer (this,
reinterpret_cast<void *>(this->token_),
- ACE_Time_Value::zero);
+ delay);
}
else
{
- ACE_Time_Value delay = next - now;
if (ImR_Locator_i::debug () > 2)
{
ORBSVCS_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) LiveCheck::Schdedule_ping(%d),")
- ACE_TEXT (" delay = %d,%d\n"),
- this->token_, delay.sec(), delay.usec()));
+ ACE_TEXT ("(%P|%t) LiveCheck::schedule_ping deferred")));
+ }
+ if (!this->want_timeout_ || next < this->deferred_timeout_)
+ {
+ this->want_timeout_ = true;
+ this->deferred_timeout_ = next;
}
- this->reactor()->schedule_timer (this,
- reinterpret_cast<void *>(this->token_),
- delay);
}
return true;
}
diff --git a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
index 005ae3a3005..5d30e55cd5f 100644
--- a/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
+++ b/TAO/orbsvcs/ImplRepo_Service/LiveCheck.h
@@ -172,7 +172,10 @@ class Locator_Export PingReceiver :
PingReceiver (LiveEntry * entry, PortableServer::POA_ptr poa);
virtual ~PingReceiver (void);
+ /// Called when an anticipated ping reply is received
void ping (void);
+
+ /// Called when an anticipated ping raises an exception
void ping_excep (Messaging::ExceptionHolder * excep_holder);
private:
@@ -180,6 +183,40 @@ class Locator_Export PingReceiver :
LiveEntry * entry_;
};
+
+//---------------------------------------------------------------------------
+/*
+ * @class LC_TimeoutGuard
+ *
+ * @brief A helper object to avoid reentrancy in the handle_timout method
+ *
+ * The LiveCheck::handle_timeout may be called reentrantly on a single thread
+ * if the sending of a ping uses non-blocking connection establishment. If a
+ * connection must be established before the ping can be sent, that may involve
+ * waiting in the reactor, possibly handing other requests, and possibly even
+ * subsequent timeouts.
+ * */
+
+class Locator_Export LC_TimeoutGuard
+{
+ public:
+ /// construct a new stack-based guard. This sets a flag in the owner that will
+ /// be cleared on destruction.
+ LC_TimeoutGuard (LiveCheck *owner, int token);
+
+ /// releases the flag. If the LiveCheck received any requests for an immediate
+ /// or defered ping during this time, schedule it now.
+ ~LC_TimeoutGuard (void);
+
+ /// Returns true if the busy flag in the owner was already set.
+ bool blocked (void);
+
+ private:
+ LiveCheck *owner_;
+ int token_;
+ bool blocked_;
+};
+
//---------------------------------------------------------------------------
/*
* @class LiveCheck
@@ -194,6 +231,8 @@ class Locator_Export PingReceiver :
class Locator_Export LiveCheck : public ACE_Event_Handler
{
public:
+ friend class LC_TimeoutGuard;
+
LiveCheck ();
~LiveCheck (void);
@@ -241,6 +280,9 @@ class Locator_Export LiveCheck : public ACE_Event_Handler
ACE_Time_Value ping_interval_;
bool running_;
int token_;
+ int handle_timeout_busy_;
+ bool want_timeout_;
+ ACE_Time_Value deferred_timeout_;
};
#endif /* IMR_LIVECHECK_H_ */