diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_Gateway.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_Gateway.cpp | 705 |
1 files changed, 705 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_Gateway.cpp new file mode 100644 index 00000000000..723466ea888 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/ECG_Mcast_Gateway.cpp @@ -0,0 +1,705 @@ +// $Id$ + +#include "orbsvcs/Event/ECG_Mcast_Gateway.h" + +#include "orbsvcs/Event/EC_Lifetime_Utils_T.h" +#include "orbsvcs/Event/ECG_Simple_Address_Server.h" +#include "orbsvcs/Event/ECG_Complex_Address_Server.h" +#include "orbsvcs/Event/ECG_Simple_Mcast_EH.h" +#include "orbsvcs/Event/ECG_Mcast_EH.h" +#include "orbsvcs/Event/ECG_UDP_EH.h" + +#include "orbsvcs/Event_Utilities.h" + +#include "ace/Dynamic_Service.h" +#include "ace/Arg_Shifter.h" +#include "tao/ORB_Core.h" +#include "ace/OS_NS_strings.h" + +#if ! defined (__ACE_INLINE__) +#include "orbsvcs/Event/ECG_Mcast_Gateway.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, ECG_Mcast_Gateway, "$Id$") + + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> > +UDP_Sender_Shutdown; + +typedef TAO_EC_Shutdown_Command<TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> > +UDP_Receiver_Shutdown; + + +int +TAO_ECG_Mcast_Gateway::init_svcs (void) +{ + return ACE_Service_Config::static_svcs ()-> + insert (&ace_svc_desc_TAO_ECG_Mcast_Gateway); +} + + +int +TAO_ECG_Mcast_Gateway::fini (void) +{ + return 0; +} + +int +TAO_ECG_Mcast_Gateway::init (int argc, ACE_TCHAR* argv[]) +{ + int result = 0; + + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + const ACE_TCHAR *arg = arg_shifter.get_current (); + + if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGService")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + const ACE_TCHAR* opt = arg_shifter.get_current (); + if (ACE_OS::strcasecmp (opt, ACE_TEXT ("receiver")) == 0) + this->service_type_ = ECG_MCAST_RECEIVER; + else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("sender")) == 0) + this->service_type_ = ECG_MCAST_SENDER; + else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("two_way")) == 0) + this->service_type_ = ECG_MCAST_TWO_WAY; + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unsupported <-ECGService> option ") + ACE_TEXT ("value: <%s>. Ignoring this option ") + ACE_TEXT ("- using defaults instead.\n"), + opt)); + result = -1; + } + arg_shifter.consume_arg (); + } + } + + else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServer")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + const ACE_TCHAR* opt = arg_shifter.get_current (); + if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0) + this->address_server_type_ = ECG_ADDRESS_SERVER_BASIC; + else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("source")) == 0) + this->address_server_type_ = ECG_ADDRESS_SERVER_SOURCE; + else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("type")) == 0) + this->address_server_type_ = ECG_ADDRESS_SERVER_TYPE; + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unsupported <-ECGAddressServer> ") + ACE_TEXT ("option value: <%s>. Ignoring this ") + ACE_TEXT ("option - using defaults instead.\n"), + opt)); + result = -1; + } + arg_shifter.consume_arg (); + } + } + + else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGAddressServerArg")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + this->address_server_arg_.set (arg_shifter.get_current ()); + arg_shifter.consume_arg (); + } + } + + + else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGHandler")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + const ACE_TCHAR* opt = arg_shifter.get_current (); + if (ACE_OS::strcasecmp (opt, ACE_TEXT ("basic")) == 0) + this->handler_type_ = ECG_HANDLER_BASIC; + else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("complex")) == 0) + this->handler_type_ = ECG_HANDLER_COMPLEX; + else if (ACE_OS::strcasecmp (opt, ACE_TEXT ("udp")) == 0) + this->handler_type_ = ECG_HANDLER_UDP; + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("Unsupported <-ECGHandler> ") + ACE_TEXT ("option value: <%s>. Ignoring this ") + ACE_TEXT ("option - using defaults instead.\n"), + opt)); + result = -1; + } + arg_shifter.consume_arg (); + } + } + + else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGTTL")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + const ACE_TCHAR* opt = arg_shifter.get_current (); + unsigned long tmp = ACE_OS::strtoul (opt, 0, 0) & 0xff; + this->ttl_value_ = static_cast<u_char> (tmp); + arg_shifter.consume_arg (); + } + } + + else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNIC")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + this->nic_.set (arg_shifter.get_current ()); + arg_shifter.consume_arg (); + } + } + + else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGIPMULTICASTLOOP")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + this->ip_multicast_loop_ = + (ACE_OS::atoi(arg_shifter.get_current()) != 0); + arg_shifter.consume_arg (); + } + } + + else if (ACE_OS::strcasecmp (arg, ACE_TEXT ("-ECGNONBLOCKING")) == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + this->non_blocking_ = + (ACE_OS::atoi(arg_shifter.get_current()) != 0); + arg_shifter.consume_arg (); + } + } + + else + { + arg_shifter.ignore_arg (); + ACE_DEBUG ((LM_WARNING, + ACE_TEXT ("Ignoring <%s> option ") + ACE_TEXT ("during initialization.\n"), + arg)); + result = -1; + } + } + + if (this->validate_configuration () == -1) + return -1; + else + return result; +} + +int +TAO_ECG_Mcast_Gateway::init (const char * address_server_arg, + const Attributes & attr) +{ + this->address_server_arg_.set (address_server_arg); + + this->address_server_type_ = attr.address_server_type; + this->handler_type_ = attr.handler_type; + this->service_type_ = attr.service_type; + this->ttl_value_ = attr.ttl_value; + this->nic_.set (attr.nic.c_str ()); + this->ip_multicast_loop_ = attr.ip_multicast_loop; + this->non_blocking_ = attr.non_blocking; + + return this->validate_configuration (); +} + +int +TAO_ECG_Mcast_Gateway::init ( + const RtecEventChannelAdmin::ConsumerQOS & consumer_qos, + const char * address_server_arg, + const Attributes & attributes) +{ + this->consumer_qos_ = consumer_qos; + return this->init (address_server_arg, + attributes); +} + +int +TAO_ECG_Mcast_Gateway::validate_configuration (void) +{ + if ((this->handler_type_ == ECG_HANDLER_BASIC + || this->handler_type_ == ECG_HANDLER_UDP) + && this->service_type_ != ECG_MCAST_SENDER + && this->address_server_type_ != ECG_ADDRESS_SERVER_BASIC) + { + ACE_DEBUG ((LM_ERROR, + "Configurations for mcast handler and " + "address server do not match.\n")); + return -1; + } + + // Currently all Address Server implementations require an + // initialization string. If we ever add a new Address Server + // implementation, which does not, we'll have to remove this check. + if (this->address_server_arg_.length () == 0) + { + ACE_DEBUG ((LM_ERROR, + "Address server initializaton " + "argument not specified.\n")); + return -1; + } + + if (this->ip_multicast_loop_ != 0 + && this->ip_multicast_loop_ != 1) + { + ACE_DEBUG ((LM_ERROR, + "IP MULTICAST LOOP option must have a boolean value.\n")); + return -1; + } + + if (this->non_blocking_ != 0 + && this->non_blocking_ != 1) + { + ACE_DEBUG ((LM_ERROR, + "NON BLOCKING flag must have a boolean value.\n")); + return -1; + } + + return 0; +} + +TAO_ECG_Refcounted_Endpoint +TAO_ECG_Mcast_Gateway::init_endpoint (void) +{ + TAO_ECG_UDP_Out_Endpoint* endpoint = 0; + TAO_ECG_Refcounted_Endpoint refendpoint; + + // Try to allocate a new endpoint from the heap + ACE_NEW_NORETURN (endpoint, + TAO_ECG_UDP_Out_Endpoint); + + if (endpoint != 0) + { + refendpoint.reset (endpoint); + } + else + { + return TAO_ECG_Refcounted_Endpoint (); + } + + ACE_SOCK_Dgram& dgram = refendpoint->dgram (); + + if (dgram.open (ACE_Addr::sap_any) == -1) + { + ACE_ERROR ((LM_ERROR, + "Cannot open dgram " + "for sending mcast messages.\n")); + return TAO_ECG_Refcounted_Endpoint (); + } + + if (this->nic_.length () != 0) + { + dgram.set_nic (this->nic_.c_str ()); + } + + if (this->ttl_value_ > 0) + { + if (dgram.ACE_SOCK::set_option (IPPROTO_IP, + IP_MULTICAST_TTL, + &this->ttl_value_, + sizeof (this->ttl_value_)) + == -1) + { + ACE_ERROR ((LM_ERROR, + "Error setting TTL option on dgram " + "for sending mcast messages.\n")); + return TAO_ECG_Refcounted_Endpoint (); + } + } + + if (dgram.ACE_SOCK::set_option (IPPROTO_IP, + IP_MULTICAST_LOOP, + &this->ip_multicast_loop_, + sizeof (this->ip_multicast_loop_)) == -1) + { + ACE_ERROR ((LM_ERROR, + "Error setting MULTICAST_LOOP option " + "on dgram for sending mcast messages.\n")); + return TAO_ECG_Refcounted_Endpoint (); + } + + if (this->non_blocking_ + && dgram.enable(ACE_NONBLOCK) == -1) + { + ACE_ERROR ((LM_ERROR, + "Error setting NON BLOCKING option.\n")); + return TAO_ECG_Refcounted_Endpoint (); + } + + return refendpoint; +} + +PortableServer::ServantBase * +TAO_ECG_Mcast_Gateway::init_address_server (void) +{ + const char * address_server_arg = + (this->address_server_arg_.length ()) + ? this->address_server_arg_.c_str () : 0; + + if (this->address_server_type_ == ECG_ADDRESS_SERVER_BASIC) + { + TAO_EC_Servant_Var<TAO_ECG_Simple_Address_Server> impl = + TAO_ECG_Simple_Address_Server::create (); + if (!impl.in ()) + return 0; + + if (impl->init (address_server_arg) == -1) + { + return 0; + } + return impl._retn (); + } + + else if (this->address_server_type_ == ECG_ADDRESS_SERVER_SOURCE) + { + TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl = + TAO_ECG_Complex_Address_Server::create (1); + if (!impl.in ()) + return 0; + + if (impl->init (address_server_arg) == -1) + { + return 0; + } + return impl._retn (); + } + + else if (this->address_server_type_ == ECG_ADDRESS_SERVER_TYPE) + { + TAO_EC_Servant_Var<TAO_ECG_Complex_Address_Server> impl = + TAO_ECG_Complex_Address_Server::create (0); + if (!impl.in ()) + return 0; + + if (impl->init (address_server_arg) == -1) + { + return 0; + } + return impl._retn (); + } + + else + { + ACE_ERROR ((LM_ERROR, + "Cannot create address server: " + "unknown address server type specified.\n")); + return 0; + } +} + +TAO_ECG_Refcounted_Handler +TAO_ECG_Mcast_Gateway::init_handler (TAO_ECG_Dgram_Handler *receiver, + RtecEventChannelAdmin::EventChannel_ptr ec, + ACE_Reactor *reactor + ACE_ENV_ARG_DECL) +{ + TAO_ECG_Refcounted_Handler handler; + + const char * nic = + (this->nic_.length ()) ? this->nic_.c_str () : 0; + const char * address_server_arg = + (this->address_server_arg_.length ()) + ? this->address_server_arg_.c_str () : 0; + + if (this->handler_type_ == ECG_HANDLER_BASIC) + { + TAO_ECG_Simple_Mcast_EH * h = 0; + ACE_NEW_RETURN (h, + TAO_ECG_Simple_Mcast_EH (receiver), + handler); + handler.reset (h); + + h->reactor (reactor); + if (h->open (address_server_arg, nic) != 0) + return TAO_ECG_Refcounted_Handler (); + } + + else if (this->handler_type_ == ECG_HANDLER_COMPLEX) + { + TAO_ECG_Mcast_EH * h = 0; + ACE_NEW_RETURN (h, + TAO_ECG_Mcast_EH (receiver, nic), + handler); + handler.reset (h); + + h->reactor (reactor); + + h->open (ec ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_ECG_Refcounted_Handler ()); + } + + else if (this->handler_type_ == ECG_HANDLER_UDP) + { + TAO_ECG_UDP_EH * h = 0; + ACE_NEW_RETURN (h, + TAO_ECG_UDP_EH (receiver), + handler); + handler.reset (h); + h->reactor (reactor); + + ACE_INET_Addr ipaddr; + if (ipaddr.set (address_server_arg) != 0) + { + ACE_ERROR ((LM_ERROR, + "ERROR using address server argument " + "in ACE_INET_Addr.set ().\n")); + return TAO_ECG_Refcounted_Handler (); + } + if (h->open (ipaddr) != 0) + return TAO_ECG_Refcounted_Handler (); + } + + else + { + ACE_ERROR ((LM_ERROR, + "Cannot create handler: unknown " + "handler type specified.\n")); + return handler; + } + + return handler; +} + +TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> +TAO_ECG_Mcast_Gateway::init_sender ( + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecUDPAdmin::AddrServer_ptr address_server, + TAO_ECG_Refcounted_Endpoint endpoint_rptr + ACE_ENV_ARG_DECL) +{ + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> + sender (TAO_ECG_UDP_Sender::create ()); + if (!sender.in ()) + return sender; + + sender->init (ec, + address_server, + endpoint_rptr + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ()); + + TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown; + sender_shutdown.set_command (UDP_Sender_Shutdown (sender)); + + if (this->consumer_qos_.dependencies.length () > 0) + { + // Client supplied consumer qos. Use it. + this->consumer_qos_.is_gateway = 1; + sender->connect (this->consumer_qos_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ()); + } + else + { + // Client did not specify anything - subscribe to all events. + ACE_ConsumerQOS_Factory consumer_qos_factory; + consumer_qos_factory.start_disjunction_group (1); + consumer_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0); + RtecEventChannelAdmin::ConsumerQOS & qos = + const_cast<RtecEventChannelAdmin::ConsumerQOS &> (consumer_qos_factory.get_ConsumerQOS ()); + qos.is_gateway = 1; + + sender->connect (qos ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> ()); + } + + sender_shutdown.disallow_command (); + return sender; +} + +TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> +TAO_ECG_Mcast_Gateway::init_receiver ( + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecUDPAdmin::AddrServer_ptr address_server, + TAO_ECG_Refcounted_Endpoint endpoint_rptr + ACE_ENV_ARG_DECL) +{ + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> + receiver (TAO_ECG_UDP_Receiver::create ()); + if (!receiver.in ()) + return receiver; + + receiver->init (ec, + endpoint_rptr, + address_server + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ()); + + TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown; + receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver)); + + ACE_SupplierQOS_Factory supplier_qos_factory; + supplier_qos_factory.insert (ACE_ES_EVENT_SOURCE_ANY, + ACE_ES_EVENT_ANY, + 0, 1); + RtecEventChannelAdmin::SupplierQOS & qos = + const_cast<RtecEventChannelAdmin::SupplierQOS &> (supplier_qos_factory.get_SupplierQOS ()); + qos.is_gateway = 1; + + receiver->connect (qos ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> ()); + + receiver_shutdown.disallow_command (); + return receiver; +} + +void +TAO_ECG_Mcast_Gateway::verify_args (CORBA::ORB_ptr orb, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + if (CORBA::is_nil (ec)) + { + ACE_ERROR ((LM_ERROR, + "Nil event channel argument passed to " + "TAO_ECG_Mcast_Gateway::run().\n")); + ACE_THROW (CORBA::INTERNAL ()); + } + if (CORBA::is_nil (orb)) + { + ACE_ERROR ((LM_ERROR, + "Nil orb argument passed to " + "TAO_ECG_Mcast_Gateway::run().\n")); + ACE_THROW (CORBA::INTERNAL ()); + } +} + +void +TAO_ECG_Mcast_Gateway::run (CORBA::ORB_ptr orb, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + // Verify args. + this->verify_args (orb, ec ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Auto-cleanup objects. + TAO_EC_Object_Deactivator address_server_deactivator; + TAO_EC_Auto_Command<UDP_Sender_Shutdown> sender_shutdown; + TAO_EC_Auto_Command<UDP_Receiver_Shutdown> receiver_shutdown; + + // Set up address server. + PortableServer::ServantBase_var address_server_servant = + this->init_address_server (); + if (!address_server_servant.in ()) + { + ACE_DEBUG ((LM_ERROR, + "Unable to create address server.\n")); + ACE_THROW (CORBA::INTERNAL ()); + } + + RtecUDPAdmin::AddrServer_var address_server; + + PortableServer::POA_var poa = + address_server_servant->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + activate (address_server, + poa.in (), + address_server_servant.in (), + address_server_deactivator + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + TAO_ECG_Refcounted_Endpoint endpoint_rptr; + TAO_EC_Servant_Var<TAO_ECG_UDP_Sender> sender; + + // Set up event sender. + if (this->service_type_ == ECG_MCAST_SENDER + || this->service_type_ == ECG_MCAST_TWO_WAY) + { + endpoint_rptr = this->init_endpoint (); + if (endpoint_rptr.get () == 0) + { + ACE_THROW (CORBA::INTERNAL ()); + } + + sender = this->init_sender (ec, + address_server.in (), + endpoint_rptr + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + if (!sender.in ()) + { + ACE_THROW (CORBA::INTERNAL ()); + } + + sender_shutdown.set_command (UDP_Sender_Shutdown (sender)); + } + + // Set up event receiver. + TAO_EC_Servant_Var<TAO_ECG_UDP_Receiver> receiver; + if (this->service_type_ == ECG_MCAST_RECEIVER + || this->service_type_ == ECG_MCAST_TWO_WAY) + { + ACE_Reactor *reactor = orb->orb_core ()->reactor (); + + receiver = this->init_receiver (ec, + address_server.in (), + endpoint_rptr + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + if (!receiver.in ()) + { + ACE_THROW (CORBA::INTERNAL ()); + } + + receiver_shutdown.set_command (UDP_Receiver_Shutdown (receiver)); + + TAO_ECG_Refcounted_Handler + handler_rptr (this->init_handler (receiver.in (), + ec, + reactor + ACE_ENV_ARG_PARAMETER)); + ACE_CHECK; + if (handler_rptr.get () == 0) + { + ACE_THROW (CORBA::INTERNAL ()); + } + receiver->set_handler_shutdown (handler_rptr); + } + + // Everything went ok - disable auto-cleanup. + address_server_deactivator.disallow_deactivation (); + receiver_shutdown.disallow_command (); + sender_shutdown.disallow_command (); +} + +TAO_END_VERSIONED_NAMESPACE_DECL + +// **************************************************************** + +ACE_STATIC_SVC_DEFINE (TAO_ECG_Mcast_Gateway, + ACE_TEXT ("ECG_Mcast_Gateway"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_ECG_Mcast_Gateway), + ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ, + 0) +ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_ECG_Mcast_Gateway) |