diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-04 22:00:26 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-04 22:00:26 +0000 |
commit | 8fd36918500991a638d81cdcead9c9c80fd119c1 (patch) | |
tree | 6e61bb9e28f54be0b4e6e1bdc8a1b5ff748e426e | |
parent | 0eb92a3dd5f7ee31b0ef3a5c4c1780a4977a89bb (diff) | |
download | ATCD-8fd36918500991a638d81cdcead9c9c80fd119c1.tar.gz |
ChangeLogTag:Fri Sep 4 16:44:19 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
31 files changed, 1996 insertions, 1308 deletions
diff --git a/TAO/ChangeLog-98c b/TAO/ChangeLog-98c index a57d875e88d..2d035267e17 100644 --- a/TAO/ChangeLog-98c +++ b/TAO/ChangeLog-98c @@ -1,3 +1,62 @@ +Fri Sep 4 16:44:19 1998 Carlos O'Ryan <coryan@cs.wustl.edu> + + * orbsvcs/orbsvcs/Makefile: + * orbsvcs/orbsvcs/RtecUDPAdmin.idl: + * orbsvcs/orbsvcs/RtecEventChannelAdmin.idl: + * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h: + * orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp: + * orbsvcs/orbsvcs/Event/EC_UDP_Admin.h: + * orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp: + * orbsvcs/orbsvcs/Event/Event_Channel.h: + * orbsvcs/orbsvcs/Event/Event_Channel.i: + * orbsvcs/orbsvcs/Event/Event_Channel.cpp: + * orbsvcs/tests/EC_Mcast/EC_Mcast.h: + * orbsvcs/tests/EC_Mcast/EC_Mcast.i: + * orbsvcs/tests/EC_Mcast/EC_Mcast.cpp: + * orbsvcs/tests/EC_Mcast/sample.cfg: + * orbsvcs/tests/EC_Mcast/README: + New implementation of the EC_Mcast test, the test can now handle + multiple processes, each process joins several multicast groups + and push events to a (potentially different) set of multicast + groups. Since the configuration is more complicated a sample + file is included. + The test is prepared to support several interesting features, + such as: + + Dynamic changes in the multicast group joined. + + Handle OS limitations wrt the maximum number of mcast groups + per socket. + + Support different mappings for the type->mcast group + relation (currently the event type *is* the multicast + group). + The current implementation also offers the initial interfaces to + observe changes in the subcription and/or publication list of a + *remote* event channel, this will enable the automation and + optimization of the local publication list (there is no sense in + sending an event if nobody is currently interested). + + * orbsvcs/orbsvcs/Event_Utilities.cpp: + * orbsvcs/orbsvcs/Event_Utilities.i: + * orbsvcs/orbsvcs/Event/Dispatching_Modules.h: + * orbsvcs/orbsvcs/Event/Dispatching_Modules.i: + * orbsvcs/orbsvcs/RtecEventComm.idl: + * orbsvcs/orbsvcs/Event/EC_Gateway.h: + * orbsvcs/orbsvcs/Event/EC_Gateway.cpp: + * orbsvcs/orbsvcs/Event/Event_Channel.h: + * orbsvcs/orbsvcs/Event/Event_Channel.i: + * orbsvcs/orbsvcs/Event/Event_Channel.cpp: + * orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp: + * orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp: + * orbsvcs/tests/EC_Multiple/EC_Multiple.cpp: + * orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp: + * orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp: + * orbsvcs/tests/Event_Latency/Event_Latency.cpp: + * orbsvcs/tests/Simulator/Event_Supplier/DOVE_Supplier.cpp: + Added a new IDL structure to represent the Event Header, this + will let us factor out the minimum information needed to + transmit QoS and subscription/publication info. + I also normalized some of the field names in the RtecEventComm + structures. + Fri Sep 4 16:22:17 1998 Nagarajan Surendran <naga@cs.wustl.edu> * tests/Cubit/TAO/MT_Cubit/Globals.{h,cpp}: Added macro VX_VME_INIT diff --git a/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp b/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp index bb142f67ec5..970b83cd83a 100644 --- a/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp +++ b/TAO/examples/Simulator/Event_Supplier/DOVE_Supplier.cpp @@ -53,12 +53,12 @@ DOVE_Supplier::notify (CORBA::Any &message) event.source_ = SOURCE_ID; event.type_ = ACE_ES_EVENT_NOTIFICATION; event.ttl_ = 1; - event.creation_time_ = ORBSVCS_Time::zero; - event.ec_recv_time_ = ORBSVCS_Time::zero; - event.ec_send_time_ = ORBSVCS_Time::zero; + event.header.creation_time = ORBSVCS_Time::zero; + event.header.ec_recv_time = ORBSVCS_Time::zero; + event.header.ec_send_time = ORBSVCS_Time::zero; //event.data_.x = 0; //event.data_.y = 0; - event.data_.any_value = message; + event.data.any_value = message; RtecEventComm::EventSet events; events.length (1); @@ -192,13 +192,14 @@ DOVE_Supplier::connect_Supplier () qos.publications[0].event.source_ = SOURCE_ID; qos.publications[0].event.type_ = ACE_ES_EVENT_NOTIFICATION; qos.publications[0].event.ttl_ = 1; - qos.publications[0].event.creation_time_ = ORBSVCS_Time::zero; - qos.publications[0].event.ec_recv_time_ = ORBSVCS_Time::zero; - qos.publications[0].event.ec_send_time_ = ORBSVCS_Time::zero; - qos.publications[0].event.data_.any_value.replace (CORBA::_tc_short, - &x, - 0, - TAO_TRY_ENV); + qos.publications[0].event.header.creation_time = ORBSVCS_Time::zero; + qos.publications[0].event.header.ec_recv_time = ORBSVCS_Time::zero; + qos.publications[0].event.header.ec_send_time = ORBSVCS_Time::zero; + qos.publications[0].event.data.any_value.replace (CORBA::_tc_short, + &x, + 0, + TAO_TRY_ENV); + TAO_CHECK_ENV; qos.publications[0].dependency_info.number_of_calls = 1; qos.publications[0].dependency_info.rt_info = rt_info; diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h index eb6ee59581e..8949f967f73 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h +++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h @@ -151,9 +151,10 @@ public: ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer, const RtecEventComm::Time &time, RtecScheduler::handle_t rt_info); - // Set consumer_ to <consumer> and sets single_event_.creation_time_ - // to <time>. Sets use_single_event_ to 1. <rt_info> describes the - // method receiving this dispatch. + // Set consumer_ to <consumer> and sets + // single_event_.header.creation_time to <time>. Sets + // use_single_event_ to 1. <rt_info> describes the method + // receiving this dispatch. ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer, ACE_ES_Event_Container *event, diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i index 93b7b6b5c09..17c6be1cb0d 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i +++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.i @@ -72,8 +72,8 @@ ACE_ES_Dispatch_Request::ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consu single_event_ (), event_set_ () { - single_event_.creation_time_ = time; - single_event_.type_ = ACE_ES_EVENT_TIMEOUT; + single_event_.header.creation_time = time; + single_event_.header.type = ACE_ES_EVENT_TIMEOUT; } ACE_INLINE void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp index 173edb03d0d..4d5585eff26 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp @@ -226,12 +226,12 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events, for (u_int i = 0; i < events.length (); ++i) { //ACE_DEBUG ((LM_DEBUG, "type = %d ", events[i].type_)); - if (events[i].ttl_ > 0) + if (events[i].header.ttl > 0) { count++; out.length (count); out[count - 1] = events[i]; - out[count - 1].ttl_--; + out[count - 1].header.ttl--; } } //ACE_DEBUG ((LM_DEBUG, "count = %d\n", count)); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h index 1e90de9ada6..0897dcb7770 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h @@ -180,4 +180,4 @@ private: // We talk to the EC (as a consumer) using this proxy. }; -#endif /* ACE_EVENT_CHANNEL_H */ +#endif /* ACE_EC_GATEWAY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp index 5e9673ddbaa..a6ffad336f7 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.cpp @@ -15,19 +15,27 @@ TAO_ECG_UDP_Sender::TAO_ECG_UDP_Sender (void) int TAO_ECG_UDP_Sender::get_local_addr (ACE_INET_Addr& addr) { - return this->dgram_.get_local_addr (addr); + if (this->dgram_ == 0) + return -1; + return this->dgram_->get_local_addr (addr); } void TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecScheduler::Scheduler_ptr lcl_sched, const char* lcl_name, - const ACE_INET_Addr& ipaddr, + RtecUDPAdmin::AddrServer_ptr addr_server, + ACE_SOCK_Dgram *dgram, CORBA::Environment &_env) { this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); + this->addr_server_ = + RtecUDPAdmin::AddrServer::_duplicate (addr_server); + + this->dgram_ = dgram; + this->lcl_info_ = lcl_sched->create (lcl_name, _env); if (_env.exception () != 0) return; @@ -45,14 +53,6 @@ TAO_ECG_UDP_Sender::init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecScheduler::OPERATION, _env); if (_env.exception () != 0) return; - - if (this->dgram_.open (ipaddr) == -1) - { - // @@ TODO Use a Event Channel specific exception - ACE_ERROR ((LM_ERROR, "ECG_UDP::init - Dgram open failed\n")); - _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); - } - if (_env.exception () != 0) return; } void @@ -61,14 +61,6 @@ TAO_ECG_UDP_Sender::shutdown (CORBA::Environment& _env) this->close (_env); if (_env.exception () == 0) return; this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_nil (); - - if (this->dgram_.close () == -1) - { - // @@ TODO Use a Event Channel specific exception - ACE_ERROR ((LM_ERROR, "ECG_UDP::init - Dgram close failed\n")); - _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); - } - if (_env.exception () != 0) return; } void @@ -139,7 +131,7 @@ void TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, CORBA::Environment & _env) { - ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Sender::push - ")); + // ACE_DEBUG ((LM_DEBUG, "ECG_UDP_Sender::push - ")); if (events.length () == 0) { @@ -147,7 +139,7 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, return; } - ACE_DEBUG ((LM_DEBUG, "%d event(s) - ", events.length ())); + // ACE_DEBUG ((LM_DEBUG, "%d event(s) - ", events.length ())); // @@ TODO, there is an extra data copy here, we should do the event // modification without it and only compact the necessary events. @@ -155,25 +147,32 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, RtecEventComm::EventSet out (events.length ()); for (u_int i = 0; i < events.length (); ++i) { - //ACE_DEBUG ((LM_DEBUG, "type = %d ", events[i].type_)); - if (events[i].ttl_ > 0) - { - count++; - out.length (count); - out[count - 1] = events[i]; - out[count - 1].ttl_--; - } - } - ACE_DEBUG ((LM_DEBUG, "count = %d\n", count)); + if (events[i].header.ttl <= 0) + continue; + + const RtecEventComm::Event& e = events[i]; + + // Copy only the header... + RtecEventComm::EventHeader header = e.header; + header.ttl--; + + RtecUDPAdmin::UDP_Addr udp_addr; + this->addr_server_->get_addr (header, udp_addr, _env); + TAO_CHECK_ENV_RETURN_VOID(_env); - if (count > 0) - { TAO_OutputCDR cdr; cdr.write_boolean (TAO_ENCAP_BYTE_ORDER); cdr.write_ulong (0); // Place holder for size... - cdr.encode (RtecEventComm::_tc_EventSet, &out, 0, _env); - if (_env.exception () != 0) return; + // Marshal as if it was a sequence of one element, notice how we + // marshal a modified version of the header, but the data is not + // copied... + cdr.write_ulong (1); + cdr.encode (RtecEventComm::_tc_EventHeader, &header, 0, _env); + TAO_CHECK_ENV_RETURN_VOID(_env); + + cdr.encode (RtecEventComm::_tc_EventData, &e.data, 0, _env); + TAO_CHECK_ENV_RETURN_VOID(_env); CORBA::ULong bodylen = cdr.total_length (); char* buf = ACE_const_cast(char*,cdr.buffer ()); @@ -192,8 +191,9 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, #endif // This is a good maximum, because Dgrams cannot be longer than - // 64K and the usual size for a CDR fragment is 512 bytes, still - // if this is not enough we allocate memory from the heap. + // 64K and the usual size for a CDR fragment is 512 bytes. + // @@ TODO In the future we may need to allocate some memory + // from the heap. const int TAO_WRITEV_MAX = 128; ACE_IO_Vector iov[TAO_WRITEV_MAX]; @@ -207,20 +207,23 @@ TAO_ECG_UDP_Sender::push (const RtecEventComm::EventSet &events, iovcnt++; } - ssize_t n = this->dgram_.send (iov, iovcnt); + ACE_INET_Addr inet_addr (udp_addr.port, udp_addr.ipaddr); + // ACE_DEBUG ((LM_DEBUG, "sending to (%d,%u)\n", + // udp_addr.port, udp_addr.ipaddr)); + ssize_t n = this->dgram_->send (iov, iovcnt, inet_addr); if (n == -1) { // @@ TODO Use a Event Channel specific exception ACE_DEBUG ((LM_DEBUG, "ECG_UDP (%t) send failed %p\n", "")); - _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); + TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); } else if (n == 0) { // @@ TODO Use a Event Channel specific exception ACE_DEBUG ((LM_DEBUG, "ECG_UDP (%t) EOF on send \n")); - _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); + TAO_THROW(CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); } } } @@ -445,15 +448,25 @@ TAO_ECG_Mcast_EH::TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv) } int -TAO_ECG_Mcast_EH::open (const ACE_INET_Addr& mcast_group) +TAO_ECG_Mcast_EH::open (void) { - if (this->dgram_.subscribe (mcast_group) == -1) - return -1; return this->reactor ()->register_handler (this, ACE_Event_Handler::READ_MASK); } int +TAO_ECG_Mcast_EH::subscribe (const ACE_INET_Addr &mcast_addr) +{ + return this->dgram_.subscribe (mcast_addr); +} + +int +TAO_ECG_Mcast_EH::unsubscribe (const ACE_INET_Addr &mcast_addr) +{ + return this->dgram_.unsubscribe (); +} + +int TAO_ECG_Mcast_EH::close (void) { if (this->reactor ()->remove_handler (this, diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h index 5031c3ad020..23d7d8416f0 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway_UDP.h @@ -55,8 +55,8 @@ #include "ace/SOCK_CODgram.h" #include "ace/SOCK_Dgram_Mcast.h" -#include "orbsvcs/RtecEventCommS.h" #include "orbsvcs/RtecEventChannelAdminS.h" +#include "orbsvcs/RtecUDPAdminS.h" #include "orbsvcs/orbsvcs_export.h" class TAO_ORBSVCS_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushConsumer @@ -69,6 +69,8 @@ class TAO_ORBSVCS_Export TAO_ECG_UDP_Sender : public POA_RtecEventComm::PushCons // This class connect as a consumer to an EventChannel // and it sends the events using UDP, the UDP address can be a // normal IP address or it can be a multicast group. + // The UDP address is obtained from a RtecUDPAdmin::AddrServer + // class. // It marshalls the events using TAO CDR classes. // No provisions are taken for message fragmentation. // @@ -81,7 +83,8 @@ public: void init (RtecEventChannelAdmin::EventChannel_ptr lcl_ec, RtecScheduler::Scheduler_ptr lcl_sched, const char* lcl_name, - const ACE_INET_Addr& ipaddr, + RtecUDPAdmin::AddrServer_ptr addr_server, + ACE_SOCK_Dgram* dgram, CORBA::Environment &_env); // To do its job this class requires to know the local EC it will // connect to; it also requires to build an RT_Info for the local @@ -118,8 +121,12 @@ private: RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; // We talk to the EC (as a consumer) using this proxy. - ACE_SOCK_CODgram dgram_; - // The datagram used to send the data. + RtecUDPAdmin::AddrServer_var addr_server_; + // We query this object to determine where are the events sent. + + ACE_SOCK_Dgram *dgram_; + // The datagram used to sendto(), this object is *not* owned by the + // UDP_Sender. }; class TAO_ORBSVCS_Export TAO_ECG_UDP_Receiver : public POA_RtecEventComm::PushSupplier @@ -226,7 +233,7 @@ class TAO_ORBSVCS_Export TAO_ECG_Mcast_EH : public ACE_Event_Handler public: TAO_ECG_Mcast_EH (TAO_ECG_UDP_Receiver *recv); - int open (const ACE_INET_Addr& mcast_group); + int open (void); // Open the datagram (join the mcast group) and register with // this->reactor() @@ -234,6 +241,10 @@ public: // Close the datagram (leave the mcast group) and unregister with // the reactor. + int subscribe (const ACE_INET_Addr &mcast_addr); + int unsubscribe (const ACE_INET_Addr &mcast_addr); + // Control the multicast group subscriptions + // Reactor callbacks virtual int handle_input (ACE_HANDLE fd); virtual ACE_HANDLE get_handle (void) const; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp new file mode 100644 index 00000000000..d45ce1002ab --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.cpp @@ -0,0 +1,23 @@ +// $Id$ + +#include "orbsvcs/Event/EC_UDP_Admin.h" + +ACE_RCSID(Event, EC_UDP_Admin, "$Id$") + +TAO_EC_Simple_AddrServer::TAO_EC_Simple_AddrServer (CORBA::UShort port) + : port_ (port) +{ +} + +TAO_EC_Simple_AddrServer::~TAO_EC_Simple_AddrServer (void) +{ +} + +void +TAO_EC_Simple_AddrServer::get_addr (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Addr_out addr, + CORBA::Environment&) +{ + addr.ipaddr = header.type; + addr.port = this->port_; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.h b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.h new file mode 100644 index 00000000000..ef2220d1714 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_UDP_Admin.h @@ -0,0 +1,70 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// TAO services +// +// = FILENAME +// EC_UDP_Admin +// +// = AUTHOR +// Carlos O'Ryan +// +// = DESCRIPTION +// Simple implementations of the UDP Administration service. +// +// connects to a "remote" EC as a consumer, it also connects to the +// <local> EC as a supplier of events, this later EC is usually +// collocated. +// The QoS parameters for both connections must be provided by the +// user. +// To avoid infinite loops of events the Gateway descreases the TTL +// field of the events and will not deliver any events with TTL less +// than or equal to 0. +// +// = TODO +// The class makes an extra copy of the events, we need to +// investigate if closer collaboration with its collocated EC could +// be used to remove that copy. +// +// ============================================================================ + +#ifndef TAO_EC_UDP_ADMIN_H +#define TAO_EC_UDP_ADMIN_H + +#include "orbsvcs/RtecUDPAdminS.h" +#include "orbsvcs/orbsvcs_export.h" + +class TAO_ORBSVCS_Export TAO_EC_Simple_AddrServer : public POA_RtecUDPAdmin::AddrServer +{ + // = TITLE + // TAO Real-time Event Service; a simple UDP address server. + // + // = DESCRIPTION + // The EC is able to use multiple multicast groups to transmit its + // data, the is given control over the mapping between the Event + // (type,source) pair and the (ipaddr,port) pair using a + // AddrServer. + // This class implements a very simple server that simply maps the + // <type> component to the <ipaddr> and uses a fixed <port>, + // provided at initialization time. + // +public: + TAO_EC_Simple_AddrServer (CORBA::UShort port); + // Constructor + + virtual ~TAO_EC_Simple_AddrServer (void); + // Destructor + + // = The RtecUDPAdmin::AddrServer methods + virtual void get_addr (const RtecEventComm::EventHeader& header, + RtecUDPAdmin::UDP_Addr_out addr, + CORBA::Environment& env); + +private: + CORBA::UShort port_; +}; + +#endif /* TAO_EC_UDP_ADMIN_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp index d1c41ef578a..e050778f48c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.cpp @@ -165,7 +165,7 @@ public: ACE_ES_Dependency_Iterator iter (consumer->qos ().dependencies); while (iter.advance_dependency () == 0) { - RtecEventComm::EventType &type = (*iter).event.type_; + RtecEventComm::EventType &type = (*iter).event.header.type; if (type != ACE_ES_GLOBAL_DESIGNATOR && type != ACE_ES_CONJUNCTION_DESIGNATOR && type != ACE_ES_DISJUNCTION_DESIGNATOR) @@ -422,7 +422,7 @@ ACE_Push_Supplier_Proxy::connect_push_supplier (RtecEventComm::PushSupplier_ptr // @@ TODO: The SupplierQOS should have a more reasonable interface to // obtain the supplier_id(), BTW, a callback to push_supplier will // not work: it usually results in some form of dead-lock. - this->source_id_ = qos_.publications[0].event.source_; + this->source_id_ = qos_.publications[0].event.header.source; supplier_module_->connected (this, _env); } @@ -438,7 +438,7 @@ ACE_Push_Supplier_Proxy::push (const RtecEventComm::EventSet &event, { RtecEventComm::Event& ev = ACE_const_cast(RtecEventComm::Event&,event[i]); - ORBSVCS_Time::hrtime_to_TimeT (ev.ec_recv_time_, ec_recv); + ORBSVCS_Time::hrtime_to_TimeT (ev.header.ec_recv_time, ec_recv); } supplier_module_->push (this, event, _env); } @@ -812,6 +812,21 @@ ACE_EventChannel::update_supplier_gwys (CORBA::Environment& _env) } } +RtecEventChannelAdmin::Observer_Handle +ACE_EventChannel::append_observer (RtecEventChannelAdmin::Observer_ptr, + CORBA::Environment &) +{ + return 0; + // @@ TODO fill in the "implementation details" +} + +void +ACE_EventChannel::remove_observer (RtecEventChannelAdmin::Observer_Handle, + CORBA::Environment &) +{ + // @@ TODO fill in the "implementation details" +} + // **************************************************************** ACE_ES_Disjunction_Group::~ACE_ES_Disjunction_Group (void) @@ -1267,7 +1282,7 @@ ACE_ES_Consumer_Module::push (const ACE_ES_Dispatch_Request *request, { RtecEventComm::Event& ev = ACE_const_cast(RtecEventComm::Event&,event_set[i]); - ORBSVCS_Time::hrtime_to_TimeT (ev.ec_send_time_, ec_send); + ORBSVCS_Time::hrtime_to_TimeT (ev.header.ec_send_time, ec_send); } request->consumer ()->push (event_set, _env); } @@ -1333,9 +1348,9 @@ ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, CORBA::ULong cc = 0; CORBA::ULong sc = 0; - dep[cc].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR; - dep[cc].event.source_ = 0; - dep[cc].event.creation_time_ = ORBSVCS_Time::zero; + dep[cc].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR; + dep[cc].event.header.source = 0; + dep[cc].event.header.creation_time = ORBSVCS_Time::zero; dep[cc].rt_info = 0; cc++; @@ -1354,7 +1369,7 @@ ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, RtecEventComm::Event& event = c->qos ().dependencies[j].event; - RtecEventComm::EventType type = event.type_; + RtecEventComm::EventType type = event.header.type; if (type <= ACE_ES_EVENT_UNDEFINED) continue; @@ -1369,22 +1384,22 @@ ACE_ES_Consumer_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, CORBA::ULong k; for (k = 0; k < cc; ++k) { - if (dep[k].event.type_ == event.type_ - && dep[k].event.source_ == event.source_) + if (dep[k].event.header.type == event.header.type + && dep[k].event.header.source == event.header.source) break; } if (k == cc) { - dep[cc].event.type_ = event.type_; - dep[cc].event.source_ = event.source_; - dep[cc].event.creation_time_ = ORBSVCS_Time::zero; + dep[cc].event.header.type = event.header.type; + dep[cc].event.header.source = event.header.source; + dep[cc].event.header.creation_time = ORBSVCS_Time::zero; // The RT_Info is filled up later. dep[cc].rt_info = 0; cc++; - pub[sc].event.type_ = event.type_; - pub[sc].event.source_ = event.source_; - pub[sc].event.creation_time_ = ORBSVCS_Time::zero; + pub[sc].event.header.type = event.header.type; + pub[sc].event.header.source = event.header.source; + pub[sc].event.header.creation_time = ORBSVCS_Time::zero; pub[sc].dependency_info.dependency_type = RtecScheduler::TWO_WAY_CALL; pub[sc].dependency_info.number_of_calls = 1; @@ -1471,9 +1486,9 @@ int ACE_ES_Correlation_Module::schedule_timeout (ACE_ES_Consumer_Rep_Timeout *consumer) { RtecEventComm::Time &interval = - consumer->dependency ()->event.creation_time_; + consumer->dependency ()->event.header.creation_time; RtecEventComm::Time &delay = - consumer->dependency ()->event.creation_time_; + consumer->dependency ()->event.header.creation_time; // Store the preemption priority so we can cancel the correct timer. // The priority values may change during the process lifetime (e.g., @@ -1528,9 +1543,9 @@ ACE_ES_Correlation_Module::reschedule_timeout (ACE_ES_Consumer_Rep_Timeout *cons else { RtecEventComm::Time &interval = - consumer->dependency ()->event.creation_time_; + consumer->dependency ()->event.header.creation_time; RtecEventComm::Time &delay = - consumer->dependency ()->event.creation_time_; + consumer->dependency ()->event.header.creation_time; // Store the preemption priority so we can cancel the correct timer. // The priority values may change during the process lifetime (e.g., @@ -1705,7 +1720,7 @@ ACE_ES_Consumer_Correlation::connected (ACE_Push_Consumer_Proxy *consumer, // Keep track of how many conjunction and disjunction groups are // registered. Update the index pointers so that the helper // functions can update the appropriate group objects. - switch ((*iter).event.type_) + switch ((*iter).event.header.type) { case ACE_ES_CONJUNCTION_DESIGNATOR: cgroup_index++; @@ -1880,8 +1895,8 @@ ACE_ES_Consumer_Correlation::get_consumer_rep (RtecEventChannelAdmin::Dependency RtecEventComm::Event& e = consumer_reps_[x]->dependency ()->event; // If <dependency> matches any previously subscribed consumer // reps, we'll reuse it. - if (e.type_ == dependency.event.type_ - && e.source_ == dependency.event.source_ ) + if (e.header.type == dependency.event.header.type + && e.header.source == dependency.event.header.source ) { rep = consumer_reps_[x]; break; @@ -2120,8 +2135,10 @@ ACE_ES_Consumer_Rep_Timeout::execute (void) CORBA::Environment __env; ACE_Time_Value tv = ACE_OS::gettimeofday (); ORBSVCS_Time::Time_Value_to_TimeT - (timeout_event_->creation_time_, tv); - correlation_->correlation_module_->push (this, timeout_event_, __env); + (timeout_event_->header.creation_time, tv); + correlation_->correlation_module_->push (this, + timeout_event_, + __env); if (__env.exception () != 0) ACE_ERROR ((LM_ERROR, "ACE_ES_Consumer_Rep_Timeout::execute: unexpected exception.\n")); } @@ -2174,7 +2191,7 @@ ACE_ES_Subscription_Module::connected (ACE_Push_Supplier_Proxy *supplier, RtecEventChannelAdmin::PublicationSet &publications = supplier->qos ().publications; - sid = publications[0].event.source_; + sid = publications[0].event.header.source; for (CORBA::ULong index=0; index < publications.length (); index++) { // Check to make sure an RT_Info was specified. @@ -2188,7 +2205,7 @@ ACE_ES_Subscription_Module::connected (ACE_Push_Supplier_Proxy *supplier, #endif RtecEventComm::EventType event_type = - publications[index].event.type_; + publications[index].event.header.type; // @@ TODO we should throw something Check to make sure a type // was specified. @@ -2474,12 +2491,12 @@ ACE_ES_Subscription_Module::push_source_type (ACE_Push_Supplier_Proxy *source, return 0; } - if (supplier_map.find (event->type_, subscribers) == -1) + if (supplier_map.find (event->header.type, subscribers) == -1) { ACE_DEBUG ((LM_ERROR, "EC (%t) ACE_ES_Subscription_Module::push_source_type" " Warning: event type %d not registered.\n", - event->type_)); + event->header.type)); ACE_TIMEPROBE (TAO_EVENT_CHANNEL_PUSH_SOURCE_TYPE); return 0; // continue anyway } @@ -2789,23 +2806,23 @@ ACE_ES_Subscription_Module::subscribe (ACE_ES_Consumer_Rep *consumer) int result = 0; RtecEventComm::Event &event = consumer->dependency ()->event; - if (event.source_ == 0) + if (event.header.source == 0) // Not source-based subscription. { - if (event.type_ == ACE_ES_EVENT_ANY) + if (event.header.type == ACE_ES_EVENT_ANY) result = this->subscribe_all (consumer); else - result = this->subscribe_type (consumer, event.type_); + result = this->subscribe_type (consumer, event.header.type); } else // Source-based subscription. { - if (event.type_ == ACE_ES_EVENT_ANY) - result = this->subscribe_source (consumer, event.source_); + if (event.header.type == ACE_ES_EVENT_ANY) + result = this->subscribe_source (consumer, event.header.source); else result = this->subscribe_source_type (consumer, - event.source_, - event.type_); + event.header.source, + event.header.type); } return result; @@ -2823,18 +2840,18 @@ ACE_ES_Subscription_Module::unsubscribe (ACE_ES_Consumer_Rep *consumer) RtecEventComm::Event &event = consumer->dependency ()->event; - if (event.type_ != ACE_ES_EVENT_ANY) + if (event.header.type != ACE_ES_EVENT_ANY) { // Remove the consumer from the global type-based subscription list. ACE_ES_Subscription_Info::remove (type_subscribers_, consumer, - event.type_); + event.header.type); } else // Remove the consumer from the global source-based subscription list. ACE_ES_Subscription_Info::remove (source_subscribers_, consumer, - event.source_); + event.header.source); #if 0 // @@ TODO This code was removed and I'm (coryan) adding it again @@ -2858,19 +2875,21 @@ ACE_ES_Subscription_Module::unsubscribe (ACE_ES_Consumer_Rep *consumer) int result = 0; - if (event.source_ == 0) + if (event.header.source == 0) { - if (event.type_ == ACE_ES_EVENT_ANY) + if (event.header.type == ACE_ES_EVENT_ANY) result = this->unsubscribe_all (consumer); else - result = this->unsubscribe_type (consumer, event.type_); + result = this->unsubscribe_type (consumer, event.header.type); } else { - if (event.type_ == ACE_ES_EVENT_ANY) - result = this->unsubscribe_source (consumer, event.source_); + if (event.header.type == ACE_ES_EVENT_ANY) + result = this->unsubscribe_source (consumer, event.header.source); else - result = this->unsubscribe_source_type (consumer, event.source_, event.type_); + result = this->unsubscribe_source_type (consumer, + event.header.source, + event.header.type); } return result; #else @@ -2943,13 +2962,13 @@ ACE_ES_Subscription_Module::unsubscribe_source_type (ACE_ES_Consumer_Rep *consum Supplier_Iterator iter (all_suppliers_); // Step through all supplier proxies looking for a match to the - // consumer's event.source_. This is the same as unsubscribe_type, - // only we can check the source first. + // consumer's event.header.source. This is the same as + // unsubscribe_type, only we can check the source first. for (ACE_Push_Supplier_Proxy **proxy = 0; iter.next (proxy) != 0; iter.advance ()) // If the proxy matches the source id we're looking for, try to - // remove <consumer> from the proxy's <event.type_> set. + // remove <consumer> from the proxy's <event.header.type> set. if ((**proxy) == source) { ACE_ES_WGUARD mon ((*proxy)->subscription_info ().lock_); @@ -3228,9 +3247,9 @@ ACE_ES_Supplier_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, CORBA::ULong cc = 0; CORBA::ULong sc = 0; - dep[cc].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR; - dep[cc].event.source_ = 0; - dep[cc].event.creation_time_ = ORBSVCS_Time::zero; + dep[cc].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR; + dep[cc].event.header.source = 0; + dep[cc].event.header.creation_time = ORBSVCS_Time::zero; dep[cc].rt_info = 0; cc++; @@ -3249,7 +3268,7 @@ ACE_ES_Supplier_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, RtecEventComm::Event& event = s->qos ().publications[j].event; - RtecEventComm::EventType type = event.type_; + RtecEventComm::EventType type = event.header.type; if (type <= ACE_ES_EVENT_UNDEFINED) continue; @@ -3264,22 +3283,22 @@ ACE_ES_Supplier_Module::fill_qos (RtecEventChannelAdmin::ConsumerQOS& c_qos, CORBA::ULong k; for (k = 0; k < sc; ++k) { - if (pub[k].event.type_ == event.type_ - && pub[k].event.source_ == event.source_) + if (pub[k].event.header.type == event.header.type + && pub[k].event.header.source == event.header.source) break; } if (k == sc) { - dep[cc].event.type_ = event.type_; - dep[cc].event.source_ = event.source_; - dep[cc].event.creation_time_ = ORBSVCS_Time::zero; + dep[cc].event.header.type = event.header.type; + dep[cc].event.header.source = event.header.source; + dep[cc].event.header.creation_time = ORBSVCS_Time::zero; // The RT_Info is filled up later. dep[cc].rt_info = 0; cc++; - pub[sc].event.type_ = event.type_; - pub[sc].event.source_ = event.source_; - pub[sc].event.creation_time_ = ORBSVCS_Time::zero; + pub[sc].event.header.type = event.header.type; + pub[sc].event.header.source = event.header.source; + pub[sc].event.header.creation_time = ORBSVCS_Time::zero; pub[sc].dependency_info.dependency_type = RtecScheduler::TWO_WAY_CALL; pub[sc].dependency_info.number_of_calls = 1; @@ -3386,10 +3405,10 @@ dump_event (const RtecEventComm::Event &event) ACE_DEBUG ((LM_DEBUG, "source_ = %d " "type_ = %d " "time_ = %u.\n", - (void*)event.source_, - event.type_, + (void*)event.header.source, + event.header.type, // The divide-by-1 is for ACE_U_LongLong support. - ORBSVCS_Time::to_hrtime (event.creation_time_) / 1)); + ORBSVCS_Time::to_hrtime (event.header.creation_time) / 1)); } // ************************************************************ diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h index a73b8e27a59..d84afcd5a3e 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.h @@ -169,11 +169,12 @@ class TAO_Module_Factory; // ec.. class TAO_ORBSVCS_Export ACE_EventChannel : public POA_RtecEventChannelAdmin::EventChannel // = TITLE -// ACE Event Channel. +// TAO's Real-time Event Channel. // // = DESCRIPTION -// Implementation of COSS Event Channel. For more detailed -// information, see http://www.cs.wustl.edu/~mda/event.html. +// This class implements the interface defined in +// RtecEventChannelAdmin.idl. For more details check: +// http://www.cs.wustl.edu/~coryan/EC/JSAC98.pdf { public: enum { INITIAL_STATE = 0, @@ -191,20 +192,6 @@ public: virtual ~ACE_EventChannel (void); // Calls destroy. - // = Accessor methods to Event Channel objects. The Event Channel - // acts as a sort of service repository of object references. All - // objects in the Event Service come to this interface to obtain - // object references during initialization. - - virtual RtecEventChannelAdmin::ConsumerAdmin_ptr for_consumers (CORBA::Environment &); - // Consumer administration factory method. - - virtual RtecEventChannelAdmin::SupplierAdmin_ptr for_suppliers (CORBA::Environment &); - // Supplier administration factory method. - - virtual void destroy (CORBA::Environment &); - // Explicitly shut down the channel. - RtecEventChannelAdmin::EventChannel_ptr get_ref (CORBA::Environment &); // Allow transformations to RtecEventChannelAdmin::EventChannel. @@ -249,9 +236,31 @@ public: void update_supplier_gwys (CORBA::Environment& _env); // The consumer (or supplier) list has changed, thus the EC has to // inform any gateways it has. - // TODO: currently we only support consumer gateways. ACE_Task_Manager* task_manager (void) const; + // Each Event Channel has its own Task_Manager to handle timers. + + // = The RtecEventChannelAdmin::EventChannel methods. + + virtual RtecEventChannelAdmin::ConsumerAdmin_ptr + for_consumers (CORBA::Environment &); + // In this implementation of the EC this returns the interface for + // the Consumer_Module. + + virtual RtecEventChannelAdmin::SupplierAdmin_ptr + for_suppliers (CORBA::Environment &); + // Return an interface to the Supplier_Module. + + virtual void destroy (CORBA::Environment &); + // Shutdown the EC, free all resources, stop all threads and then + // shutdown the server where the Servant is running. + + virtual RtecEventChannelAdmin::Observer_Handle + append_observer (RtecEventChannelAdmin::Observer_ptr observer, + CORBA::Environment &env); + virtual void remove_observer (RtecEventChannelAdmin::Observer_Handle, + CORBA::Environment &env); + // The observer manipulators private: ACE_RTU_Manager *rtu_manager_; diff --git a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i index 83b979ab94d..844831376e0 100644 --- a/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i +++ b/TAO/orbsvcs/orbsvcs/Event/Event_Channel.i @@ -154,15 +154,15 @@ operator == (const RtecEventComm::Event &event1, const RtecEventComm::Event &event2) { // Check if the sources are equal. 0 is a wildcard. - if ((event1.source_ != 0) - && (event2.source_ != 0) - && (event1.source_ != event2.source_)) + if ((event1.header.source != 0) + && (event2.header.source != 0) + && (event1.header.source != event2.header.source)) return 0; // Check if the types are equal. ACE_ES_EVENT_ANY is a wildcard. - if ((event1.type_ != ACE_ES_EVENT_ANY) && - (event2.type_ != ACE_ES_EVENT_ANY) && - (event1.type_ != event2.type_)) + if ((event1.header.type != ACE_ES_EVENT_ANY) && + (event2.header.type != ACE_ES_EVENT_ANY) && + (event1.header.type != event2.header.type)) return 0; return 1; @@ -559,7 +559,7 @@ ACE_ES_Dependency_Iterator::parse (void) if (rt_info_ == 0) rt_info_ = rep_[x].rt_info; - switch (rep_[x].event.type_) + switch (rep_[x].event.header.type) { case ACE_ES_CONJUNCTION_DESIGNATOR: n_conjunctions_++; diff --git a/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp b/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp index d88c19c18cc..7e62bc12466 100644 --- a/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp +++ b/TAO/orbsvcs/orbsvcs/Event_Utilities.cpp @@ -23,10 +23,9 @@ ACE_ConsumerQOS_Factory::start_conjunction_group (void) { int l = qos_.dependencies.length (); qos_.dependencies.length (l + 1); - qos_.dependencies[l].event.type_ = ACE_ES_CONJUNCTION_DESIGNATOR; + qos_.dependencies[l].event.header.type = ACE_ES_CONJUNCTION_DESIGNATOR; qos_.dependencies[l].rt_info = 0; - // TODO: qos_.dependencies[l].event.data_.lval (0); - designator_set_ = 1; + this->designator_set_ = 1; return 0; } @@ -35,10 +34,9 @@ ACE_ConsumerQOS_Factory::start_disjunction_group (void) { int l = qos_.dependencies.length (); qos_.dependencies.length (l + 1); - qos_.dependencies[l].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR; + qos_.dependencies[l].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR; qos_.dependencies[l].rt_info = 0; - // TODO: qos_.dependencies[l].event.data_.lval (0); - designator_set_ = 1; + this->designator_set_ = 1; return 0; } @@ -52,16 +50,14 @@ ACE_ConsumerQOS_Factory::insert (const RtecEventChannelAdmin::Dependency &subscr int l = qos_.dependencies.length (); qos_.dependencies.length (l + 1); qos_.dependencies[l].rt_info = 0; - qos_.dependencies[l].event.type_ = ACE_ES_GLOBAL_DESIGNATOR; + qos_.dependencies[l].event.header.type = ACE_ES_GLOBAL_DESIGNATOR; - // TODO: IDL union qos_.dependencies[l].event.data_.lval (0); this->designator_set_ = 1; } int l = qos_.dependencies.length (); qos_.dependencies.length (l + 1); qos_.dependencies[l] = subscribe; - // TODO: IDL union qos_.dependencies[l].event.data_.lval (0); return 0; } @@ -72,8 +68,8 @@ void event_debug (const char* p, ACE_DEBUG ((LM_DEBUG, "%*.*s - event.source: %d\n" "%*.*s event.type: %d\n", - l, l, p, event.source_, - l, l, p, event.type_)); + l, l, p, event.header.source, + l, l, p, event.header.type)); } void @@ -107,8 +103,8 @@ ACE_SupplierQOS_Factory::insert (RtecEventComm::EventSourceID sid, { int l = qos_.publications.length (); qos_.publications.length (l + 1); - qos_.publications[l].event.source_ = sid; - qos_.publications[l].event.type_ = type; + qos_.publications[l].event.header.source = sid; + qos_.publications[l].event.header.type = type; // TODO: IDL union qos_.publications[l].event.data_.lval (0); qos_.publications[l].dependency_info.rt_info = rt_info; qos_.publications[l].dependency_info.number_of_calls = ncalls; diff --git a/TAO/orbsvcs/orbsvcs/Event_Utilities.i b/TAO/orbsvcs/orbsvcs/Event_Utilities.i index 58423e9af27..53910d1ac1c 100644 --- a/TAO/orbsvcs/orbsvcs/Event_Utilities.i +++ b/TAO/orbsvcs/orbsvcs/Event_Utilities.i @@ -8,11 +8,11 @@ ACE_ConsumerQOS_Factory::insert (RtecEventComm::EventSourceID source, RtecScheduler::handle_t rt_info) { RtecEventChannelAdmin::Dependency dependency; - dependency.event.source_ = source; - dependency.event.type_ = type; - //dependency.event.creation_time_ = 0; - //dependency.event.ec_recv_time_ = 0; - //dependency.event.ec_send_time_ = 0; + dependency.event.header.source = source; + dependency.event.header.type = type; + //dependency.event.header.creation_time = 0; + //dependency.event.header.ec_recv_time = 0; + //dependency.event.header.ec_send_time = 0; dependency.rt_info = rt_info; return this->insert (dependency); } @@ -22,11 +22,11 @@ ACE_ConsumerQOS_Factory::insert_type (RtecEventComm::EventType type, RtecScheduler::handle_t rt_info) { RtecEventChannelAdmin::Dependency dependency; - dependency.event.source_ = 0; - dependency.event.type_ = type; - //dependency.event.creation_time_ = 0; - //dependency.event.ec_recv_time_ = 0; - //dependency.event.ec_send_time_ = 0; + dependency.event.header.source = 0; + dependency.event.header.type = type; + //dependency.event.header.creation_time = 0; + //dependency.event.header.ec_recv_time = 0; + //dependency.event.header.ec_send_time = 0; dependency.rt_info = rt_info; return this->insert (dependency); } @@ -36,11 +36,11 @@ ACE_ConsumerQOS_Factory::insert_source (RtecEventComm::EventSourceID source, RtecScheduler::handle_t rt_info) { RtecEventChannelAdmin::Dependency dependency; - dependency.event.source_ = source; - dependency.event.type_ = ACE_ES_EVENT_ANY; - //dependency.event.creation_time_ = 0; - //dependency.event.ec_recv_time_ = 0; - //dependency.event.ec_send_time_ = 0; + dependency.event.header.source = source; + dependency.event.header.type = ACE_ES_EVENT_ANY; + //dependency.event.header.creation_time = 0; + //dependency.event.header.ec_recv_time = 0; + //dependency.event.header.ec_send_time = 0; dependency.rt_info = rt_info; return this->insert (dependency); } @@ -51,11 +51,11 @@ ACE_ConsumerQOS_Factory::insert_time (RtecEventComm::EventType type, RtecScheduler::handle_t rt_info) { RtecEventChannelAdmin::Dependency dependency; - dependency.event.source_ = 0; - dependency.event.type_ = type; - dependency.event.creation_time_ = interval; - //dependency.event.ec_recv_time_ = 0; - //dependency.event.ec_send_time_ = 0; + dependency.event.header.source = 0; + dependency.event.header.type = type; + dependency.event.header.creation_time = interval; + //dependency.event.header.ec_recv_time = 0; + //dependency.event.header.ec_send_time = 0; dependency.rt_info = rt_info; return this->insert (dependency); } @@ -64,12 +64,12 @@ ACE_INLINE int ACE_ConsumerQOS_Factory::insert_act (RtecEventComm::EventData act) { RtecEventChannelAdmin::Dependency dependency; - dependency.event.source_ = 0; - dependency.event.type_ = ACE_ES_EVENT_ACT; - //dependency.event.creation_time_ = 0; - //dependency.event.ec_recv_time_ = 0; - //dependency.event.ec_send_time_ = 0; - dependency.event.data_ = act; + dependency.event.header.source = 0; + dependency.event.header.type = ACE_ES_EVENT_ACT; + //dependency.event.header.creation_time = 0; + //dependency.event.header.ec_recv_time = 0; + //dependency.event.header.ec_send_time = 0; + dependency.event.data = act; return this->insert (dependency); } diff --git a/TAO/orbsvcs/orbsvcs/Makefile b/TAO/orbsvcs/orbsvcs/Makefile index 7ec933dfaef..d1edaf72a6b 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile +++ b/TAO/orbsvcs/orbsvcs/Makefile @@ -37,6 +37,7 @@ IDL_SRCS= \ RtecEventComm \ RtecScheduler \ RtecEventChannelAdmin \ + RtecUDPAdmin \ LifeCycleService \ CosTrading \ AVStreams \ @@ -69,6 +70,7 @@ FILES= $(IDL_FILES) \ Event/Task_Manager \ Event/EC_Gateway \ Event/EC_Gateway_UDP \ + Event/EC_UDP_Admin \ Event/Module_Factory \ \ Sched/Config_Scheduler \ @@ -138,6 +140,7 @@ IDL_EXT=C.h C.i C.cpp S.h S.i S.cpp S_T.h S_T.i S_T.cpp # # Extra dependencies not catched by make depend. # +$(foreach ext, $(IDL_EXT), RtecUDPAdmin(ext)): RtecEventComm.idl $(foreach ext, $(IDL_EXT), RtecScheduler$(ext)): CosTimeBase.idl $(foreach ext, $(IDL_EXT), RtecEventComm$(ext)): CosTimeBase.idl $(foreach ext, $(IDL_EXT), RtecEventChannelAdmin$(ext)): CosTimeBase.idl diff --git a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl index 081c48ef498..a47ffc32cce 100644 --- a/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl +++ b/TAO/orbsvcs/orbsvcs/RtecEventChannelAdmin.idl @@ -31,30 +31,109 @@ module RtecEventChannelAdmin { boolean is_gateway; }; - interface ProxyPushConsumer: RtecEventComm::PushConsumer { - void connect_push_supplier( - in RtecEventComm::PushSupplier push_supplier, - in SupplierQOS qos) raises(AlreadyConnected); - }; - interface ProxyPushSupplier: RtecEventComm::PushSupplier { + // = TITLE + // The Proxy Supplier + // + // = DESCRIPTION + // Consumers receive their events from objects of this type. See + // the interfaces below to see how to gain access to an object + // reference of this type. + void connect_push_consumer( in RtecEventComm::PushConsumer push_consumer, - in ConsumerQOS qos) raises(AlreadyConnected, TypeError); + in ConsumerQOS qos) + raises(AlreadyConnected, TypeError); + // Before receiving any events the consumer must provide its + // publication list and QoS information to the Event Channel + // through this method. + void suspend_connection (); + // Temporarly suspend reception of events from the Event + // Channel. Calling this method is more efficient than dropping + // them on the receiving end and less expensive than disconnecting + // and connecting again (but it is not free!!) + void resume_connection (); + // Resume the reception of events. + }; + + interface ProxyPushConsumer: RtecEventComm::PushConsumer { + // = TITLE + // The Proxy Consumer + // + // = DESCRIPTION + // Suppliers push their events to objects of this type. See the + // interfaces below to see how to gain access to an object + // reference of this type. + + void connect_push_supplier( + in RtecEventComm::PushSupplier push_supplier, + in SupplierQOS qos) + raises(AlreadyConnected); + // Before pushing events the supplier must provide its publication + // list and QoS information to the Event Channel through this + // method. }; - // TODO: Find out the exception specs for the following interface's + // @@ TODO: Find out the exception specs for the following interface's // methods. interface ConsumerAdmin { + // = TITLE + // The Supplier factory + // + // = DESCRIPTION + // Consumers use this interface to create suppliers they can + // connect to. + ProxyPushSupplier obtain_push_supplier(); + // Obtain a supplier }; interface SupplierAdmin { + // = TITLE + // The Consumer factory + // + // = DESCRIPTION + // Suppliers use this interface to create consumers they can + // connect to. + ProxyPushConsumer obtain_push_consumer(); + // Obtain a consumer }; + interface Observer { + // = TITLE + // Observes any changes in the consumer or supplier sets for an + // Event Channel + // + // = DESCRIPTION + // This object receives updates from Event Channels with any + // changes on set of consumer and or suppliers registered with + // the Event Channel. + + void update_consumer (in ConsumerQOS sub); + // A change in the list of consumers has ocurred. The disjunction + // of the subscriptions (and its equivalent form ) is + // passed to the observer. + + void update_supplier (in SupplierQOS pub); + // A change in the list of consumers has ocurred. The disjunction + // of the publications (and its equivalent form for suppliers). + }; + + typedef unsigned long long Observer_Handle; + // This is used as an opaque ID to control the addition and removal + // of handles from an event channel. + interface EventChannel { + // = TITLE + // The Event Channel class + // + // = DESCRIPTION + // This class provides the main entry point for the Event + // Channel. The class follows a protocol similar to the + // COS Event Service as described in the CORBAservices spec. + // exception SYNCHRONIZATION_ERROR {}; exception QOS_ERROR {}; exception SUBSCRIPTION_ERROR {}; @@ -62,11 +141,24 @@ module RtecEventChannelAdmin { exception DISPATCH_ERROR {}; ConsumerAdmin for_consumers(); + // Consumers call this method to gain access to the + // ProxyPushSupplier factory. + SupplierAdmin for_suppliers(); + // Suppliers call this method to gain access to the + // ProxyPushConsumer factory. void destroy (); - }; - -}; + // This method shutdown the Event Channel, destroy any resources + // for it and actually shutdown the server where the Event Channel + // is running. + Observer_Handle append_observer (in Observer gw); + // Add a gateway to the Event Channel, the handle returned must be + // used to remove the gateway from the ORB. + void remove_observer (in Observer_Handle gw); + // Remove the observer. + // @@ TODO: We should raise something if the handle is invalid. + }; +}; diff --git a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl index 25f39626f52..3a939249656 100644 --- a/TAO/orbsvcs/orbsvcs/RtecEventComm.idl +++ b/TAO/orbsvcs/orbsvcs/RtecEventComm.idl @@ -18,14 +18,6 @@ module RtecEventComm { // Users willing to implement their own marshalling may use a // sequence of octet. -#if 0 - union EventData switch(long) { - case 1: double dval; - case 2: string sval; - case 3: sequence<octet> bval; - default: long lval; - }; -#else typedef sequence<octet> EventPayload; struct EventData { long x; @@ -44,45 +36,54 @@ module RtecEventComm { any any_value; #endif /* TAO_LACKS_EVENT_CHANNEL_ANY */ }; -#endif typedef TimeBase::TimeT Time; typedef long EventSourceID; typedef long EventType; - struct Event + struct EventHeader { // = TITLE - // The Event structure. + // The Event Header // // = DESCRIPTION - // Events are represented by this structure, it is simply a - // header,data pair. - // - - EventType type_; + // Each event carries some information to do filtering, + // correlation, etc. + EventType type; // The event type. // This may be different from the discriminator in the EventData // union above, the motivation is to allow filtering by data // contents: different event types are assigned to different data // contents though they use the same discriminator. - EventSourceID source_; + EventSourceID source; // Some way to identify the supplier. - long ttl_; + long ttl; // The "Time To Live" count, each time an EC process the event it // decreases the TTL field, when it gets to zero the message is no // longer forwarded. - Time creation_time_; - Time ec_recv_time_; - Time ec_send_time_; + Time creation_time; + Time ec_recv_time; + Time ec_send_time; // Some timestamps, they actually belong in the payload, for some // kind of measument event. + }; + + struct Event + { + // = TITLE + // The Event structure. + // + // = DESCRIPTION + // Events are represented by this structure, it is simply a + // header,data pair. + // + EventHeader header; - EventData data_; + EventData data; // The event payload. }; typedef sequence<Event> EventSet; diff --git a/TAO/orbsvcs/orbsvcs/RtecUDPAdmin.idl b/TAO/orbsvcs/orbsvcs/RtecUDPAdmin.idl new file mode 100644 index 00000000000..6f5d043d5c5 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/RtecUDPAdmin.idl @@ -0,0 +1,28 @@ +// +// $Id$ +// +#include "RtecEventComm.idl" + +module RtecUDPAdmin { + // = TITLE + // Multicast Administration module + // + // = DESCRIPTION + // When the EC is used as an interface to multicast communication + // a mapping between event types and multicast addresses must be + // stablished. + + struct UDP_Addr { + unsigned long ipaddr; + unsigned short port; + }; + + interface AddrServer { + void get_addr (in RtecEventComm::EventHeader header, + out UDP_Addr addr); + // Get the addr and port given the event header. + }; + +}; + + diff --git a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp index e2d60d79670..7119810e965 100644 --- a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp +++ b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Consumer.cpp @@ -214,7 +214,7 @@ Driver::push_consumer (void* consumer_cookie, { const RtecEventComm::Event& e = events[i]; - if (e.data_.payload.mb () == 0) + if (e.data.payload.mb () == 0) { ACE_DEBUG ((LM_DEBUG, "No data in event[%d]\n", i)); continue; @@ -228,10 +228,10 @@ Driver::push_consumer (void* consumer_cookie, // already!)? // Note that there is no copying - int byte_order = e.data_.payload[0]; + int byte_order = e.data.payload[0]; ACE_Message_Block* mb = - ACE_Message_Block::duplicate (e.data_.payload.mb ()); + ACE_Message_Block::duplicate (e.data.payload.mb ()); mb->rd_ptr (1); // skip the byte order TAO_InputCDR cdr (mb, byte_order); diff --git a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp index 2c1f21924df..1dfd8b4bd6d 100644 --- a/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp +++ b/TAO/orbsvcs/tests/EC_Custom_Marshal/ECM_Supplier.cpp @@ -246,27 +246,27 @@ ECMS_Driver::supplier_task (Test_Supplier *supplier, { RtecEventComm::EventSet event (1); event.length (1); - event[0].source_ = supplier->supplier_id (); - event[0].ttl_ = 1; + event[0].header.source = supplier->supplier_id (); + event[0].header.ttl = 1; ACE_hrtime_t t = ACE_OS::gethrtime (); - ORBSVCS_Time::hrtime_to_TimeT (event[0].creation_time_, t); - event[0].ec_recv_time_ = ORBSVCS_Time::zero; - event[0].ec_send_time_ = ORBSVCS_Time::zero; + ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t); + event[0].header.ec_recv_time = ORBSVCS_Time::zero; + event[0].header.ec_send_time = ORBSVCS_Time::zero; if (i == ACE_static_cast (CORBA::Long, this->event_count_) - 1) - event[0].type_ = ACE_ES_EVENT_SHUTDOWN; + event[0].header.type = ACE_ES_EVENT_SHUTDOWN; else if (i % 2 == 0) - event[0].type_ = this->event_a_; + event[0].header.type = this->event_a_; else - event[0].type_ = this->event_b_; + event[0].header.type = this->event_b_; - event[0].data_.x = 0; - event[0].data_.y = 0; + event[0].data.x = 0; + event[0].data.y = 0; // We use replace to minimize the copies, this should result // in just one memory allocation; - event[0].data_.payload.replace (mblen, mb); + event[0].data.payload.replace (mblen, mb); supplier->consumer_proxy ()->push(event, TAO_TRY_ENV); TAO_CHECK_ENV; diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp index ab30bcbbfc5..160e8dcb7c3 100644 --- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp @@ -3,6 +3,7 @@ #include "ace/Get_Opt.h" #include "ace/Auto_Ptr.h" #include "ace/Sched_Params.h" +#include "ace/Read_Buffer.h" #include "orbsvcs/Event_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" @@ -14,48 +15,34 @@ #include "orbsvcs/Event/Event_Channel.h" #include "EC_Mcast.h" -ACE_RCSID(EC_Mcast, EC_Mcast, "$Id$") +#if !defined (__ACE_INLINE__) +#include "EC_Mcast.i" +#endif /* __ACE_INLINE__ */ -#define ECM_DEFAULT_SEND_MCAST_GROUP "224.9.9.1" -#define ECM_DEFAULT_RECV_MCAST_GROUP "224.9.9.2" +ACE_RCSID(EC_Mcast, EC_Mcast, "$Id$") ECM_Driver::ECM_Driver (void) : lcl_name_ ("ECM"), - short_circuit_ (0), - n_suppliers_ (1), - n_consumers_ (1), - workload_ (10), event_period_ (25000), - event_count_ (200), - s_event_a_ (ACE_ES_EVENT_UNDEFINED), - s_event_b_ (ACE_ES_EVENT_UNDEFINED + 1), - c_event_a_ (ACE_ES_EVENT_UNDEFINED), - c_event_b_ (ACE_ES_EVENT_UNDEFINED + 1), - r_event_a_ (ACE_ES_EVENT_UNDEFINED), - r_event_b_ (ACE_ES_EVENT_UNDEFINED + 1), - schedule_file_ (0), - pid_file_name_ (0), - send_mcast_group_ (u_short(23000), ECM_DEFAULT_SEND_MCAST_GROUP), - recv_mcast_group_ (u_short(23001), ECM_DEFAULT_RECV_MCAST_GROUP), - mcast_eh_ (&receiver_), - ready_ (0), - ready_cnd_ (ready_mtx_) + event_count_ (100), + config_filename_ (0), + pid_filename_ (0), + local_federations_count_ (0), + all_federations_count_ (0) { } - - int ECM_Driver::run (int argc, char* argv[]) { TAO_TRY { - CORBA::ORB_var orb = + this->orb_ = CORBA::ORB_init (argc, argv, "", TAO_TRY_ENV); TAO_CHECK_ENV; CORBA::Object_var poa_object = - orb->resolve_initial_references("RootPOA"); + this->orb_->resolve_initial_references("RootPOA"); if (CORBA::is_nil (poa_object.in ())) ACE_ERROR_RETURN ((LM_ERROR, " (%P|%t) Unable to initialize the POA.\n"), @@ -72,44 +59,62 @@ ECM_Driver::run (int argc, char* argv[]) if (this->parse_args (argc, argv)) return 1; + if (this->parse_config_file ()) + return 1; + ACE_DEBUG ((LM_DEBUG, "Execution parameters:\n" " lcl name = <%s>\n" - " short circuit = <%d>\n" - " suppliers = <%d>\n" - " consumers = <%d>\n" - " workload = <%d> (iterations)\n" " event period = <%d> (usecs)\n" " event count = <%d>\n" - " supplier Event A = <%d>\n" - " supplier Event B = <%d>\n" - " consumer Event A = <%d>\n" - " consumer Event B = <%d>\n" - " remote Event A = <%d>\n" - " remote Event B = <%d>\n" - " schedule_file = <%s>\n" + " config file name = <%s>\n" " pid file name = <%s>\n", this->lcl_name_?this->lcl_name_:"nil", - this->short_circuit_, - this->n_suppliers_, - this->n_consumers_, - this->workload_, this->event_period_, this->event_count_, - this->s_event_a_, - this->s_event_b_, - this->c_event_a_, - this->c_event_b_, - this->r_event_a_, - this->r_event_b_, - - this->schedule_file_?this->schedule_file_:"nil", - this->pid_file_name_?this->pid_file_name_:"nil") ); - if (this->pid_file_name_ != 0) + this->config_filename_?this->config_filename_:"nil", + this->pid_filename_?this->pid_filename_:"nil") ); + + int i; + for (i = 0; i < this->local_federations_count_; ++i) + { + ACE_DEBUG ((LM_DEBUG, + " name = <%s>\n" + " port = <%d>\n" + " supplier types:\n", + this->local_federations_[i]->name ()?this->local_federations_[i]->name ():"nil", + this->local_federations_[i]->mcast_port ())); + int j; + for (j = 0; + j < this->local_federations_[i]->supplier_types (); + ++j) + { + + ACE_DEBUG ((LM_DEBUG, + " name = <%s>\n" + " ipadd = <%x>\n", + this->local_federations_[i]->supplier_name (j), + this->local_federations_[i]->supplier_ipaddr (j))); + } + ACE_DEBUG ((LM_DEBUG, + " consumer types:\n")); + for (j = 0; + j < this->local_federations_[i]->consumer_types (); + ++j) + { + ACE_DEBUG ((LM_DEBUG, + " name = <%s>\n" + " ipadd = <%x>\n", + this->local_federations_[i]->consumer_name (j), + this->local_federations_[i]->consumer_ipaddr (j))); + } + } + + if (this->pid_filename_ != 0) { - FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w"); + FILE* pid = ACE_OS::fopen (this->pid_filename_, "w"); if (pid != 0) { ACE_OS::fprintf (pid, "%d\n", ACE_OS::getpid ()); @@ -117,6 +122,7 @@ ECM_Driver::run (int argc, char* argv[]) } } +#if 0 int min_priority = ACE_Sched_Params::priority_min (ACE_SCHED_FIFO); // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. @@ -136,11 +142,12 @@ ECM_Driver::run (int argc, char* argv[]) if (ACE_OS::thr_setprio (min_priority) == -1) { - ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed\n")); + ACE_DEBUG ((LM_DEBUG, "(%P|%t) main thr_setprio failed\n")); } +#endif /* 0 */ CORBA::Object_var naming_obj = - orb->resolve_initial_references ("NameService"); + this->orb_->resolve_initial_references ("NameService"); if (CORBA::is_nil (naming_obj.in ())) ACE_ERROR_RETURN ((LM_ERROR, " (%P|%t) Unable to get the Naming Service.\n"), @@ -161,7 +168,7 @@ ECM_Driver::run (int argc, char* argv[]) char buf[bufsize]; CORBA::String_var str = - orb->object_to_string (scheduler.in (), TAO_TRY_ENV); + this->orb_->object_to_string (scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n", str.in ())); @@ -189,7 +196,7 @@ ECM_Driver::run (int argc, char* argv[]) ec_impl._this (TAO_TRY_ENV); TAO_CHECK_ENV; - str = orb->object_to_string (ec.in (), TAO_TRY_ENV); + str = this->orb_->object_to_string (ec.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%s>\n", str.in ())); @@ -218,33 +225,33 @@ ECM_Driver::run (int argc, char* argv[]) ACE_DEBUG ((LM_DEBUG, "located local EC\n")); - this->connect_ecg (local_ec.in (), TAO_TRY_ENV); + this->open_federations (local_ec.in (), + scheduler.in (), + TAO_TRY_ENV); TAO_CHECK_ENV; - this->connect_suppliers (local_ec.in (), TAO_TRY_ENV); - TAO_CHECK_ENV; - - ACE_DEBUG ((LM_DEBUG, "connected supplier\n")); + ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_federations done\n")); - this->connect_consumers (local_ec.in (), TAO_TRY_ENV); + this->open_senders (local_ec.in (), + scheduler.in (), + TAO_TRY_ENV); TAO_CHECK_ENV; - ACE_DEBUG ((LM_DEBUG, "connected consumer\n")); + ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_senders done\n")); - this->activate_suppliers (local_ec.in (), TAO_TRY_ENV); + this->open_receivers (local_ec.in (), + scheduler.in (), + TAO_TRY_ENV); TAO_CHECK_ENV; - ACE_DEBUG ((LM_DEBUG, "suppliers are active\n")); + ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_receivers done\n")); - this->running_suppliers_ = this->n_suppliers_; + this->activate_federations (local_ec.in (), + scheduler.in (), + TAO_TRY_ENV); + TAO_CHECK_ENV; - // Acquire the mutex for the ready mutex, blocking any supplier - // that may start after this point. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ready_mon, this->ready_mtx_, 1); - this->ready_ = 1; - this->test_start_ = ACE_OS::gethrtime (); - ready_mon.release (); - this->ready_cnd_.broadcast (); + ACE_DEBUG ((LM_DEBUG, "EC_Mcast: activate_federations done\n")); ACE_DEBUG ((LM_DEBUG, "activate the EC\n")); @@ -252,34 +259,25 @@ ECM_Driver::run (int argc, char* argv[]) ec_impl.activate (); ACE_DEBUG ((LM_DEBUG, "running the test\n")); - if (orb->run () == -1) + if (this->orb_->run () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); - this->test_stop_ = ACE_OS::gethrtime (); - ACE_DEBUG ((LM_DEBUG, "shutdown the EC\n")); ec_impl.shutdown (); this->dump_results (); - this->receiver_.close (TAO_TRY_ENV); - TAO_CHECK_ENV; - this->receiver_.shutdown (TAO_TRY_ENV); - TAO_CHECK_ENV; - - this->sender_.close (TAO_TRY_ENV); + this->close_receivers (TAO_TRY_ENV); TAO_CHECK_ENV; - this->sender_.shutdown (TAO_TRY_ENV); + this->close_senders (TAO_TRY_ENV); TAO_CHECK_ENV; - this->disconnect_consumers (TAO_TRY_ENV); - TAO_CHECK_ENV; - this->disconnect_suppliers (TAO_TRY_ENV); + this->close_federations (TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n")); tv.set (5, 0); - if (orb->run (&tv) == -1) + if (this->orb_->run (&tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); naming_context->unbind (schedule_name, TAO_TRY_ENV); @@ -287,45 +285,6 @@ ECM_Driver::run (int argc, char* argv[]) naming_context->unbind (channel_name, TAO_TRY_ENV); TAO_CHECK_ENV; - - if (this->schedule_file_ != 0) - { - RtecScheduler::RT_Info_Set_var infos; - RtecScheduler::Config_Info_Set_var configs; - -#if defined (__SUNPRO_CC) - // Sun C++ 4.2 warns with the code below: - // Warning (Anachronism): Temporary used for non-const - // reference, now obsolete. - // Note: Type "CC -migration" for more on anachronisms. - // Warning (Anachronism): The copy constructor for argument - // infos of type RtecScheduler::RT_Info_Set_out should take - // const RtecScheduler::RT_Info_Set_out&. - // But, this code is not CORBA conformant, because users should - // not define instances of _out types. - - RtecScheduler::RT_Info_Set_out infos_out (infos); - RtecScheduler::Config_Info_Set_out configs_out (configs); - ACE_Scheduler_Factory::server ()->compute_scheduling - (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, - ACE_SCOPE_THREAD), - ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, - ACE_SCOPE_THREAD), - infos_out, configs_out, TAO_TRY_ENV); -#else /* ! __SUNPRO_CC */ - ACE_Scheduler_Factory::server ()->compute_scheduling - (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, - ACE_SCOPE_THREAD), - ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, - ACE_SCOPE_THREAD), - infos.out (), configs.out (), TAO_TRY_ENV); -#endif /* ! __SUNPRO_CC */ - - TAO_CHECK_ENV; - ACE_Scheduler_Factory::dump_schedule (infos.in (), - configs.in (), - this->schedule_file_); - } } TAO_CATCH (CORBA::SystemException, sys_ex) { @@ -340,510 +299,177 @@ ECM_Driver::run (int argc, char* argv[]) } void -ECM_Driver::disconnect_suppliers (CORBA::Environment &_env) +ECM_Driver::federation_has_shutdown (ECM_Local_Federation *federation, + CORBA::Environment &) { - for (int i = 0; i < this->n_suppliers_; ++i) - { - this->suppliers_[i]->close (_env); - if (_env.exception () != 0) return; - } + ACE_DEBUG ((LM_DEBUG, "Federation <%s> shuting down\n", + federation->name ())); + this->federations_running_--; + if (this->federations_running_ <= 0) + this->orb_->shutdown (); } void -ECM_Driver::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env) +ECM_Driver::open_federations (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) { - TAO_TRY + for (int i = 0; i < this->local_federations_count_; ++i) { - for (int i = 0; i < this->n_suppliers_; ++i) - { - // Limit the number of events sent by each supplier - int mc = this->event_count_ / this->n_suppliers_; - if (mc == 0) - mc = 1; - - char buf[BUFSIZ]; - ACE_OS::sprintf (buf, "supplier_%02.2d@%s", i, this->lcl_name_); - - ACE_NEW (this->suppliers_[i], - ECM_Supplier (this, this->suppliers_ + i)); - - this->suppliers_[i]->open (buf, - this->s_event_a_, - this->s_event_b_, - mc, - this->event_period_ * 10, - local_ec, - TAO_TRY_ENV); - TAO_CHECK_ENV; - } - - } - TAO_CATCHANY - { - TAO_RETHROW; + this->local_federations_[i]->open (this->event_count_, + this->event_period_, + ec, scheduler, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); } - TAO_ENDTRY; } void -ECM_Driver::disconnect_consumers (CORBA::Environment &_env) +ECM_Driver::activate_federations (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) { - for (int i = 0; i < this->n_consumers_; ++i) + this->federations_running_ = this->local_federations_count_; + for (int i = 0; i < this->local_federations_count_; ++i) { - this->consumers_[i]->close (_env); - if (_env.exception () != 0) return; + this->local_federations_[i]->activate (this->event_period_, + ec, scheduler, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); } } void -ECM_Driver::activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env) +ECM_Driver::close_federations (CORBA::Environment &_env) { - TAO_TRY + for (int i = 0; i < this->local_federations_count_; ++i) { - for (int i = 0; i < this->n_suppliers_; ++i) - { - // Limit the number of events sent by each supplier - int mc = this->event_count_ / this->n_suppliers_; - if (mc == 0) - mc = 1; - - char buf[BUFSIZ]; - ACE_OS::sprintf (buf, "supplier_%02.2d@%s", i, this->lcl_name_); - - this->suppliers_[i]->activate (buf, - this->event_period_ * 10, - local_ec, - TAO_TRY_ENV); - TAO_CHECK_ENV; - } - + this->local_federations_[i]->close (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); } - TAO_CATCHANY - { - TAO_RETHROW; - } - TAO_ENDTRY; } void -ECM_Driver::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env) +ECM_Driver::open_senders (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) { - TAO_TRY + if (this->send_dgram_.open (ACE_Addr::sap_any) == -1) { - for (int i = 0; i < this->n_consumers_; ++i) - { - char buf[BUFSIZ]; - ACE_OS::sprintf (buf, "consumer_%02.2d@%s", i, this->lcl_name_); - - ACE_NEW (this->consumers_[i], - ECM_Consumer (this, this->consumers_ + i)); - - this->consumers_[i]->open (buf, - this->c_event_a_, - this->c_event_b_, - local_ec, - TAO_TRY_ENV); - TAO_CHECK_ENV; - this->stats_[i].total_time_ = 0; - this->stats_[i].lcl_count_ = 0; - this->stats_[i].rmt_count_ = 0; - } - - this->running_consumers_ = this->n_consumers_; + // @@ TODO throw an application specific exception. + _env.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); + return; } - TAO_CATCHANY + for (int i = 0; i < this->all_federations_count_; ++i) { - TAO_RETHROW; + this->all_federations_[i]->open (&this->send_dgram_, + ec, + scheduler, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); } - TAO_ENDTRY; } void -ECM_Driver::connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env) +ECM_Driver::close_senders (CORBA::Environment &_env) { - TAO_TRY + for (int i = 0; i < this->all_federations_count_; ++i) { - RtecScheduler::Scheduler_ptr local_sch = - ACE_Scheduler_Factory::server (); - - // We could use the same name on the local and remote scheduler, - // but that fails when using a global scheduler. - const int bufsize = 512; - - char mcast_name[bufsize]; - ACE_OS::strcpy (mcast_name, "sender"); - ACE_OS::strcat (mcast_name, "@"); - ACE_OS::strcat (mcast_name, this->lcl_name_); - - this->sender_.init (local_ec, - local_sch, - mcast_name, - this->send_mcast_group_, - TAO_TRY_ENV); - TAO_CHECK_ENV; - - ACE_INET_Addr ignore_from; - this->sender_.get_local_addr (ignore_from); - - char recv_name[bufsize]; - ACE_OS::strcpy (recv_name, "receiver"); - ACE_OS::strcat (recv_name, "@"); - ACE_OS::strcat (recv_name, this->lcl_name_); - - this->receiver_.init (local_ec, - local_sch, - recv_name, - ignore_from, - TAO_TRY_ENV); - TAO_CHECK_ENV; - - this->mcast_eh_.reactor (TAO_ORB_Core_instance ()->reactor ()); - this->mcast_eh_.open (this->recv_mcast_group_); - - RtecEventChannelAdmin::ConsumerQOS sender_qos; - sender_qos.is_gateway = 1; - sender_qos.dependencies.length (3); - sender_qos.dependencies[0].event.type_ = ACE_ES_DISJUNCTION_DESIGNATOR; - sender_qos.dependencies[0].event.source_ = 0; - sender_qos.dependencies[0].event.creation_time_ = ORBSVCS_Time::zero; - sender_qos.dependencies[0].rt_info = 0; - sender_qos.dependencies[1].event.type_ = this->s_event_a_; - sender_qos.dependencies[1].event.source_ = 0; - sender_qos.dependencies[1].event.creation_time_ = ORBSVCS_Time::zero; - sender_qos.dependencies[1].rt_info = 0; - sender_qos.dependencies[2].event.type_ = this->s_event_b_; - sender_qos.dependencies[2].event.source_ = 0; - sender_qos.dependencies[2].event.creation_time_ = ORBSVCS_Time::zero; - sender_qos.dependencies[2].rt_info = 0; - - this->sender_.open (sender_qos, TAO_TRY_ENV); - TAO_CHECK_ENV; - - RtecEventChannelAdmin::SupplierQOS receiver_qos; - receiver_qos.is_gateway = 1; - receiver_qos.publications.length (2); - receiver_qos.publications[0].event.type_ = this->r_event_a_; - receiver_qos.publications[0].event.source_ = 0; - receiver_qos.publications[0].event.creation_time_ = ORBSVCS_Time::zero; - receiver_qos.publications[0].dependency_info.dependency_type = - RtecScheduler::TWO_WAY_CALL; - receiver_qos.publications[0].dependency_info.number_of_calls = 1; - receiver_qos.publications[0].dependency_info.rt_info = 0; - receiver_qos.publications[1].event.type_ = this->r_event_b_; - receiver_qos.publications[1].event.source_ = 0; - receiver_qos.publications[1].event.creation_time_ = ORBSVCS_Time::zero; - receiver_qos.publications[1].dependency_info.dependency_type = - RtecScheduler::TWO_WAY_CALL; - receiver_qos.publications[1].dependency_info.number_of_calls = 1; - receiver_qos.publications[1].dependency_info.rt_info = 0; - - this->receiver_.open (receiver_qos, TAO_TRY_ENV); - TAO_CHECK_ENV; - } - TAO_CATCHANY - { - TAO_RETHROW; + this->all_federations_[i]->close (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); } - TAO_ENDTRY; + this->send_dgram_.close (); } void -ECM_Driver::push_supplier (void * /* cookie */, - RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, - const RtecEventComm::EventSet &events, - CORBA::Environment & _env) +ECM_Driver::open_receivers (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) { - this->wait_until_ready (); - // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events sent by supplier\n")); - // @@ TODO we could keep somekind of stats here... - if (!this->short_circuit_) + for (int i = 0; i < this->local_federations_count_; ++i) { - consumer->push (events, _env); - } - else - { - for (int i = 0; i < this->n_consumers_ && !_env.exception (); ++i) - { - this->consumers_[i]->push (events, _env); - } + this->local_federations_[i]->open_receiver (ec, + scheduler, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); } } void -ECM_Driver::push_consumer (void *consumer_cookie, - ACE_hrtime_t arrival, - const RtecEventComm::EventSet &events, - CORBA::Environment &) +ECM_Driver::close_receivers (CORBA::Environment &_env) { - int ID = - (ACE_reinterpret_cast(ECM_Consumer**,consumer_cookie) - - this->consumers_); - - // ACE_DEBUG ((LM_DEBUG, "(%P|%t) events received by consumer %d\n", ID)); - - if (events.length () == 0) + for (int i = 0; i < this->local_federations_count_; ++i) { - // ACE_DEBUG ((LM_DEBUG, "no events\n")); - return; + this->local_federations_[i]->close_receiver (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); } - - // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ())); - -#if 0 - const int bufsize = 128; - char buf[bufsize]; - ACE_OS::sprintf (buf, "Consumer %d receives event in thread: ", ID); -#endif - - for (u_int i = 0; i < events.length (); ++i) - { - const RtecEventComm::Event& e = events[i]; - - if (e.type_ == ACE_ES_EVENT_SHUTDOWN) - { - this->shutdown_consumer (ID); - continue; - } - - ACE_hrtime_t s; - ORBSVCS_Time::TimeT_to_hrtime (s, e.creation_time_); - ACE_hrtime_t nsec = arrival - s; - if (this->local_source (e.source_)) - { - int& count = this->stats_[ID].lcl_count_; - - this->stats_[ID].lcl_latency_[count] = nsec; - int workload = this->workload_; - int interval = this->event_period_; - - for (int j = 0; j < workload; ++j) - { - // Eat a little CPU so the Utilization test can measure the - // consumed time.... - /* takes about 40.2 usecs on a 167 MHz Ultra2 */ - u_long n = 1279UL; - ACE::is_prime (n, 2, n / 2); - } - // Increment the elapsed time on this consumer. - ACE_hrtime_t now = ACE_OS::gethrtime (); - this->stats_[ID].total_time_ += now - arrival; - this->stats_[ID].end_[count] = now; - - // We estimate our laxity based on the event creation - // time... it may not be very precise, but will do; other - // strategies include: - // + Keep track of the "current frame", then then deadline - // is the end of the frame. - // + Use the start of the test to keep the current frame. - // + Use the last execution. - - // Work around MSVC++ bug, it does not not how to convert an - // unsigned 64 bit int into a long.... - CORBA::ULong tmp = ACE_static_cast(CORBA::ULong,(s - now)); - this->stats_[ID].laxity_[count] = 1 + tmp/1000.0F/interval; - count++; - } - else - { - int& count = this->stats_[ID].rmt_count_; - this->stats_[ID].rmt_latency_[count] = nsec; - count++; - } - } -} - -void -ECM_Driver::wait_until_ready (void) -{ - ACE_GUARD (ACE_Thread_Mutex, ready_mon, this->ready_mtx_); - while (!this->ready_) - this->ready_cnd_.wait (); -} - -void -ECM_Driver::shutdown_supplier (void* /* supplier_cookie */, - RtecEventComm::PushConsumer_ptr consumer, - CORBA::Environment& _env) -{ - - this->running_suppliers_--; - if (this->running_suppliers_ == 0) - { - // We propagate a shutdown event through the system... - RtecEventComm::EventSet shutdown (1); - shutdown.length (1); - RtecEventComm::Event& s = shutdown[0]; - - s.source_ = 0; - s.ttl_ = 1; - - ACE_hrtime_t t = ACE_OS::gethrtime (); - ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t); - s.ec_recv_time_ = ORBSVCS_Time::zero; - s.ec_send_time_ = ORBSVCS_Time::zero; - s.data_.x = 0; - s.data_.y = 0; - s.type_ = ACE_ES_EVENT_SHUTDOWN; - consumer->push (shutdown, _env); - } -} - -void -ECM_Driver::shutdown_consumer (int id) -{ - ACE_DEBUG ((LM_DEBUG, "Shutdown consumer %d\n", id)); - this->running_consumers_--; - if (this->running_consumers_ == 0) - TAO_ORB_Core_instance ()->orb ()->shutdown (); -} - -int -ECM_Driver::shutdown (CORBA::Environment& _env) -{ - ACE_UNUSED_ARG (_env); - - ACE_DEBUG ((LM_DEBUG, "Shutting down the multiple EC test\n")); - - TAO_ORB_Core_instance ()->orb ()->shutdown (); - return 0; } void ECM_Driver::dump_results (void) { - const int bufsize = 512; - char buf[bufsize]; - - for (int i = 0; i < this->n_consumers_; ++i) + for (int i = 0; i < this->local_federations_count_; ++i) { - ACE_OS::sprintf (buf, "CO%02.2d", i); - this->dump_results (buf, this->stats_[i]); + this->local_federations_[i]->dump_results (); } - // the cast is to workaround a msvc++ bug... - CORBA::ULong tmp = ACE_static_cast(CORBA::ULong, - this->test_stop_ - this->test_start_); - double usec = tmp / 1000.0; - ACE_DEBUG ((LM_DEBUG, "Time[TOTAL]: %.3f\n", usec)); } -void -ECM_Driver::dump_results (const char* name, Stats& stats) -{ - // @@ We are reporting the information without specifics about - // the cast is to workaround a msvc++ bug... - double usec = ACE_static_cast(CORBA::ULong,stats.total_time_) / 1000.0; - ACE_DEBUG ((LM_DEBUG, "Time[LCL,%s]: %.3f\n", name, usec)); - int i; - for (i = 1; i < stats.lcl_count_ - 1; ++i) - { - // the cast is to workaround a msvc++ bug... - usec = ACE_static_cast(CORBA::ULong,stats.lcl_latency_[i]) / 1000.0; - ACE_DEBUG ((LM_DEBUG, "Latency[LCL,%s]: %.3f\n", name, usec)); - - double percent = stats.laxity_[i] * 100.0; - ACE_DEBUG ((LM_DEBUG, "Laxity[LCL,%s]: %.3f\n", name, percent)); - - // the cast is to workaround a msvc++ bug... - usec = ACE_static_cast(CORBA::ULong,stats.end_[i] - this->test_start_) / 1000.0; - ACE_DEBUG ((LM_DEBUG, "Completion[LCL,%s]: %.3f\n", name, usec)); - } - for (i = 1; i < stats.rmt_count_ - 1; ++i) - { - // the cast is to workaround a msvc++ bug... - double usec = ACE_static_cast(CORBA::ULong,stats.rmt_latency_[i]) / 1000.0; - ACE_DEBUG ((LM_DEBUG, "Latency[RMT,%s]: %.3f\n", name, usec)); - } -} - -int -ECM_Driver::local_source (RtecEventComm::EventSourceID id) const -{ - for (int i = 0; i < this->n_suppliers_; ++i) - { - if (this->suppliers_[i]->supplier_id () == id) - return 1; - } - return 0; -} + +// **************************************************************** int ECM_Driver::parse_args (int argc, char *argv []) { - ACE_Get_Opt get_opt (argc, argv, "xl:s:r:h:p:d:"); + ACE_Get_Opt get_opt (argc, argv, "l:p:c:n:t:f:"); int opt; while ((opt = get_opt ()) != EOF) { switch (opt) { - case 'x': - this->short_circuit_ = 1; - break; - case 'l': this->lcl_name_ = get_opt.optarg; break; - case 'h': - { - char* aux; - char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux); - - this->n_suppliers_ = ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->n_consumers_ = ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->workload_ = ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->event_period_ = ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->event_count_ = ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->s_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->s_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->c_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->c_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->r_event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); - arg = ACE_OS::strtok_r (0, ",", &aux); - this->r_event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg); - } - break; - case 'p': - this->pid_file_name_ = get_opt.optarg; + this->pid_filename_ = get_opt.optarg; break; - case 'd': - this->schedule_file_ = get_opt.optarg; - break; + case 'c': + this->config_filename_ = get_opt.optarg; + break; + + case 't': + this->event_period_ = ACE_OS::atoi (get_opt.optarg); + break; - case 's': - this->send_mcast_group_.set (get_opt.optarg); - break; + case 'n': + this->event_count_ = ACE_OS::atoi (get_opt.optarg); + break; - case 'r': - this->recv_mcast_group_.set (get_opt.optarg); - break; + case 'f': + { + char* aux; + int i = 0; + for (char* arg = ACE_OS::strtok_r (get_opt.optarg, ",", &aux); + arg != 0 && i < ECM_Driver::MAX_LOCAL_FEDERATIONS; + arg = ACE_OS::strtok_r (0, ",", &aux), ++i) + { + this->local_names_[i] = arg; + } + this->local_federations_count_ = i; + } + break; case '?': default: ACE_DEBUG ((LM_DEBUG, "Usage: %s " "[ORB options] " - "-x (short circuit EC) " - "-h <high priority args> " + "-n <event_count> " + "-t <event_period> " + "-l <localname> " "-p <pid file name> " - "-d <schedule file name> " - "-s <send_mcast group> " - "-r <recv_mcast group> " + "-c <config file name> " + "-g federation,federation,... " "\n", argv[0])); return -1; @@ -857,110 +483,328 @@ ECM_Driver::parse_args (int argc, char *argv []) "%s: event count (%d) is out of range, " "reset to default (%d)\n", argv[0], this->event_count_, - 160)); - this->event_count_ = 160; + 100)); + this->event_count_ = 100; + } + + return 0; +} + +int +ECM_Driver::parse_config_file (void) +{ + FILE* cfg = 0; + if (this->config_filename_ != 0) + cfg = ACE_OS::fopen (this->config_filename_, "r"); + else + cfg = stdin; + + if (cfg == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, "cannot open config file <%s>\n", + this->config_filename_), -1); } - if (this->n_consumers_ <= 0 - || this->n_consumers_ >= ECM_Driver::MAX_CONSUMERS - || this->n_suppliers_ <= 0 - || this->n_suppliers_ >= ECM_Driver::MAX_SUPPLIERS) + int s = fscanf (cfg, "%d", &this->all_federations_count_); + if (s == 0 || s == EOF) + { + ACE_ERROR_RETURN ((LM_ERROR, + "problem reading federation count\n"), -1); + } + // ACE_DEBUG ((LM_DEBUG, + // "total federations = %d\n", + // this->all_federations_count_)); + for (int i = 0; i < this->all_federations_count_; ++i) { - ACE_ERROR_RETURN ((LM_DEBUG, - "%s: number of consumers or " - "suppliers out of range\n", argv[0]), -1); + if (this->skip_blanks (cfg, "reading federation name")) + return -1; + ACE_Read_Buffer reader(cfg); + char* buf = reader.read (' ', ' ', '\0'); + char* name = CORBA::string_dup (buf); + reader.alloc()->free (buf); + + + int port; + if (this->skip_blanks (cfg, "reading federation port number")) + return -1; + fscanf (cfg, "%d", &port); + CORBA::UShort mcast_port = ACE_static_cast(CORBA::UShort, port); + + int ns, nc; + if (this->skip_blanks (cfg, "reading supplier count")) + return -1; + s = fscanf (cfg, "%d", &ns); + if (s == 0 || s == EOF) + { + ACE_ERROR_RETURN ((LM_ERROR, + "problem reading supplier count (%d)\n", + i), -1); + } + if (this->skip_blanks (cfg, "reading constumer count")) + return -1; + s = fscanf (cfg, "%d", &nc); + if (s == 0 || s == EOF) + { + ACE_ERROR_RETURN ((LM_ERROR, + "problem reading consumer count (%d)\n", + i), -1); + } + // ACE_DEBUG ((LM_DEBUG, "i = %d <%s> <%d> <%d> <%d>\n", + // i, name, mcast_port, ns, nc)); + + char** supplier_names; + char** consumer_names; + ACE_NEW_RETURN (supplier_names, char*[ns], -1); + ACE_NEW_RETURN (consumer_names, char*[nc], -1); + + if (this->parse_name_list (cfg, ns, supplier_names, + "reading supplier list")) + { + ACE_ERROR_RETURN ((LM_ERROR, + "error parsing supplier list for <%s>\n", + name), -1); + } + + if (this->parse_name_list (cfg, nc, consumer_names, + "reading consumer list")) + { + ACE_ERROR_RETURN ((LM_ERROR, + "error parsing consumer list for <%s>\n", + name), -1); + } + + ACE_NEW_RETURN (this->all_federations_[i], + ECM_Federation (name, mcast_port, + ns, supplier_names, + nc, consumer_names), -1); + } + ACE_OS::fclose (cfg); + + for (int j = 0; j < this->local_federations_count_; ++j) + { + int k = 0; + for (; k < this->all_federations_count_; ++k) + { + if (ACE_OS::strcmp (this->local_names_[j], + this->all_federations_[k]->name ()) == 0) + { + ACE_NEW_RETURN (this->local_federations_[j], + ECM_Local_Federation (this->all_federations_[k], + this), + -1); + break; + } + } + if (k == this->all_federations_count_) + ACE_ERROR ((LM_ERROR, + "Cannot find federations <%s>\n", + this->local_names_[j])); } return 0; } +int +ECM_Driver::parse_name_list (FILE* file, + int n, + char** names, + const char* error_msg) +{ + for (int i = 0; i < n; ++i) + { + if (this->skip_blanks (file, error_msg)) + { + ACE_ERROR_RETURN ((LM_ERROR, + "error on item %d while %s\n", + i, error_msg), -1); + } + ACE_Read_Buffer tmp(file); + char* buf = tmp.read ('\n', '\n', '\0'); + names[i] = CORBA::string_dup (buf); + tmp.alloc ()->free (buf); + } + return 0; +} + +int +ECM_Driver::skip_blanks (FILE* file, + const char* error_msg) +{ + int c; + // Consume all the blanks. + while (isspace (c = fgetc (file))); + if (c == EOF) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Unexpected EOF in config file while %s\n", + error_msg), + -1); + } + ungetc (c, file); + return 0; +} // **************************************************************** -ECM_Supplier::ECM_Supplier (ECM_Driver *test, - void *cookie) - : test_ (test), - cookie_ (cookie), - consumer_ (this) +ECM_Federation::ECM_Federation (char* name, + CORBA::UShort mcast_port, + int supplier_types, + char** supplier_names, + int consumer_types, + char** consumer_names) + : name_ (name), + mcast_port_ (mcast_port), + supplier_types_ (supplier_types), + supplier_names_ (supplier_names), + consumer_types_ (consumer_types), + consumer_names_ (consumer_names), + addr_server_ (mcast_port) { + ACE_NEW (this->supplier_ipaddr_, CORBA::ULong[this->supplier_types_]); + ACE_NEW (this->consumer_ipaddr_, CORBA::ULong[this->consumer_types_]); + + int i; + for (i = 0; i < this->supplier_types_; ++i) + { + ACE_INET_Addr addr (u_short(0), this->supplier_names_[i]); + this->supplier_ipaddr_[i] = addr.get_ip_address (); + } + for (i = 0; i < this->consumer_types_; ++i) + { + ACE_INET_Addr addr (u_short(0), this->consumer_names_[i]); + this->consumer_ipaddr_[i] = addr.get_ip_address (); + } } void -ECM_Supplier::open (const char* name, - int event_a, - int event_b, - int event_count, - const RtecScheduler::Period& rate, - RtecEventChannelAdmin::EventChannel_ptr ec, - CORBA::Environment &_env) +ECM_Federation::open (ACE_SOCK_Dgram *dgram, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) { - this->event_a_ = event_a; - this->event_b_ = event_b; - this->event_count_ = event_count; - - TAO_TRY + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, this->name ()); + ACE_OS::strcat (buf, "/sender"); + + RtecUDPAdmin::AddrServer_var addr_server = + this->addr_server_._this (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + this->sender_.init (ec, scheduler, + buf, + addr_server.in (), + dgram, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + RtecScheduler::handle_t rt_info = + scheduler->create (buf, _env); + TAO_CHECK_ENV_RETURN_VOID(_env); + + // The worst case execution time is far less than 2 + // milliseconds, but that is a safe estimate.... + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + for (int i = 0; i < this->consumer_types (); ++i) { - RtecScheduler::Scheduler_ptr server = - ACE_Scheduler_Factory::server (); + qos.insert_type (this->consumer_ipaddr (i), rt_info); + } + RtecEventChannelAdmin::ConsumerQOS qos_copy = qos.get_ConsumerQOS (); + this->sender_.open (qos_copy, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); +} - RtecScheduler::handle_t rt_info = - server->create (name, TAO_TRY_ENV); - TAO_CHECK_ENV; +void +ECM_Federation::close (CORBA::Environment &_env) +{ + this->sender_.close (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + this->sender_.shutdown (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); +} - // The execution times are set to reasonable values, but - // actually they are changed on the real execution, i.e. we - // lie to the scheduler to obtain right priorities; but we - // don't care if the set is schedulable. - ACE_Time_Value tv (0, 2000); - TimeBase::TimeT time; - ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - server->set (rt_info, - RtecScheduler::VERY_HIGH_CRITICALITY, - time, time, time, - rate, - RtecScheduler::VERY_LOW_IMPORTANCE, - time, - 1, - RtecScheduler::OPERATION, - TAO_TRY_ENV); - TAO_CHECK_ENV; +// **************************************************************** - this->supplier_id_ = ACE::crc32 (name); - ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name, - this->supplier_id_)); +ECM_Supplier::ECM_Supplier (ECM_Local_Federation* federation) + : federation_ (federation), + consumer_ (this) +{ +} - ACE_SupplierQOS_Factory qos; - qos.insert (this->supplier_id_, - this->event_a_, - rt_info, 1); - qos.insert (this->supplier_id_, - this->event_b_, - rt_info, 1); +void +ECM_Supplier::open (const char* name, + RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) +{ + RtecScheduler::handle_t rt_info = + scheduler->create (name, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + period, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + this->supplier_id_ = ACE::crc32 (name); + ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name, + this->supplier_id_)); + + ACE_SupplierQOS_Factory qos; + for (int i = 0; i < this->federation_->supplier_types (); ++i) + { qos.insert (this->supplier_id_, - ACE_ES_EVENT_SHUTDOWN, - rt_info, 1); - - RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = - ec->for_suppliers (TAO_TRY_ENV); - TAO_CHECK_ENV; + this->federation_->supplier_ipaddr (i), + rt_info, 1); + } + qos.insert (this->supplier_id_, + ACE_ES_EVENT_SHUTDOWN, + rt_info, 1); - this->consumer_proxy_ = - supplier_admin->obtain_push_consumer (TAO_TRY_ENV); - TAO_CHECK_ENV; + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + ec->for_suppliers (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); - RtecEventComm::PushSupplier_var objref = this->_this (TAO_TRY_ENV); - TAO_CHECK_ENV; + this->consumer_proxy_ = + supplier_admin->obtain_push_consumer (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); - this->consumer_proxy_->connect_push_supplier (objref.in (), - qos.get_SupplierQOS (), - TAO_TRY_ENV); - TAO_CHECK_ENV; + RtecEventComm::PushSupplier_var objref = this->_this (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); - } - TAO_CATCHANY - { - TAO_RETHROW; - } - TAO_ENDTRY; + this->consumer_proxy_->connect_push_supplier (objref.in (), + qos.get_SupplierQOS (), + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); } void @@ -970,262 +814,423 @@ ECM_Supplier::close (CORBA::Environment &_env) return; this->consumer_proxy_->disconnect_push_consumer (_env); - if (_env.exception () != 0) return; + TAO_CHECK_ENV_RETURN_VOID (_env); this->consumer_proxy_ = 0; } void ECM_Supplier::activate (const char* name, - const RtecScheduler::Period& rate, - RtecEventChannelAdmin::EventChannel_ptr ec, - CORBA::Environment &_env) + RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) { - TAO_TRY - { - RtecScheduler::Scheduler_ptr server = - ACE_Scheduler_Factory::server (); + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, "consumer_"); + ACE_OS::strcat (buf, name); + RtecScheduler::handle_t rt_info = + scheduler->create (buf, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + period, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + // Also connect our consumer for timeout events from the EC. + int interval = period / 10; + ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS, + interval % ACE_ONE_SECOND_IN_USECS); + TimeBase::TimeT timeout; + ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout); + + ACE_ConsumerQOS_Factory consumer_qos; + consumer_qos.start_disjunction_group (); + consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, + timeout, + rt_info); + + // = Connect as a consumer. + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + RtecEventComm::PushConsumer_var cref = + this->consumer_._this (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + this->supplier_proxy_->connect_push_consumer (cref.in (), + consumer_qos.get_ConsumerQOS (), + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); +} - const int bufsize = 512; - char buf[bufsize]; - ACE_OS::strcpy (buf, "consumer_"); - ACE_OS::strcat (buf, name); - RtecScheduler::handle_t rt_info = - server->create (buf, TAO_TRY_ENV); - TAO_CHECK_ENV; +int +ECM_Supplier::supplier_id (void) const +{ + return this->supplier_id_; +} +void +ECM_Supplier::push (const RtecEventComm::EventSet& events, + CORBA::Environment& _env) +{ + for (u_int i = 0; i < events.length (); ++i) + { + const RtecEventComm::Event& e = events[i]; + if (e.header.type != ACE_ES_EVENT_INTERVAL_TIMEOUT) + continue; - // The execution times are set to reasonable values, but - // actually they are changed on the real execution, i.e. we - // lie to the scheduler to obtain right priorities; but we - // don't care if the set is schedulable. - ACE_Time_Value tv (0, 2000); - TimeBase::TimeT time; - ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - server->set (rt_info, - RtecScheduler::VERY_HIGH_CRITICALITY, - time, time, time, - rate, - RtecScheduler::VERY_LOW_IMPORTANCE, - time, - 1, - RtecScheduler::OPERATION, - TAO_TRY_ENV); - TAO_CHECK_ENV; + this->federation_->supplier_timeout (this->consumer_proxy_.in (), + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + } +} - // Also connect our consumer for timeout events from the EC. - int interval = rate / 10; - ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS, - interval % ACE_ONE_SECOND_IN_USECS); - TimeBase::TimeT timeout; - ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout); - - ACE_ConsumerQOS_Factory consumer_qos; - consumer_qos.start_disjunction_group (); - consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, - timeout, - rt_info); - - // = Connect as a consumer. - RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = - ec->for_consumers (TAO_TRY_ENV); - TAO_CHECK_ENV; +void +ECM_Supplier::disconnect_push_supplier (CORBA::Environment& _env) +{ + this->supplier_proxy_->disconnect_push_supplier (_env); +} - this->supplier_proxy_ = - consumer_admin->obtain_push_supplier (TAO_TRY_ENV); - TAO_CHECK_ENV; +void +ECM_Supplier::disconnect_push_consumer (CORBA::Environment &) +{ +} - RtecEventComm::PushConsumer_var cref = - this->consumer_._this (TAO_TRY_ENV); - TAO_CHECK_ENV; +// **************************************************************** - this->supplier_proxy_->connect_push_consumer (cref.in (), - consumer_qos.get_ConsumerQOS (), - TAO_TRY_ENV); - TAO_CHECK_ENV; - } - TAO_CATCHANY - { - TAO_RETHROW; - } - TAO_ENDTRY; +ECM_Consumer::ECM_Consumer (ECM_Local_Federation *federation) + : federation_ (federation) +{ } void -ECM_Supplier::push (const RtecEventComm::EventSet& events, - CORBA::Environment& _env) +ECM_Consumer::open (const char* name, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment& _env) { -#if 0 - const int bufsize = 128; - char buf[bufsize]; - ACE_OS::sprintf (buf, "Supplier %d receives event in thread: ", - this->supplier_id_); -#endif - - if (events.length () == 0 || this->event_count_ < 0) + RtecScheduler::handle_t rt_info = + scheduler->create (name, _env); + TAO_CHECK_ENV_RETURN_VOID(_env); + + // The worst case execution time is far less than 2 + // milliseconds, but that is a safe estimate.... + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + ACE_ConsumerQOS_Factory qos; + qos.start_disjunction_group (); + qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info); + const ECM_Federation* federation = this->federation_->federation (); + for (int i = 0; i < federation->consumer_types (); ++i) { - // ACE_DEBUG ((LM_DEBUG, "no events\n")); - return; + qos.insert_type (federation->consumer_ipaddr (i), rt_info); } - RtecEventComm::EventSet sent (events.length ()); - sent.length (events.length ()); + // = Connect as a consumer. + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); - for (u_int i = 0; i < events.length (); ++i) - { - const RtecEventComm::Event& e = events[i]; - if (e.type_ != ACE_ES_EVENT_INTERVAL_TIMEOUT) - continue; - - // ACE_DEBUG ((LM_DEBUG, "ECM_Supplier - timeout (%t)\n")); + this->supplier_proxy_ = + consumer_admin->obtain_push_supplier (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); - RtecEventComm::Event& s = sent[i]; - s.source_ = this->supplier_id_; - s.ttl_ = 1; + RtecEventComm::PushConsumer_var objref = this->_this (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); - ACE_hrtime_t t = ACE_OS::gethrtime (); - ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t); - s.ec_recv_time_ = ORBSVCS_Time::zero; - s.ec_send_time_ = ORBSVCS_Time::zero; + this->supplier_proxy_->connect_push_consumer (objref.in (), + qos.get_ConsumerQOS (), + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); +} - s.data_.x = 0; - s.data_.y = 0; +void +ECM_Consumer::close (CORBA::Environment &_env) +{ + if (CORBA::is_nil (this->supplier_proxy_.in ())) + return; - this->event_count_--; + this->supplier_proxy_->disconnect_push_supplier (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); - if (this->event_count_ < 0) - { - //this->supplier_proxy_->disconnect_push_supplier (_env); - //if (_env.exception () != 0) return; - this->test_->shutdown_supplier (this->cookie_, - this->consumer_proxy_.in (), - _env); - } - if (this->event_count_ % 2 == 0) - { - // Generate an A event... - s.type_ = this->event_a_; - } - else - { - s.type_ = this->event_b_; - } - } - this->test_->push_supplier (this->cookie_, - this->consumer_proxy_.in (), - sent, - _env); + this->supplier_proxy_ = 0; } void -ECM_Supplier::disconnect_push_supplier (CORBA::Environment& _env) +ECM_Consumer::push (const RtecEventComm::EventSet& events, + CORBA::Environment &_env) { - this->supplier_proxy_->disconnect_push_supplier (_env); + ACE_hrtime_t arrival = ACE_OS::gethrtime (); + this->federation_->consumer_push (arrival, events, _env); } void -ECM_Supplier::disconnect_push_consumer (CORBA::Environment &) +ECM_Consumer::disconnect_push_consumer (CORBA::Environment &) { } -int ECM_Supplier::supplier_id (void) const +// **************************************************************** + +ECM_Local_Federation::ECM_Local_Federation (ECM_Federation *federation, + ECM_Driver *driver) + : federation_ (federation), + driver_ (driver), + consumer_ (this), + supplier_ (this), + recv_count_ (0), + unfiltered_count_ (0), + invalid_count_ (0), + send_count_ (0), + event_count_ (0), + last_publication_change_ (0), + last_subscription_change_ (0), + mcast_eh_ (&receiver_) { - return this->supplier_id_; } -// **************************************************************** +void +ECM_Local_Federation::open (int event_count, + RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment& _env) +{ + this->event_count_ = event_count; + + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, this->federation_->name ()); + ACE_OS::strcat (buf, "::supplier"); + + this->supplier_.open (buf, period, ec, scheduler, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + ACE_OS::strcpy (buf, this->federation_->name ()); + ACE_OS::strcat (buf, "::consumer"); + this->consumer_.open (buf, ec, scheduler, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); +} -ECM_Consumer::ECM_Consumer (ECM_Driver *test, - void *cookie) - : test_ (test), - cookie_ (cookie) +void +ECM_Local_Federation::close (CORBA::Environment &_env) { + this->consumer_.close (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + this->supplier_.close (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); } void -ECM_Consumer::open (const char* name, - int event_a, int event_b, - RtecEventChannelAdmin::EventChannel_ptr ec, - CORBA::Environment& _env) +ECM_Local_Federation::activate (RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment& _env) { - TAO_TRY - { - RtecScheduler::Scheduler_ptr server = - ACE_Scheduler_Factory::server (); + this->supplier_.activate (this->federation_->name (), + period, + ec, scheduler, _env); +} - RtecScheduler::handle_t rt_info = - server->create (name, TAO_TRY_ENV); - TAO_CHECK_ENV; +void +ECM_Local_Federation::supplier_timeout (RtecEventComm::PushConsumer_ptr consumer, + CORBA::Environment &_env) +{ + RtecEventComm::EventSet sent (1); + sent.length (1); - // The worst case execution time is far less than 2 - // milliseconds, but that is a safe estimate.... - ACE_Time_Value tv (0, 2000); - TimeBase::TimeT time; - ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - server->set (rt_info, - RtecScheduler::VERY_HIGH_CRITICALITY, - time, time, time, - 0, - RtecScheduler::VERY_LOW_IMPORTANCE, - time, - 0, - RtecScheduler::OPERATION, - TAO_TRY_ENV); - TAO_CHECK_ENV; + RtecEventComm::Event& s = sent[0]; + s.header.source = this->supplier_.supplier_id(); + s.header.ttl = 1; - ACE_ConsumerQOS_Factory qos; - qos.start_disjunction_group (); - qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info); - qos.insert_type (event_a, rt_info); - qos.insert_type (event_b, rt_info); + ACE_hrtime_t t = ACE_OS::gethrtime (); + ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t); + s.header.ec_recv_time = ORBSVCS_Time::zero; + s.header.ec_send_time = ORBSVCS_Time::zero; - // = Connect as a consumer. - RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = - ec->for_consumers (TAO_TRY_ENV); - TAO_CHECK_ENV; + s.data.x = 0; + s.data.y = 0; - this->supplier_proxy_ = - consumer_admin->obtain_push_supplier (TAO_TRY_ENV); - TAO_CHECK_ENV; + this->event_count_--; - RtecEventComm::PushConsumer_var objref = this->_this (TAO_TRY_ENV); - TAO_CHECK_ENV; + // ACE_DEBUG ((LM_DEBUG, "Federation <%s> event count <%d>\n", + // this->name (), this->event_count_)); - this->supplier_proxy_->connect_push_consumer (objref.in (), - qos.get_ConsumerQOS (), - TAO_TRY_ENV); - TAO_CHECK_ENV; + if (this->event_count_ < 0) + { + this->driver_->federation_has_shutdown (this, _env); + return; } - TAO_CATCHANY + int i = this->event_count_ % this->federation_->supplier_types (); + s.header.type = this->federation_->supplier_ipaddr (i); + + consumer->push (sent, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + this->send_count_++; +} + +void +ECM_Local_Federation::consumer_push (ACE_hrtime_t, + const RtecEventComm::EventSet &event, + CORBA::Environment &_env) +{ + if (event.length () == 0) { - TAO_RETHROW; + return; + } + + for (CORBA::ULong i = 0; i < event.length (); ++i) + { + const RtecEventComm::Event& e = event[i]; + + this->recv_count_++; + + int j = 0; + for (; j < this->federation_->consumer_types (); ++j) + { + if (e.header.type == this->federation_->consumer_ipaddr(j)) + { + // @@ TODO check if the type is in the current + // subscription list. + break; + } + } + if (j == this->federation_->consumer_types ()) + this->invalid_count_++; } - TAO_ENDTRY; } void -ECM_Consumer::close (CORBA::Environment &_env) +ECM_Local_Federation::open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env) { - if (CORBA::is_nil (this->supplier_proxy_.in ())) - return; + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, this->name ()); + ACE_OS::strcat (buf, "/receiver"); + + ACE_INET_Addr local_addr; + this->federation_->sender_local_addr (local_addr); + this->receiver_.init (ec, scheduler, + buf, + local_addr, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + RtecScheduler::handle_t rt_info = + scheduler->create (buf, _env); + TAO_CHECK_ENV_RETURN_VOID(_env); + + // The worst case execution time is far less than 2 + // milliseconds, but that is a safe estimate.... + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + RtecEventComm::EventSourceID source = ACE::crc32 (buf); + + this->mcast_eh_.reactor (TAO_ORB_Core_instance ()->reactor ()); + + ACE_SupplierQOS_Factory qos; + for (int i = 0; i < this->consumer_types (); ++i) + { + qos.insert (source, + this->consumer_ipaddr (i), + rt_info, 1); + ACE_INET_Addr mcast_addr (this->mcast_port (), + this->consumer_ipaddr (i)); + this->mcast_eh_.subscribe (mcast_addr); + } - this->supplier_proxy_->disconnect_push_supplier (_env); - if (_env.exception () != 0) return; + this->mcast_eh_.open (); - this->supplier_proxy_ = 0; + RtecEventChannelAdmin::SupplierQOS qos_copy = + qos.get_SupplierQOS (); + this->receiver_.open (qos_copy, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + } void -ECM_Consumer::push (const RtecEventComm::EventSet& events, - CORBA::Environment &_env) +ECM_Local_Federation::close_receiver (CORBA::Environment &_env) { - ACE_hrtime_t arrival = ACE_OS::gethrtime (); - this->test_->push_consumer (this->cookie_, arrival, events, _env); + this->receiver_.close (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + this->receiver_.shutdown (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); } void -ECM_Consumer::disconnect_push_consumer (CORBA::Environment &) +ECM_Local_Federation::dump_results (void) const { + double unfiltered_ratio = 0; + if (this->recv_count_ != 0) + unfiltered_ratio = double(this->unfiltered_count_)/this->recv_count_; + double invalid_ratio = 0; + if (this->recv_count_ != 0) + invalid_ratio = double(this->invalid_count_)/this->recv_count_; + + ACE_DEBUG ((LM_DEBUG, + "Federation: %s\n" + " events received: %d\n" + " unfiltered events received: %d\n" + " ratio: %f\n" + " invalid events received: %d\n" + " ratio: %f\n" + " events sent: %d\n", + this->name (), + this->recv_count_, + this->unfiltered_count_, + unfiltered_ratio, + this->invalid_count_, + invalid_ratio, + this->send_count_)); } // **************************************************************** @@ -1233,8 +1238,8 @@ ECM_Consumer::disconnect_push_consumer (CORBA::Environment &) int main (int argc, char *argv []) { - ECM_Driver test; - return test.run (argc, argv); + ECM_Driver driver; + return driver.run (argc, argv); } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h index 46aa7a3aaad..758f5c3734b 100644 --- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h @@ -6,10 +6,37 @@ // = DESCRIPTION // This test attempts to communicate several Event Channels UDP // using multicast. -// The test hardcodes all the objects involved (consumers, -// suppliers, proxies, etc.); the objective is to gain understanding -// on the use of multicast events, not to provide a complete and -// clean implementation. +// The test reads a configuration file that describe what events are +// received by each "VirtualConsumer". The user must provide, on the +// command line, which virtual consumers are present on each process. +// The test also creates one supplier for each consumer, the +// supplier can send an event of any possible type described in the +// file. + +// = HOW +// The test creates a local consumer for each remote consumer, this +// is necessary to send the event with the right port number; it +// then sends the event using multicast. +// Notice that there is still a win in using multicast because +// multiple copies of the virtual consumer may be available. +// To receive the event the test creates one local supplier for each +// local "Virtual Consumer". +// +// = TODO +// The class names in this test are *way* too artificial, I should +// use the RTI names. +// +// It is unfortunate that the test must know before-hand the remote +// consumer interests. It would be really simple to use a better +// strategy: the test could "observe" changes in the remote EC +// subscription list, it could then modify its local consumers +// subscriptions. +// Similarly the suppliers that supply Mcast packets as local events +// could observe the changes in the local subscriptions and use that +// to join or leave the multicast groups. +// To demostrate this the test will need to reconfigure its +// subscription list every so often (a few seconds seems like a good +// idea). // // ============================================================================ @@ -22,29 +49,114 @@ #include "orbsvcs/RtecEventCommS.h" #include "orbsvcs/Channel_Clients_T.h" #include "orbsvcs/Event/EC_Gateway_UDP.h" +#include "orbsvcs/Event/EC_UDP_Admin.h" class ECM_Driver; +class ECM_Federation +{ + // = DESCRIPTION + // The test reads a configuration file where it obtains the data + // about each "federation". A federation is some application, + // distributed over several processes. The potential set of + // publications and the potential set of subscriptions is known + // beforehand, but the actual publications (or subscriptions) may + // change dynamically. + // As stated above the federation may be present in more than one + // process, but also a process may participate in more than one + // federation. + // +public: + ECM_Federation (char* name, + CORBA::UShort mcast_port, + int supplier_types, + char** supplier_names, + int consumer_types, + char** consumer_names); + // Constructor, it assumes ownership of the buffers, strings must be + // allocated using CORBA::string_alloc(), buffers using operator new. + + ~ECM_Federation (void); + // Dtor + + const char* name (void) const; + // The name of the federation.... + + CORBA::UShort mcast_port (void) const; + // The port used by this federation to receive mcast messages. + + int supplier_types (void) const; + // The number of different event types published by this federation. + + const char* supplier_name (CORBA::ULong i) const; + // The name (mcast addr in A.B.C.D format) of the event type <i> + + CORBA::ULong supplier_ipaddr (CORBA::ULong i) const; + // The ipaddr (in host byte order) of the event type <i> + + int consumer_types (void) const; + // The number of different event types consumed by this federation. + + const char* consumer_name (CORBA::ULong i) const; + // The name (mcast addr in A.B.C.D format) of the event type <i> + + CORBA::ULong consumer_ipaddr (CORBA::ULong i) const; + // The ipaddr (in host byte order) of the event type <i> + + void open (ACE_SOCK_Dgram *dgram, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env); + // Connect the UDP sender to the EC. + + void close (CORBA::Environment &_env); + // Close the UDP sender, disconnect from the EC + + int sender_local_addr (ACE_INET_Addr& addr); + // Return the sender local address + +private: + char* name_; + CORBA::UShort mcast_port_; + + int supplier_types_; + char** supplier_names_; + CORBA::ULong* supplier_ipaddr_; + + int consumer_types_; + char** consumer_names_; + CORBA::ULong* consumer_ipaddr_; + + TAO_ECG_UDP_Sender sender_; + // The sender + + TAO_EC_Simple_AddrServer addr_server_; + // Resolve event headers (type,source) to UDP addresses + // (ipaddr,port) +}; + +class ECM_Local_Federation; + class ECM_Supplier : public POA_RtecEventComm::PushSupplier { // // = TITLE - // Helper class to implement the different tests within ECM_Driver. + // Helper class to simulate an application acting as an event + // supplier. // // = DESCRIPTION - // ECM_Driver can be configured to have a single or mcast - // suppliers, to use the EC or short-circuit it, to use the - // Gateway or not; this class connects as a consumer for timeouts - // in the EC, at each timeout it delegates on the ECM_Driver class - // to execute the proper test. + // This class connects as a consumer for timeouts in the EC. On + // every timeout it delegates on the ECM_Local_Federation class, + // usually this results in some reconfiguration and/or some events + // sent. + // public: - ECM_Supplier (ECM_Driver* test, void* cookie); + ECM_Supplier (ECM_Local_Federation* federation); void open (const char* name, - int event_a, int event_b, - int event_count, - const RtecScheduler::Period& rate, - RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr event_channel, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment& _env); // This method connects the supplier to the EC. @@ -52,38 +164,30 @@ public: // Disconnect from the EC. void activate (const char* name, - const RtecScheduler::Period& rate, - RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr event_channel, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment& _env); + // Connect as a consumer to start receiving events. + + RtecEventComm::EventSourceID supplier_id (void) const; + // The supplier ID. void push (const RtecEventComm::EventSet& events, CORBA::Environment &_env); void disconnect_push_consumer (CORBA::Environment &); // Implement the callbacks for our consumer personality. - + // = The POA_RtecEventComm::PushSupplier methods. virtual void disconnect_push_supplier (CORBA::Environment &); - // The methods in the skeleton. - - RtecEventComm::EventSourceID supplier_id (void) const; - // The supplier ID. private: - ECM_Driver* test_; - - void *cookie_; - // The test provide us a cookie so we can give back our identity. + ECM_Local_Federation* federation_; + // To callback the federation. RtecEventComm::EventSourceID supplier_id_; // We generate an id based on the name.... - int event_a_; - int event_b_; - // The two types of events we may generate... - - int event_count_; - // The number of events sent by this supplier. - RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy_; // We talk to the EC (as a supplier) using this proxy. @@ -93,252 +197,298 @@ private: RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; // We talk to the EC (as a supplier) using this proxy. - }; class ECM_Consumer : public POA_RtecEventComm::PushConsumer { // // = TITLE - // Helper class to implement the different tests within ECM_Driver. + // Helper class to simulate an application acting as an event + // consumer. // // = DESCRIPTION - // ECM_Driver must collect events destined to many consumers, but - // needs to distinguish through which consumer it is receiving the - // event. The easiest way is to create a shallow class that - // forwards the events to the EC, but passing back some cookie to - // identify the consumer. + // This class connects as an event consumer to the EC. The events + // are actually handled by the ECM_Local_Federation. public: - ECM_Consumer (ECM_Driver* test, void *cookie); + ECM_Consumer (ECM_Local_Federation* federation); void open (const char* name, - int event_a, int event_b, - RtecEventChannelAdmin::EventChannel_ptr ec, + RtecEventChannelAdmin::EventChannel_ptr event_channel, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment& _env); // This method connects the consumer to the EC. void close (CORBA::Environment &_env); // Disconnect from the EC. + // = The POA_RtecEventComm::PushComsumer methods. virtual void push (const RtecEventComm::EventSet& events, CORBA::Environment &_env); virtual void disconnect_push_consumer (CORBA::Environment &); - // The skeleton methods. private: - ECM_Driver* test_; - // The test class. - - void *cookie_; - // The magic cookie that serves as our ID. + ECM_Local_Federation* federation_; + // To callback. RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; // We talk to the EC using this proxy. }; +class ECM_Local_Federation +{ + // = DESCRIPTION + // This class is used to represent a federation that is actually + // running in this process. + // +public: + ECM_Local_Federation (ECM_Federation *federation, + ECM_Driver *driver); + // Constructor. + + void open (int event_count, + RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr event_channel, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment& _env); + // Connect both the supplier and the consumer. + + void close (CORBA::Environment& _env); + // Disconnect everybody from the EC + + void activate (RtecScheduler::Period period, + RtecEventChannelAdmin::EventChannel_ptr event_channel, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment& _env); + // Activate the supplier + + void supplier_timeout (RtecEventComm::PushConsumer_ptr consumer, + CORBA::Environment& _env); + // The supplier is ready to send a new event. + + void consumer_push (ACE_hrtime_t arrival, + const RtecEventComm::EventSet& event, + CORBA::Environment& _env); + // The consumer just received an event. + + const ECM_Federation *federation (void) const; + // The federation description. + + void open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env); + // Connect the UDP receiver to the EC. + + void close_receiver (CORBA::Environment &_env); + // Close the UDP receiver, disconnect from the EC + + void dump_results (void) const; + // Report the results back to the user... + + // = Delegate on the federation description + const char* name (void) const; + CORBA::UShort mcast_port (void) const; + int supplier_types (void) const; + const char* supplier_name (CORBA::ULong i) const; + CORBA::ULong supplier_ipaddr (CORBA::ULong i) const; + int consumer_types (void) const; + const char* consumer_name (CORBA::ULong i) const; + CORBA::ULong consumer_ipaddr (CORBA::ULong i) const; + +private: + ECM_Federation *federation_; + // The description of the events we send and receive. + + ECM_Driver *driver_; + // The test driver. + + ECM_Consumer consumer_; + ECM_Supplier supplier_; + // The supplier and consumer helper classes, other than + // initialization this classes only forward events to the + // Federation. + + // Collect statistics + + CORBA::ULong recv_count_; + // Messages received. + + CORBA::ULong unfiltered_count_; + // Messages received that were not properly filtered. + + CORBA::ULong invalid_count_; + // Message received that could *not* be destined to this federation, + // yet they were received. + + CORBA::ULong send_count_; + // Messages sent. + + int event_count_; + // How many messages will we send before stop the simulation. + + ACE_Time_Value last_publication_change_; + // The last time we changed our publication list, we don't change it + // too often. + + ACE_Time_Value last_subscription_change_; + // The last time we changed our publication, so we don't change too + // often. + + TAO_ECG_UDP_Receiver receiver_; + // This object reads the events and pushes them into the EC. Notice + // that it can receive events from multiple Event Handlers. + + TAO_ECG_Mcast_EH mcast_eh_; + // The event handler, it receives callbacks from the reactor + // whenever an event is available in some of the multicast groups, + // it then forwards to the <mcast_recv_> object for processing and + // dispatching of the event. + // @@ TODO Eventually we may need several of this objects to handle + // OS limitations on the number of multicast groups per socket. +}; + class ECM_Driver { // // = TITLE - // Test and demonstrate the use of TAO_EC_Gateway. + // Demonstrate the use of the UDP Gateways. // // = DESCRIPTION - // This class is design to exercise several features of the EC_Gateway - // class and the mcast EC architecture. - // We want to create two EC, each one having a single supplier and a - // single consumer. - // + To test the remote facilities the consumer register for both a - // local event and a remote one. - // + To test the remote filtering features the remote consumer only - // wants one of the local events, and this event is generated less - // frequently. - // - // This class creates the local EC_Gateway a consumer and a - // supplier, it uses the command line to figure the subscriptions - // and publications list. + // This class is design to exercise several features of the UDP + // Gateways and its companion classes. + // We create a set of processes, each running one EC, with + // multiple consumers and suppliers colocated with the EC. + // The ECs communicate among themselves using multicast. + // The test thus show how to use multicast, change the local + // ECG_UDP_Receiver and ECG_UDP_Sender QoS specifications + // dynamically, how to economically use the OS resources to + // receive and send multicast messages, etc. // public: ECM_Driver (void); enum { MAX_EVENTS = 1024, - // Maximum number of events to send... + // Maximum number of events to send on each Federation. - MAX_CONSUMERS = 16, - // Maximum number of consumers. + MAX_LOCAL_FEDERATIONS = 16, + // Maximum number of federations running on a single process - MAX_SUPPLIERS = 16 - // Maximum number of suppliers. + MAX_FEDERATIONS = 128 + // Maximum number of federations in the simulation }; int run (int argc, char* argv[]); - // Execute the test. - - void push_supplier (void* supplier_cookie, - RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer, - const RtecEventComm::EventSet &events, - CORBA::Environment &); - // Callback method for suppliers, we push for them to their - // consumers and take statistics on the way. - // It is possible that we ignore the <consumer> parameter when - // testing the short-circuit case. - - void push_consumer (void* consumer_cookie, - ACE_hrtime_t arrival, - const RtecEventComm::EventSet& events, - CORBA::Environment&); - // Callback method for consumers, if any of our consumers has - // received events it will invoke this method. - - void shutdown_supplier (void* supplier_cookie, - RtecEventComm::PushConsumer_ptr consumer, - CORBA::Environment& _env); - // One of the suppliers has completed its work. + // Run the test, read all the configuration files, etc. + + void federation_has_shutdown (ECM_Local_Federation *federation, + CORBA::Environment& _env); + // One of the federations has completed its simulation, once all of + // them finish the test exists. + private: - RtecEventChannelAdmin::EventChannel_ptr - get_ec (CosNaming::NamingContext_ptr naming_context, - const char* ec_name, - CORBA::Environment &_env); - // Helper routine to obtain an EC given its name. - - void connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env); - void disconnect_suppliers (CORBA::Environment &_env); - // Connect the suppliers. - - void activate_suppliers (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env); - // Activate the suppliers, i.e. they start generating events. - - void connect_ecg (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env); - // Connect the EC gateway, it builds the Subscriptions and the - // Publications list. - - void connect_consumers (RtecEventChannelAdmin::EventChannel_ptr local_ec, - CORBA::Environment &_env); - void disconnect_consumers (CORBA::Environment &_env); - // Connect and disconnect the consumers. + void open_federations (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env); + // Connect the federations to the EC. + + void activate_federations (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env); + // Activate all the federations + + void close_federations (CORBA::Environment &_env); + // Close the federations, i.e. disconnect from the EC, deactivate + // the objects, etc. + + void open_senders (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env); + // Connect all the senders, so we can start multicasting events. + + void open_receivers (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, + CORBA::Environment &_env); + // Connect all the receivers, thus we accept events arriving through + // multicast. + + void close_senders (CORBA::Environment &_env); + // Close all the senders to cleanup resources. + + void close_receivers (CORBA::Environment &_env); + // Close all the receivers to cleanup resources. int shutdown (CORBA::Environment&); - // Called when the main thread (i.e. not the scavenger thread) is - // shutting down. + // Called when the main thread. int parse_args (int argc, char* argv[]); - // parse the command line args - - void dump_results (void); - // Dump the results to the standard output. + // parse the command line arguments - void wait_until_ready (void); - // Block event delivery until all the consumers are ready. + int parse_config_file (void); + // parse the command line arguments - struct Stats; - void dump_results (const char* name, Stats& stats); - // Dump the results for a particular consumer. + int parse_name_list (FILE* file, int n, char** names, + const char* error_msg); + // parse one of the lists of names in the federation definition. - int local_source (RtecEventComm::EventSourceID id) const; - // Check if <id> correspond to a local supplier. + int skip_blanks (FILE* file, + const char* error_msg); + // skip the blanks in the file. - void shutdown_consumer (int id); - // One of the consumers has completed its work. + void dump_results (void); + // Dump the results to the standard output. private: char* lcl_name_; // The name of the "local" EC. - int short_circuit_; - // Send events directly to local consumers. - - int n_suppliers_; - // The number of suppliers in this test. - - int n_consumers_; - // The number of consumers. - - int workload_; - // The number of iterations of ACE::is_prime() to execute. - int event_period_; // The events are generated using this interval. int event_count_; // How many events will the suppliers send - int s_event_a_; - int s_event_b_; - int c_event_a_; - int c_event_b_; - int r_event_a_; - int r_event_b_; - // Each supplier send two types of events, each consumer receives - // two other types. The remote suppliers send other two types of - // events. - - const char* schedule_file_; - // Ask the schedule to compute and dump its schedule after the test - // execution. - - const char* pid_file_name_; - // The name of a file where the process stores its pid + char* config_filename_; + // The name of the file where we read the configuration. - ACE_INET_Addr send_mcast_group_; - // The multicast group to send the consumer events. - - TAO_ECG_UDP_Sender sender_; - // This consumer sends the local events through <send_mcast_group> - - ACE_INET_Addr recv_mcast_group_; - // The multicast group to receive the consumer events. - - TAO_ECG_UDP_Receiver receiver_; - // This supplier pushes the remote events received throug - // <recv_mcast_group> - - TAO_ECG_Mcast_EH mcast_eh_; - // The event handler to receive the mcast events. - - ECM_Supplier* suppliers_[ECM_Driver::MAX_SUPPLIERS]; - ECM_Consumer* consumers_[ECM_Driver::MAX_CONSUMERS]; - // The suppliers and consumer arrays, the sizes are controlled using - // {lp,hp}_{suppliers,consumers}_ + const char* pid_filename_; + // The name of a file where the process stores its pid - // @@ TODO it looks like the HP and LP data could be encapsulated. + int local_federations_count_; + // How many federations are running in this process (or, if you + // prefer, in how many federations does this process participate). - struct Stats { - ACE_hrtime_t total_time_; - float laxity_[MAX_EVENTS]; - ACE_hrtime_t lcl_latency_[MAX_EVENTS]; - ACE_hrtime_t end_[MAX_EVENTS]; - int lcl_count_; - // We keep laxity and total_time stats only for the local events. + ECM_Local_Federation* local_federations_[MAX_LOCAL_FEDERATIONS]; + // The local federations. - ACE_hrtime_t rmt_latency_[MAX_EVENTS]; - int rmt_count_; - }; - Stats stats_[ECM_Driver::MAX_CONSUMERS]; - // Store the measurements for local and remote events.. + char* local_names_[MAX_LOCAL_FEDERATIONS]; + // The names of the local federations. - int ready_; - ACE_SYNCH_MUTEX ready_mtx_; - ACE_SYNCH_CONDITION ready_cnd_; - // Before accepting any events the suppliers must wait for the test - // to setup all the consumers. - // The suppliers wait on the condition variable. + int all_federations_count_; + // The total number of federations we belong to. - ACE_Atomic_Op<ACE_SYNCH_MUTEX,int> running_suppliers_; - // keep track of how many suppliers are still running so we shutdown - // at the right moment. + ECM_Federation* all_federations_[MAX_FEDERATIONS]; + // All the federations. - ACE_Atomic_Op<ACE_SYNCH_MUTEX,int> running_consumers_; - // keep track of how many consumers are still running so we shutdown - // at the right moment. + ACE_Atomic_Op<ACE_SYNCH_MUTEX,int> federations_running_; + // Keep track of how many federations are active so we can shutdown + // once they are all destroyed. ACE_hrtime_t test_start_; ACE_hrtime_t test_stop_; // Measure the test elapsed time as well as mark the beginning of // the frames. + + CORBA::ORB_var orb_; + // The ORB, so we can shut it down. + + ACE_SOCK_Dgram send_dgram_; + // This socket is shared by all the federations to send the + // multicast events. }; +#if defined (__ACE_INLINE__) +#include "EC_Mcast.i" +#endif /* __ACE_INLINE__ */ + #endif /* EC_MCAST_H */ diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.i b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.i new file mode 100644 index 00000000000..0e1a45f7940 --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.i @@ -0,0 +1,120 @@ +// +// $Id$ +// + +ACE_INLINE int +ECM_Federation::sender_local_addr (ACE_INET_Addr& addr) +{ + return this->sender_.get_local_addr (addr); +} + + +ACE_INLINE const char* +ECM_Federation::name (void) const +{ + return this->name_; +} + +ACE_INLINE CORBA::UShort +ECM_Federation::mcast_port (void) const +{ + return this->mcast_port_; +} + +ACE_INLINE int +ECM_Federation::supplier_types (void) const +{ + return this->supplier_types_; +} + +ACE_INLINE const char* +ECM_Federation::supplier_name (CORBA::ULong i) const +{ + if (i < this->supplier_types_) + return this->supplier_names_[i]; + return 0; +} + +ACE_INLINE CORBA::ULong +ECM_Federation::supplier_ipaddr (CORBA::ULong i) const +{ + if (i < this->supplier_types_) + return this->supplier_ipaddr_[i]; + return 0; +} + +ACE_INLINE int +ECM_Federation::consumer_types (void) const +{ + return this->consumer_types_; +} + +ACE_INLINE const char* +ECM_Federation::consumer_name (CORBA::ULong i) const +{ + if (i < this->consumer_types_) + return this->consumer_names_[i]; + return 0; +} + +ACE_INLINE CORBA::ULong +ECM_Federation::consumer_ipaddr (CORBA::ULong i) const +{ + if (i < this->consumer_types_) + return this->consumer_ipaddr_[i]; + return 0; +} + +ACE_INLINE const ECM_Federation* +ECM_Local_Federation::federation (void) const +{ + return this->federation_; +} + +ACE_INLINE const char* +ECM_Local_Federation::name (void) const +{ + return this->federation_->name (); +} + +ACE_INLINE CORBA::UShort +ECM_Local_Federation::mcast_port (void) const +{ + return this->federation_->mcast_port (); +} + +ACE_INLINE int +ECM_Local_Federation::supplier_types (void) const +{ + return this->federation_->supplier_types (); +} + +ACE_INLINE const char* +ECM_Local_Federation::supplier_name (CORBA::ULong i) const +{ + return this->federation_->supplier_name (i); +} + +ACE_INLINE CORBA::ULong +ECM_Local_Federation::supplier_ipaddr (CORBA::ULong i) const +{ + return this->federation_->supplier_ipaddr (i); +} + +ACE_INLINE int +ECM_Local_Federation::consumer_types (void) const +{ + return this->federation_->consumer_types (); +} + +ACE_INLINE const char* +ECM_Local_Federation::consumer_name (CORBA::ULong i) const +{ + return this->federation_->consumer_name (i); +} + +ACE_INLINE CORBA::ULong +ECM_Local_Federation::consumer_ipaddr (CORBA::ULong i) const +{ + return this->federation_->consumer_ipaddr (i); +} diff --git a/TAO/orbsvcs/tests/EC_Mcast/README b/TAO/orbsvcs/tests/EC_Mcast/README new file mode 100644 index 00000000000..e3be683604a --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/README @@ -0,0 +1,27 @@ +# $Id$ + +This test can be pretty complicated to run, a sample configuration file is +included to help you startup. + + The basic idea to remember is that a "Federation" is a logical +grouping of incoming and outgoing mcast addresses that share the same +multicast port. The test reads the configuration file to find out +which Federations are globally available defined, but the comand line +is used to specify the list of federations that are actually running +on each process. + The test requires the Naming Serivice: + +$ ../../Naming_Service/Naming_Service + + this is an *extremely* silent service, don't be surprized if +you see no output. + + Then you ough to run at least two copies of the test, on +different windows (to make the test interesting): + +$ ./EC_Mcast -l ECM1 -p ECM1.pid -c sample.cfg -n 200 -t 500000 -f Set01 +$ ./EC_Mcast -l ECM2 -p ECM2.pid -c sample.cfg -n 200 -t 500000 -f Set02 + + the test will report the number of events received, if you run +just one of the processes you will notice that this number is smaller, +this indicates that some events come from a "remote" event. diff --git a/TAO/orbsvcs/tests/EC_Mcast/sample.cfg b/TAO/orbsvcs/tests/EC_Mcast/sample.cfg new file mode 100644 index 00000000000..060977e294f --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Mcast/sample.cfg @@ -0,0 +1,64 @@ +6 +Set00 12000 5 3 + 224.100.0.1 + 224.100.0.2 + 224.100.0.3 + 224.100.0.4 + 224.100.0.5 + 224.100.5.1 + 224.100.5.2 + 224.100.5.3 +Set01 12001 4 4 + 224.100.1.1 + 224.100.1.2 + 224.100.1.3 + 224.100.1.4 + 224.100.2.1 + 224.100.2.2 + 224.100.2.4 + 224.100.0.1 +Set02 12002 3 5 + 224.100.2.1 + 224.100.2.2 + 224.100.2.3 + 224.100.2.1 + 224.100.2.2 + 224.100.2.3 + 224.100.1.1 + 224.100.1.3 +Set03 12003 4 4 + 224.100.3.1 + 224.100.3.2 + 224.100.3.3 + 224.100.3.4 + 224.100.2.1 + 224.100.2.2 + 224.100.2.3 + 224.100.2.4 +Set04 12004 4 4 + 224.100.4.1 + 224.100.4.2 + 224.100.4.3 + 224.100.4.4 + 224.100.5.1 + 224.100.5.2 + 224.100.4.3 + 224.100.4.4 +Set05 12005 4 4 + 224.100.5.1 + 224.100.5.2 + 224.100.5.3 + 224.100.5.4 + 224.100.4.1 + 224.100.4.2 + 224.100.5.3 + 224.100.5.4 +Set06 12006 4 4 + 224.100.0.1 + 224.100.0.2 + 224.100.2.1 + 224.100.2.2 + 224.100.0.3 + 224.100.0.4 + 224.100.2.3 + 224.100.2.4 diff --git a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp index e263e080476..5ceddced4a0 100644 --- a/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp +++ b/TAO/orbsvcs/tests/EC_Multiple/EC_Multiple.cpp @@ -119,9 +119,6 @@ print_priority_info (const char *const name) #endif /* ACE_HAS_PTHREADS_STD */ } - - - int Test_ECG::run (int argc, char* argv[]) { @@ -926,16 +923,16 @@ Test_ECG::push_consumer (void *consumer_cookie, { const RtecEventComm::Event& e = events[i]; - if (e.type_ == ACE_ES_EVENT_SHUTDOWN) + if (e.header.type == ACE_ES_EVENT_SHUTDOWN) { this->shutdown_consumer (ID); continue; } ACE_hrtime_t s; - ORBSVCS_Time::TimeT_to_hrtime (s, e.creation_time_); + ORBSVCS_Time::TimeT_to_hrtime (s, e.header.creation_time); ACE_hrtime_t nsec = arrival - s; - if (this->local_source (e.source_)) + if (this->local_source (e.header.source)) { int& count = this->stats_[ID].lcl_count_; @@ -1004,16 +1001,14 @@ Test_ECG::shutdown_supplier (void* /* supplier_cookie */, shutdown.length (1); RtecEventComm::Event& s = shutdown[0]; - s.source_ = 0; - s.ttl_ = 1; + s.header.source = 0; + s.header.ttl = 1; ACE_hrtime_t t = ACE_OS::gethrtime (); - ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t); - s.ec_recv_time_ = ORBSVCS_Time::zero; - s.ec_send_time_ = ORBSVCS_Time::zero; - s.data_.x = 0; - s.data_.y = 0; - s.type_ = ACE_ES_EVENT_SHUTDOWN; + ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t); + s.header.ec_recv_time = ORBSVCS_Time::zero; + s.header.ec_send_time = ORBSVCS_Time::zero; + s.header.type = ACE_ES_EVENT_SHUTDOWN; consumer->push (shutdown, _env); } } @@ -1483,22 +1478,22 @@ Test_Supplier::push (const RtecEventComm::EventSet& events, for (u_int i = 0; i < events.length (); ++i) { const RtecEventComm::Event& e = events[i]; - if (e.type_ != ACE_ES_EVENT_INTERVAL_TIMEOUT) + if (e.header.type != ACE_ES_EVENT_INTERVAL_TIMEOUT) continue; // ACE_DEBUG ((LM_DEBUG, "Test_Supplier - timeout (%t)\n")); RtecEventComm::Event& s = sent[i]; - s.source_ = this->supplier_id_; - s.ttl_ = 1; + s.header.source = this->supplier_id_; + s.header.ttl = 1; ACE_hrtime_t t = ACE_OS::gethrtime (); - ORBSVCS_Time::hrtime_to_TimeT (s.creation_time_, t); - s.ec_recv_time_ = ORBSVCS_Time::zero; - s.ec_send_time_ = ORBSVCS_Time::zero; + ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t); + s.header.ec_recv_time = ORBSVCS_Time::zero; + s.header.ec_send_time = ORBSVCS_Time::zero; - s.data_.x = 0; - s.data_.y = 0; + s.data.x = 0; + s.data.y = 0; this->message_count_--; @@ -1513,11 +1508,11 @@ Test_Supplier::push (const RtecEventComm::EventSet& events, if (this->message_count_ % 2 == 0) { // Generate an A event... - s.type_ = this->event_a_; + s.header.type = this->event_a_; } else { - s.type_ = this->event_b_; + s.header.type = this->event_b_; } } this->test_->push_supplier (this->cookie_, diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp index 4b2abf3637e..f94473fc6fa 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Consumer.cpp @@ -213,7 +213,7 @@ Driver::push_consumer (void* consumer_cookie, { const RtecEventComm::Event& e = events[i]; - if (e.data_.payload.mb () == 0) + if (e.data.payload.mb () == 0) { ACE_DEBUG ((LM_DEBUG, "No data in event[%d]\n", i)); continue; diff --git a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp index 014fcff7adb..b3600dc9363 100644 --- a/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp +++ b/TAO/orbsvcs/tests/EC_Throughput/ECT_Supplier.cpp @@ -199,28 +199,28 @@ ECTS_Driver::supplier_task (Test_Supplier *supplier, { RtecEventComm::EventSet event (1); event.length (1); - event[0].source_ = supplier->supplier_id (); - event[0].ttl_ = 1; + event[0].header.source = supplier->supplier_id (); + event[0].header.ttl = 1; ACE_hrtime_t t = ACE_OS::gethrtime (); - ORBSVCS_Time::hrtime_to_TimeT (event[0].creation_time_, t); - event[0].ec_recv_time_ = ORBSVCS_Time::zero; - event[0].ec_send_time_ = ORBSVCS_Time::zero; + ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t); + event[0].header.ec_recv_time = ORBSVCS_Time::zero; + event[0].header.ec_send_time = ORBSVCS_Time::zero; if (i == ACE_static_cast (CORBA::Long, this->event_count_) - 1) - event[0].type_ = ACE_ES_EVENT_SHUTDOWN; + event[0].header.type = ACE_ES_EVENT_SHUTDOWN; else if (i % 2 == 0) - event[0].type_ = this->event_a_; + event[0].header.type = this->event_a_; else - event[0].type_ = this->event_b_; + event[0].header.type = this->event_b_; - event[0].data_.x = 0; - event[0].data_.y = 0; + event[0].data.x = 0; + event[0].data.y = 0; // We use replace to minimize the copies, this should result // in just one memory allocation; - event[0].data_.payload.replace (this->event_size_, - &mb); + event[0].data.payload.replace (this->event_size_, + &mb); supplier->consumer_proxy ()->push(event, TAO_TRY_ENV); TAO_CHECK_ENV; diff --git a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp index 612f0b7ae1e..b1c5c1c8061 100644 --- a/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp +++ b/TAO/orbsvcs/tests/Event_Latency/Event_Latency.cpp @@ -189,7 +189,7 @@ Latency_Consumer::push (const RtecEventComm::EventSet &events, for (CORBA::ULong i = 0; i < events.length (); ++i) { - if (events[i].type_ == ACE_ES_EVENT_SHUTDOWN) + if (events[i].header.type == ACE_ES_EVENT_SHUTDOWN) { ACE_DEBUG ((LM_DEBUG, "Latency Consumer: received shutdown event\n")); @@ -201,15 +201,15 @@ Latency_Consumer::push (const RtecEventComm::EventSet &events, { ACE_hrtime_t creation; ORBSVCS_Time::TimeT_to_hrtime (creation, - events[i].creation_time_); + events[i].header.creation_time); ACE_hrtime_t ec_recv; ORBSVCS_Time::TimeT_to_hrtime (ec_recv, - events[i].ec_recv_time_); + events[i].header.ec_recv_time); ACE_hrtime_t ec_send; ORBSVCS_Time::TimeT_to_hrtime (ec_send, - events[i].ec_send_time_); + events[i].header.ec_send_time); const ACE_hrtime_t now = ACE_OS::gethrtime (); const ACE_hrtime_t elapsed = now - creation; @@ -553,24 +553,25 @@ Latency_Supplier::push (const RtecEventComm::EventSet &events, for (CORBA::ULong i = 0; i < events.length (); ++i) { - if (!master_ && events[i].type_ == ACE_ES_EVENT_SHUTDOWN) + if (!master_ && events[i].header.type == ACE_ES_EVENT_SHUTDOWN) { ACE_DEBUG ((LM_DEBUG, "Latency Supplier: received shutdown event\n")); this->shutdown (); } - else if (events[i].type_ == ACE_ES_EVENT_INTERVAL_TIMEOUT) + else if (events[i].header.type == ACE_ES_EVENT_INTERVAL_TIMEOUT) { // Create the event to send. RtecEventComm::Event event; - event.source_ = supplier_id_; - event.type_ = ACE_ES_EVENT_NOTIFICATION; + event.header.source = this->supplier_id_; + event.header.type = ACE_ES_EVENT_NOTIFICATION; ++total_sent_; - if (timestamp_) + if (this->timestamp_) { ACE_hrtime_t now = ACE_OS::gethrtime (); - ORBSVCS_Time::hrtime_to_TimeT (event.creation_time_, now); + ORBSVCS_Time::hrtime_to_TimeT (event.header.creation_time, + now); } // @@ ACE_TIMEPROBE_RESET; @@ -665,8 +666,8 @@ Latency_Supplier::shutdown (void) { // Create the shutdown message. RtecEventComm::Event event; - event.source_ = supplier_id_; - event.type_ = ACE_ES_EVENT_SHUTDOWN; + event.header.source = this->supplier_id_; + event.header.type = ACE_ES_EVENT_SHUTDOWN; // Push the shutdown event. RtecEventComm::EventSet events (1); diff --git a/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf b/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf index ac7727fb103..3d0227a9192 100644 --- a/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf +++ b/TAO/tests/Cubit/TAO/IDL_Cubit/svc.conf @@ -5,4 +5,4 @@ dynamic Resource_Factory Service_Object * TAO:_make_TAO_Resource_Factory() "-ORBresources tss -ORBreactorlock null" dynamic Client_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Client_Strategy_Factory() "-ORBiiopprofilelock null" -dynamic Server_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Server_Strategy_Factory() "-ORBconcurrency thread-per-connection -ORBpoalock null -ORBconnectorlock null" +dynamic Server_Strategy_Factory Service_Object * TAO:_make_TAO_Default_Server_Strategy_Factory() "-ORBconcurrency reactive -ORBpoalock null -ORBconnectorlock null" |