// // $Id$ #include "orbsvcs/Log_Macros.h" #include "ImR_Locator_i.h" #include "utils.h" #include "Iterator.h" #include "INS_Locator.h" #include "Locator_Repository.h" #include "Config_Backing_Store.h" #include "Shared_Backing_Store.h" #include "XML_Backing_Store.h" #include "orbsvcs/Time_Utilities.h" #include "tao/Messaging/Messaging.h" #include "tao/AnyTypeCode/Any.h" #include "ace/ARGV.h" #include "ace/OS_NS_sys_time.h" #include "ace/Vector_T.h" static const int DEFAULT_START_LIMIT = 1; static const ACE_Time_Value DEFAULT_SERVER_TIMEOUT (0, 10 * 1000); // 10ms /// We want to give shutdown a little more time to work, so that we /// can guarantee to the tao_imr utility that it has shutdown. The tao_imr /// utility prints a different message depending on whether shutdown succeeds /// or times out. static const ACE_Time_Value DEFAULT_SHUTDOWN_TIMEOUT (0, 5000 * 1000); static PortableServer::POA_ptr createPersistentPOA (PortableServer::POA_ptr root_poa, const char* poa_name) { PortableServer::LifespanPolicy_var life = root_poa->create_lifespan_policy (PortableServer::PERSISTENT); PortableServer::IdAssignmentPolicy_var assign = root_poa->create_id_assignment_policy (PortableServer::USER_ID); CORBA::PolicyList pols; pols.length (2); pols[0] = PortableServer::LifespanPolicy::_duplicate (life.in ()); pols[1] = PortableServer::IdAssignmentPolicy::_duplicate (assign.in ()); PortableServer::POAManager_var mgr = root_poa->the_POAManager (); PortableServer::POA_var poa = root_poa->create_POA (poa_name, mgr.in (), pols); life->destroy (); assign->destroy (); return poa._retn (); } int ImR_Locator_i::debug_ = 0; ImR_Locator_i::ImR_Locator_i (void) : dsi_forwarder_ (*this) , ins_locator_ (0) , aam_set_ () , read_only_ (false) , ping_external_ (false) , unregister_if_address_reused_ (false) { // Visual C++ 6.0 is not smart enough to do a direct assignment // while allocating the INS_Locator. So, we have to do it in // two steps. INS_Locator* locator; ACE_NEW (locator, INS_Locator (*this)); ins_locator_ = locator; } ImR_Locator_i::~ImR_Locator_i (void) { // For some reason g++ 4.0 needs this out-of-line destructor instead // of the default one generated by the compiler. Without this // destructor, we get a number of "undefined reference" link errors // related to the virtual tables of the INS_Locator, ImR_Adapter and // ImR_Forwarder members in this class. } int ImR_Locator_i::init_with_orb (CORBA::ORB_ptr orb, Options& opts) { this->orb_ = CORBA::ORB::_duplicate (orb); ImR_Locator_i::debug_ = opts.debug (); this->read_only_ = opts.readonly (); this->startup_timeout_ = opts.startup_timeout (); this->ping_external_ = opts.ping_external (); this->ping_interval_ = opts.ping_interval (); this->unregister_if_address_reused_ = opts.unregister_if_address_reused (); CORBA::Object_var obj = this->orb_->resolve_initial_references ("RootPOA"); this->root_poa_ = PortableServer::POA::_narrow (obj.in ()); ACE_ASSERT (! CORBA::is_nil (this->root_poa_.in ())); this->dsi_forwarder_.init (orb); this->adapter_.init (& this->dsi_forwarder_); this->pinger_.init (orb, this->ping_interval_); // Register the Adapter_Activator reference to be the RootPOA's // Adapter Activator. root_poa_->the_activator (&this->adapter_); // Use a persistent POA so that any IOR this->imr_poa_ = createPersistentPOA (this->root_poa_.in (), "ImplRepo_Service"); ACE_ASSERT (! CORBA::is_nil (this->imr_poa_.in ())); PortableServer::ObjectId_var id = PortableServer::string_to_ObjectId ("ImplRepo_Service"); this->imr_poa_->activate_object_with_id (id.in (), this); obj = this->imr_poa_->id_to_reference (id.in ()); ImplementationRepository::Locator_var locator = ImplementationRepository::Locator::_narrow (obj.in ()); ACE_ASSERT(! CORBA::is_nil (locator.in ())); const CORBA::String_var ior = this->orb_->object_to_string (obj.in ()); // create the selected Locator_Repository with backing store switch (opts.repository_mode()) { case Options::REPO_REGISTRY: { repository_.reset(new Registry_Backing_Store(opts, orb)); break; } case Options::REPO_HEAP_FILE: { repository_.reset(new Heap_Backing_Store(opts, orb)); break; } case Options::REPO_XML_FILE: { repository_.reset(new XML_Backing_Store(opts, orb)); break; } case Options::REPO_SHARED_FILES: { repository_.reset(new Shared_Backing_Store(opts, orb)); break; } case Options::REPO_NONE: { repository_.reset(new No_Backing_Store(opts, orb)); break; } default: { bool invalid_rmode_specified = false; ACE_ASSERT (invalid_rmode_specified); ACE_UNUSED_ARG (invalid_rmode_specified); ORBSVCS_ERROR_RETURN (( LM_ERROR, ACE_TEXT ("Repository failed to initialize\n")), -1); } } // Register the ImR for use with INS obj = orb->resolve_initial_references ("AsyncIORTable"); IORTable::Table_var ior_table = IORTable::Table::_narrow (obj.in ()); ACE_ASSERT (! CORBA::is_nil (ior_table.in ())); ior_table->set_locator (this->ins_locator_.in ()); // initialize the repository. This will load any values that // may have been persisted before. int result = this->repository_->init(this->root_poa_.in (), this->imr_poa_.in (), ior); if (result == 0) { Locator_Repository::SIMap::ENTRY* entry = 0; Locator_Repository::SIMap::ITERATOR it (this->repository_->servers ()); for (;it.next (entry) != 0; it.advance ()) { const Server_Info& info = *(entry->int_id_); // ImplementationRepository::ServerInformation_var imr_info = // info.createImRServerInfo (); if (!CORBA::is_nil (info.server.in()) && !this->pinger_.has_server (info.name.c_str())) { this->pinger_.add_server (info.name.c_str(), this->ping_external_, info.server.in()); } } } return result; } int ImR_Locator_i::init (Options& opts) { ACE_CString cmdline = opts.cmdline (); cmdline += " -orbuseimr 0"; ACE_ARGV av (cmdline.c_str ()); int argc = av.argc (); ACE_TCHAR** argv = av.argv (); CORBA::ORB_var orb = CORBA::ORB_init (argc, argv, "TAO_ImR_Locator"); int err = this->init_with_orb (orb.in (), opts); return err; } int ImR_Locator_i::run (void) { if (debug_ > 0) { // This debug message was split into two calls to // work around yet another bug in Visual Studio 2005. // When this was a single debug message, the value for // debug () came out garbled and the read-only string // caused an ACCESS VIOLATION -- Chad Elliott 10/4/2006 ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("Implementation Repository: Running\n") ACE_TEXT ("\tPing Interval : %dms\n") ACE_TEXT ("\tStartup Timeout : %ds\n") ACE_TEXT ("\tPersistence : %s\n") ACE_TEXT ("\tMulticast : %C\n"), ping_interval_.msec (), startup_timeout_.sec (), this->repository_->repo_mode (), (this->repository_->multicast () != 0 ? "Enabled" : "Disabled"))); ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("\tDebug : %d\n") ACE_TEXT ("\tLocked : %C\n\n"), debug (), (read_only_ ? "True" : "False"))); } this->auto_start_servers (); this->orb_->run (); return 0; } void ImR_Locator_i::shutdown (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, CORBA::Boolean activators, CORBA::Boolean servers) { this->pinger_.shutdown (); this->aam_set_.reset (); if (servers != 0 && this->repository_->servers ().current_size () > 0) { // Note : shutdown is oneway, so we can't throw ORBSVCS_ERROR (( LM_ERROR, ACE_TEXT ("ImR: Shutdown of all servers not implemented.\n"))); } if (activators != 0 && this->repository_->activators ().current_size () > 0) { ACE_Vector acts; Locator_Repository::AIMap::ENTRY* entry = 0; Locator_Repository::AIMap::ITERATOR it (this->repository_->activators ()); for (;it.next (entry) != 0; it.advance ()) { Activator_Info_Ptr info = entry->int_id_; ACE_ASSERT (! info.null ()); connect_activator (*info); if (! CORBA::is_nil (info->activator.in ())) acts.push_back (info->activator); } int shutdown_errs = 0; for (size_t i = 0; i < acts.size (); ++i) { try { acts[i]->shutdown (); acts[i] = ImplementationRepository::Activator::_nil (); } catch (CORBA::Exception& ex) { ++shutdown_errs; if (debug_ > 1) { ex._tao_print_exception ( ACE_TEXT ("ImR: shutdown activator")); } } } if (debug_ > 0 && shutdown_errs > 0) { ORBSVCS_DEBUG (( LM_DEBUG, ACE_TEXT ("ImR: Some activators could not be shut down.\n"))); } } // Technically, we should wait for all the activators to unregister, but // ,for now at least, it doesn't seem worth it. this->shutdown (false); _tao_rh->shutdown (); } void ImR_Locator_i::shutdown (bool wait_for_completion) { this->orb_->shutdown (wait_for_completion); } int ImR_Locator_i::fini (void) { try { if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Shutting down...\n"))); this->root_poa_->destroy (1, 1); this->orb_->destroy (); if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Shut down successfully.\n"))); } catch (CORBA::Exception& ex) { ex._tao_print_exception (ACE_TEXT ("ImR_Locator_i::fini")); throw; } return 0; } void ImR_Locator_i::register_activator (ImplementationRepository::AMH_LocatorResponseHandler_ptr _tao_rh, const char* aname, ImplementationRepository::Activator_ptr activator) { ACE_ASSERT (aname != 0); ACE_ASSERT (! CORBA::is_nil (activator)); // Before we can register the activator, we need to ensure that any existing // registration is purged. this->unregister_activator_i (aname); CORBA::String_var ior = this->orb_->object_to_string (activator); CORBA::Long token = ACE_OS::gettimeofday ().msec (); int err = this->repository_->add_activator (aname, token, ior.in (), activator); ACE_ASSERT (err == 0); ACE_UNUSED_ARG (err); if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Activator registered for %C.\n"), aname)); _tao_rh->register_activator (token); } void ImR_Locator_i::unregister_activator (ImplementationRepository::AMH_LocatorResponseHandler_ptr _tao_rh, const char* aname, CORBA::Long token) { ACE_ASSERT (aname != 0); Activator_Info_Ptr info = this->get_activator (aname); if (! info.null ()) { if (info->token != token && debug_ > 0) { ORBSVCS_DEBUG (( LM_DEBUG, ACE_TEXT ("ImR: Ignoring unregister activator:%C. Wrong token.\n"), aname)); _tao_rh->unregister_activator (); return; } this->unregister_activator_i (aname); if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Activator %C unregistered.\n"), aname)); } else { if (debug_ > 0) ORBSVCS_DEBUG (( LM_DEBUG, ACE_TEXT ("ImR: Ignoring unregister activator: %C. ") ACE_TEXT ("Unknown activator.\n"), aname)); } _tao_rh->unregister_activator (); } void ImR_Locator_i::unregister_activator_i (const char* aname) { ACE_ASSERT (aname != 0); int err = this->repository_->remove_activator (aname); ACE_UNUSED_ARG (err); } void ImR_Locator_i::notify_child_death (ImplementationRepository::AMH_LocatorResponseHandler_ptr _tao_rh, const char* name) { ACE_ASSERT (name != 0); if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server has died <%C>.\n"), name)); UpdateableServerInfo info(this->repository_.get(), name); if (! info.null ()) { info.edit()->ior = ""; info.edit()->partial_ior = ""; } else { if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Failed to find server in repository.\n"))); } AsyncAccessManager_ptr aam(this->find_aam (name)); if (!aam.is_nil()) { aam->notify_child_death (); } _tao_rh->notify_child_death (); } char* ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start) { ImR_SyncResponseHandler rh ("", this->orb_.in()); this->activate_server_by_name (name, manual_start, &rh); return rh.wait_for_result (); } char* ImR_Locator_i::activate_server_by_object (const char* object_name) { Server_Info_Ptr si; ACE_CString key; ACE_CString full (object_name); if (this->split_key (full, key, si)) { ImR_SyncResponseHandler rh (key.c_str(), this->orb_.in()); this->activate_server_by_info (si, &rh); return rh.wait_for_result (); } throw ImplementationRepository::NotFound(); } void ImR_Locator_i::activate_server (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, const char* server) { if (debug_ > 1) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Manually activating server <%C>\n"), server)); } ImR_ResponseHandler *rh = 0; ACE_NEW (rh, ImR_Loc_ResponseHandler (ImR_Loc_ResponseHandler::LOC_ACTIVATE_SERVER, _tao_rh)); // This is the version called by tao_imr to activate the server, manually // starting it if necessary. activate_server_by_name (server, true, rh); } bool ImR_Locator_i::get_info_for_name (const char* name, Server_Info_Ptr &si) { ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; this->parse_id (name, server_id, serverKey, jacorb_server); si = this->repository_->get_server (serverKey); return !si.null(); } bool ImR_Locator_i::split_key (ACE_CString &full, ACE_CString &key, Server_Info_Ptr &si) { key = full; if (this->get_info_for_name (full.c_str(), si)) { return true; } ACE_CString::size_type pos = full.rfind ('/'); while (pos != ACE_CString::npos) { ACE_CString server = full.substring (0, pos); if (this->get_info_for_name (server.c_str (), si)) { return true; } pos = server.rfind ('/'); } return false; } void ImR_Locator_i::activate_server_by_name (const char* name, bool manual_start, ImR_ResponseHandler *rh) { // Activate the server, starting it if necessary. Don't start MANUAL // servers unless manual_start=true ACE_ASSERT (name != 0); Server_Info_Ptr si; if (!this->get_info_for_name(name, si)) { rh->send_exception ( new ImplementationRepository::NotFound ); return; } UpdateableServerInfo info (this->repository_.get(), si, true); this->activate_server_i (info, manual_start, rh); } void ImR_Locator_i::activate_server_by_info (const Server_Info_Ptr &si, ImR_ResponseHandler *rh) { UpdateableServerInfo info (this->repository_.get(), si, true); this->activate_server_i (info, false, rh); } void ImR_Locator_i::activate_server_i (UpdateableServerInfo& info, bool manual_start, ImR_ResponseHandler *rh) { AsyncAccessManager_ptr aam; if (info->activation_mode == ImplementationRepository::PER_CLIENT) { AsyncAccessManager *aam_raw; ACE_NEW (aam_raw, AsyncAccessManager (*info, manual_start, *this)); aam = aam_raw; this->aam_set_.insert_tail (aam); } else { aam = this->find_aam (info->name.c_str()); if (aam.is_nil()) { AsyncAccessManager *aam_raw; ACE_NEW (aam_raw, AsyncAccessManager (*info, manual_start, *this)); aam = aam_raw; this->aam_set_.insert_tail (aam); } } aam->add_interest (rh); } CORBA::Object_ptr ImR_Locator_i::set_timeout_policy (CORBA::Object_ptr obj, const ACE_Time_Value& to) { CORBA::Object_var ret (CORBA::Object::_duplicate (obj)); try { TimeBase::TimeT timeout; ORBSVCS_Time::Time_Value_to_TimeT (timeout, to); CORBA::Any tmp; tmp <<= timeout; CORBA::PolicyList policies (1); policies.length (1); policies[0] = orb_->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, tmp); ret = obj->_set_policy_overrides (policies, CORBA::ADD_OVERRIDE); policies[0]->destroy (); if (CORBA::is_nil (ret.in ())) { if (debug_ > 0) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Unable to set timeout policy.\n"))); } ret = CORBA::Object::_duplicate (obj); } } catch (CORBA::Exception& ex) { ex._tao_print_exception ( ACE_TEXT ("ImR_Locator_i::set_timeout_policy ()")); } return ret._retn (); } void ImR_Locator_i::add_or_update_server (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, const char* server, const ImplementationRepository::StartupOptions &options) { ACE_ASSERT (server != 0); if (this->read_only_) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Cannot add/update server <%C> due to locked ") ACE_TEXT ("database.\n"), server)); CORBA::Exception *ex = new CORBA::NO_PERMISSION (CORBA::SystemException::_tao_minor_code (TAO_IMPLREPO_MINOR_CODE,0), CORBA::COMPLETED_NO); ImplementationRepository::AMH_AdministrationExceptionHolder h (ex); _tao_rh->add_or_update_server_excep (&h); return; } if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Add/Update server <%C>.\n"), server)); int limit = options.start_limit; if (limit < 0) { limit = -limit; } else if (limit == 0) { limit = 1; } ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; this->parse_id (server, server_id, serverKey, jacorb_server); UpdateableServerInfo info(this->repository_.get(), serverKey); if (info.null ()) { if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Adding server <%C>.\n"), server)); this->repository_->add_server ("", serverKey, jacorb_server, options.activator.in (), options.command_line.in (), options.environment, options.working_directory.in (), options.activation, limit); } else { if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Updating server <%C>.\n"), server)); info.edit ()->activator = options.activator.in (); info.edit ()->cmdline = options.command_line.in (); info.edit ()->env_vars = options.environment; info.edit ()->dir = options.working_directory.in (); info.edit ()->activation_mode = options.activation; info.edit ()->start_limit = limit; info.edit ()->start_count = 0; info.update_repo(); } if (debug_ > 1) { // Note : The info var may be null, so we use options. ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server: %C\n") ACE_TEXT ("\tActivator: %C\n") ACE_TEXT ("\tCommand Line: %C\n") ACE_TEXT ("\tWorking Directory: %C\n") ACE_TEXT ("\tActivation: %C\n") ACE_TEXT ("\tStart Limit: %d\n") ACE_TEXT ("\n"), server, options.activator.in (), options.command_line.in (), options.working_directory.in (), ImR_Utils::activationModeToString (options.activation).c_str (), limit )); for (CORBA::ULong i = 0; i < options.environment.length (); ++i) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("Environment variable %C=%C\n"), options.environment[i].name.in (), options.environment[i].value.in ())); } _tao_rh->add_or_update_server (); } void ImR_Locator_i::parse_id (const char* id, ACE_CString& server_id, ACE_CString& name, bool& jacorb_server) { const char *pos = ACE_OS::strchr (id, ':'); if (pos) { ACE_CString idstr (id); server_id = idstr.substr (0, pos - id); name = idstr.substr (pos - id + 1); if (server_id == "JACORB") { jacorb_server = true; ssize_t idx = name.find("/"); server_id = name.substr(0, idx); } } else { name = id; } } void ImR_Locator_i::remove_server (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, const char* name) { ACE_ASSERT (name != 0); if (this->read_only_) { ORBSVCS_ERROR ((LM_ERROR, ACE_TEXT ("ImR: Can't remove server <%C> due to locked database.\n"), name)); CORBA::Exception *ex = new CORBA::NO_PERMISSION (CORBA::SystemException::_tao_minor_code (TAO_IMPLREPO_MINOR_CODE, 0), CORBA::COMPLETED_NO); ImplementationRepository::AMH_AdministrationExceptionHolder h (ex); _tao_rh->remove_server_excep (&h); return; } // Note : This will be safe, because any Server_Info_Ptr objects will still // be valid, and the actual Server_Info will be destroyed when the last // one goes out of scope. ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; this->parse_id(name, server_id, serverKey, jacorb_server); Server_Info_Ptr info = this->repository_->get_server (serverKey); if (! info.null ()) { if (this->repository_->remove_server (serverKey) == 0) { if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Removing Server <%C>...\n"), name)); PortableServer::POA_var poa = findPOA (serverKey.c_str()); if (! CORBA::is_nil (poa.in ())) { bool etherealize = true; bool wait = false; poa->destroy (etherealize, wait); } if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Removed Server <%C>.\n"), name)); } } else { ORBSVCS_ERROR ((LM_ERROR, ACE_TEXT ("ImR: Can't remove unknown server <%C>.\n"), name)); CORBA::Exception *ex = new ImplementationRepository::NotFound; ImplementationRepository::AMH_AdministrationExceptionHolder h (ex); _tao_rh->remove_server_excep (&h); return; } _tao_rh->remove_server (); } PortableServer::POA_ptr ImR_Locator_i::findPOA (const char* name) { try { bool activate_it = false; return root_poa_->find_POA (name, activate_it); } catch (CORBA::Exception&) {// Ignore } return PortableServer::POA::_nil (); } void ImR_Locator_i::shutdown_server (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, const char* server) { ACE_ASSERT (server != 0); if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Shutting down server <%C>.\n"), server)); ACE_CString name; ACE_CString server_id; bool jacorb_server = false; this->parse_id(server, server_id, name, jacorb_server); UpdateableServerInfo info(this->repository_.get(), name); if (info.null ()) { ORBSVCS_ERROR ((LM_ERROR, ACE_TEXT ("ImR: shutdown_server () Cannot find info for server <%C>\n"), server)); CORBA::Exception *ex = new ImplementationRepository::NotFound; ImplementationRepository::AMH_AdministrationExceptionHolder h (ex); _tao_rh->shutdown_server_excep (&h); return; } this->connect_server (info); if (CORBA::is_nil (info->server.in ())) { ORBSVCS_ERROR ((LM_ERROR, ACE_TEXT ("ImR: shutdown_server () Cannot connect to server <%C>\n"), server)); CORBA::Exception *ex = new ImplementationRepository::NotFound; ImplementationRepository::AMH_AdministrationExceptionHolder h (ex); try { _tao_rh->shutdown_server_excep (&h); } catch (CORBA::Exception &ex) { ex._tao_print_exception (ACE_TEXT ("reporting connect error\n")); } return; } try { CORBA::Object_var obj = this->set_timeout_policy (info->server.in (), DEFAULT_SHUTDOWN_TIMEOUT); ImplementationRepository::ServerObject_var server = ImplementationRepository::ServerObject::_unchecked_narrow (obj.in ()); server->shutdown (); } catch (CORBA::TIMEOUT &to_ex) { info.edit ()->reset (); // Note : This is a good thing. It means we didn't waste our time waiting for // the server to finish shutting down. if (debug_ > 1) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Timeout while waiting for <%C> shutdown.\n"), server)); } ImplementationRepository::AMH_AdministrationExceptionHolder h (to_ex._tao_duplicate()); _tao_rh->shutdown_server_excep (&h); return; } catch (CORBA::Exception&) { if (debug_ > 1) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Exception ignored while shutting down <%C>\n"), server)); } } // Note : In most cases this has already been done in the server_is_shutting_down () // operation, but it doesn't hurt to update it again. info.edit ()->reset (); _tao_rh->shutdown_server (); } void ImR_Locator_i::server_is_running (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, const char* id, const char* partial_ior, ImplementationRepository::ServerObject_ptr server) { ACE_ASSERT (id != 0); ACE_ASSERT (partial_ior != 0); ACE_ASSERT (! CORBA::is_nil (server)); ACE_CString server_id; ACE_CString name; bool jacorb_server = false; this->parse_id(id, server_id, name, jacorb_server); if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server %C is running at %C.\n"), name.c_str (), partial_ior)); CORBA::String_var ior = orb_->object_to_string (server); if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server %C callback at %C.\n"), name.c_str (), ior.in ())); if (this->unregister_if_address_reused_) this->repository_->unregister_if_address_reused (server_id, name, partial_ior, this); CORBA::Object_var obj = this->set_timeout_policy (server,ACE_Time_Value (1,0)); ImplementationRepository::ServerObject_var s = ImplementationRepository::ServerObject::_narrow (obj.in()); UpdateableServerInfo info(this->repository_.get(), name); if (info.null ()) { if (debug_ > 0) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Auto adding NORMAL server <%C>.\n"), name.c_str ())); } ImplementationRepository::EnvironmentList env (0); this->repository_->add_server (server_id, name, jacorb_server, "", // no activator "", // no cmdline ImplementationRepository::EnvironmentList (), "", // no working dir ImplementationRepository::NORMAL, DEFAULT_START_LIMIT, partial_ior, ior.in (), s.in () ); Server_Info_Ptr temp_info = this->repository_->get_server(name); if (temp_info.null ()) { if (debug_ > 0) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Auto adding failed, giving up <%C>\n"), name.c_str ())); } _tao_rh->server_is_running (); return; } this->pinger_.add_server (name.c_str(), this->ping_external_, s.in()); AsyncAccessManager *aam_raw; ACE_NEW (aam_raw, AsyncAccessManager (*temp_info, true, *this)); AsyncAccessManager_ptr aam (aam_raw); aam->started_running (); this->aam_set_.insert (aam); } else { if (info->server_id != server_id) { if (! info->server_id.empty()) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR - WARNING: server \"%C\" changed server id from ") ACE_TEXT ("\"%C\" to \"%C\" waiting PER_CLIENT clients.\n"), name.c_str (), info->server_id.c_str (), server_id.c_str ())); info.edit ()->server_id = server_id; } if (info->activation_mode != ImplementationRepository::PER_CLIENT) { info.edit ()->ior = ior.in (); info.edit ()->partial_ior = partial_ior; info.edit ()->server = s; info.update_repo(); this->pinger_.add_server (name.c_str(), true, s.in()); } AsyncAccessManager_ptr aam(this->find_aam (name.c_str())); if (!aam.is_nil()) aam->server_is_running (partial_ior, s.in()); else { if (info->activation_mode != ImplementationRepository::PER_CLIENT) { AsyncAccessManager *aam_raw; ACE_NEW (aam_raw, AsyncAccessManager (*info, true, *this)); AsyncAccessManager_ptr aam (aam_raw); aam->started_running (); this->aam_set_.insert (aam); } } } _tao_rh->server_is_running (); } void ImR_Locator_i::server_is_shutting_down (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, const char* server) { ACE_ASSERT (server != 0); UpdateableServerInfo info (this->repository_.get(), server); if (info.null ()) { if (debug_ > 1) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR_Locator_i::server_is_shutting_down: ") ACE_TEXT ("Unknown server:%C\n"), server)); } _tao_rh->server_is_shutting_down (); return; } if (debug_ > 0) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Server <%C> is shutting down.\n"), server)); if (info->activation_mode != ImplementationRepository::PER_CLIENT) { this->pinger_.remove_server (server); { AsyncAccessManager_ptr aam = this->find_aam (server); if (!aam.is_nil()) { aam->server_is_shutting_down (); } } } info.edit ()->reset (); _tao_rh->server_is_shutting_down (); } void ImR_Locator_i::find (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, const char* server) { ACE_ASSERT (server != 0); ACE_CString serverKey; ACE_CString server_id; bool jacorb_server = false; this->parse_id(server, server_id, serverKey, jacorb_server); UpdateableServerInfo info(this->repository_.get(), serverKey); ImplementationRepository::ServerInformation_var imr_info; try { if (! info.null ()) { imr_info = info->createImRServerInfo (); if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Found server %C.\n"), server)); } else { ACE_NEW_THROW_EX (imr_info, ImplementationRepository::ServerInformation, CORBA::NO_MEMORY ()); imr_info->startup.activation= ImplementationRepository::NORMAL; if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Cannot find server <%C>\n"), server)); } } catch (CORBA::Exception &ex) { ImplementationRepository::AMH_AdministrationExceptionHolder h (ex._tao_duplicate()); _tao_rh->find_excep (&h); return; } _tao_rh->find (imr_info.in()); } void ImR_Locator_i::list (ImplementationRepository::AMH_AdministrationResponseHandler_ptr _tao_rh, CORBA::ULong how_many, CORBA::Boolean active) { AsyncListManager *l = 0; try { ACE_NEW_THROW_EX (l, AsyncListManager (this->repository_.get(), this->imr_poa_.in(), active ? &this->pinger_ : 0), CORBA::NO_MEMORY ()); AsyncListManager_ptr lister (l); l->list (_tao_rh, how_many); } catch (CORBA::Exception &ex) { ImplementationRepository::AMH_AdministrationExceptionHolder h (ex._tao_duplicate()); _tao_rh->find_excep (&h); return; } } Activator_Info_Ptr ImR_Locator_i::get_activator (const ACE_CString& aname) { Activator_Info_Ptr info = this->repository_->get_activator (aname); if (! info.null ()) { this->connect_activator (*info); } return info; } void ImR_Locator_i::connect_activator (Activator_Info& info) { if (! CORBA::is_nil (info.activator.in ()) || info.ior.length () == 0) return; try { CORBA::Object_var obj = this->orb_->string_to_object (info.ior.c_str ()); if (CORBA::is_nil (obj.in ())) { info.reset (); return; } if (startup_timeout_ > ACE_Time_Value::zero) { obj = this->set_timeout_policy (obj.in (), startup_timeout_); } info.activator = ImplementationRepository::Activator::_unchecked_narrow (obj.in ()); if (CORBA::is_nil (info.activator.in ())) { info.reset (); return; } if (debug_ > 1) ORBSVCS_DEBUG (( LM_DEBUG, ACE_TEXT ("ImR: Connected to activator <%C>\n"), info.name.c_str ())); } catch (CORBA::Exception&) { info.reset (); } } void ImR_Locator_i::auto_start_servers (void) { if (this->repository_->servers ().current_size () == 0) return; Locator_Repository::SIMap::ENTRY* server_entry; Locator_Repository::SIMap::ITERATOR server_iter (this->repository_->servers ()); // For each of the entries in the Locator_Repository, get the startup // information and activate the servers, if they are not already // running. for (;server_iter.next (server_entry) != 0; server_iter.advance ()) { UpdateableServerInfo info(this->repository_.get(), server_entry->int_id_); ACE_ASSERT (! info.null ()); try { if (info->activation_mode == ImplementationRepository::AUTO_START && info->cmdline.length () > 0) { ImR_ResponseHandler rh; this->activate_server_i (info, true, &rh); } } catch (CORBA::Exception& ex) { if (debug_ > 1) { ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: AUTO_START Could not activate <%C>\n"), server_entry->ext_id_.c_str ())); ex._tao_print_exception ("AUTO_START"); } // Ignore exceptions } } } void ImR_Locator_i::connect_server (UpdateableServerInfo& info) { if (! CORBA::is_nil (info->server.in ())) { if (!this->pinger_.has_server (info->name.c_str())) { this->pinger_.add_server (info->name.c_str(), this->ping_external_, info->server.in()); } return; // already connected } if (info->ior.length () == 0) { info.edit ()->reset (); return; // can't connect } try { CORBA::Object_var obj = orb_->string_to_object (info->ior.c_str ()); if (CORBA::is_nil (obj.in ())) { info.edit ()->reset (); return; } obj = this->set_timeout_policy (obj.in (), DEFAULT_SERVER_TIMEOUT); info.edit ()->server = ImplementationRepository::ServerObject::_unchecked_narrow (obj.in ()); if (CORBA::is_nil (info->server.in ())) { info.edit ()->reset (); return; } if (debug_ > 1) ORBSVCS_DEBUG ((LM_DEBUG, ACE_TEXT ("ImR: Connected to server <%C>\n"), info->name.c_str ())); this->pinger_.add_server (info->name.c_str(), this->ping_external_, info->server.in()); } catch (CORBA::Exception&) { info.edit ()->reset (); } } bool ImR_Locator_i::is_alive (UpdateableServerInfo& info) { this->connect_server (info); SyncListener *listener = 0; ACE_NEW_RETURN (listener, SyncListener (info->name.c_str(), this->orb_.in(), this->pinger_), false); LiveListener_ptr llp(listener); return listener->is_alive(); } int ImR_Locator_i::debug () { return debug_; } LiveCheck& ImR_Locator_i::pinger (void) { return this->pinger_; } PortableServer::POA_ptr ImR_Locator_i::root_poa (void) { return PortableServer::POA::_duplicate (this->root_poa_.in()); } void ImR_Locator_i::remove_aam (AsyncAccessManager_ptr &aam) { this->aam_set_.remove (aam); } void ImR_Locator_i::remove_aam (const char *name) { for (AAM_Set::ITERATOR i = this->aam_set_.begin(); i != this->aam_set_.end(); ++i) { if ((*i)->has_server (name)) { this->aam_set_.remove (*i); return; } } } AsyncAccessManager * ImR_Locator_i::find_aam (const char *name) { for (AAM_Set::ITERATOR i = this->aam_set_.begin(); i != this->aam_set_.end(); ++i) { if ((*i)->has_server (name)) { return (*i)->_add_ref(); } } return 0; } //------------------------------------------------------------------------- SyncListener::SyncListener (const char *server, CORBA::ORB_ptr orb, LiveCheck &pinger) :LiveListener (server), orb_ (CORBA::ORB::_duplicate (orb)), pinger_ (pinger), status_ (LS_UNKNOWN), got_it_ (false), callback_ (false) { } SyncListener::~SyncListener () { } bool SyncListener::is_alive (void) { this->callback_ = true; while (!this->got_it_) { if (this->callback_) { if (!this->pinger_.add_poll_listener (this)) { return false; } } this->callback_ = false; ACE_Time_Value delay (10,0); this->orb_->perform_work (delay); } this->got_it_ = false; return this->status_ != LS_DEAD; } bool SyncListener::status_changed (LiveStatus status) { this->callback_ = true; this->status_ = status; this->got_it_ = (status != LS_TRANSIENT); return true; } //--------------------------------------------------------------------------- ImR_SyncResponseHandler::ImR_SyncResponseHandler (const char *objkey, CORBA::ORB_ptr orb) :excep_ (0), key_ (objkey), orb_ (CORBA::ORB::_duplicate (orb)) { } ImR_SyncResponseHandler::~ImR_SyncResponseHandler (void) { } void ImR_SyncResponseHandler::send_ior (const char *pior) { ACE_CString full (pior); full += this->key_; this->result_ = full.c_str(); } void ImR_SyncResponseHandler::send_exception (CORBA::Exception *ex) { this->excep_ = ex->_tao_duplicate(); } char * ImR_SyncResponseHandler::wait_for_result (void) { while (this->result_.in() == 0 && this->excep_ == 0) { this->orb_->perform_work (); } if (this->excep_ != 0) { TAO_AMH_DSI_Exception_Holder h(this->excep_); h.raise_invoke (); } return this->result_._retn(); } //--------------------------------------------------------------------------- ImR_Loc_ResponseHandler::ImR_Loc_ResponseHandler (Loc_Operation_Id opid, ImplementationRepository::AMH_AdministrationResponseHandler_ptr rh) :op_id_ (opid), resp_ (ImplementationRepository::AMH_AdministrationResponseHandler::_duplicate (rh)) { } ImR_Loc_ResponseHandler::~ImR_Loc_ResponseHandler (void) { } void ImR_Loc_ResponseHandler::send_ior (const char *) { switch (this->op_id_) { case LOC_ACTIVATE_SERVER: resp_->activate_server (); break; case LOC_ADD_OR_UPDATE_SERVER: resp_->add_or_update_server (); break; case LOC_REMOVE_SERVER: resp_->remove_server (); break; case LOC_SHUTDOWN_SERVER: resp_->shutdown_server (); break; case LOC_SERVER_IS_RUNNING: resp_->server_is_running (); break; case LOC_SERVER_IS_SHUTTING_DOWN: resp_->server_is_shutting_down (); break; }; delete this; } void ImR_Loc_ResponseHandler::send_exception (CORBA::Exception *ex) { ImplementationRepository::AMH_AdministrationExceptionHolder h (ex); switch (this->op_id_) { case LOC_ACTIVATE_SERVER: resp_->activate_server_excep (&h); break; case LOC_ADD_OR_UPDATE_SERVER: resp_->add_or_update_server_excep (&h); break; case LOC_REMOVE_SERVER: resp_->remove_server_excep (&h); break; case LOC_SHUTDOWN_SERVER: resp_->shutdown_server_excep (&h); break; case LOC_SERVER_IS_RUNNING: resp_->server_is_running_excep (&h); break; case LOC_SERVER_IS_SHUTTING_DOWN: resp_->server_is_shutting_down_excep (&h); break; }; delete this; }