summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp512
1 files changed, 0 insertions, 512 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp b/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp
deleted file mode 100644
index bcc4f71458d..00000000000
--- a/TAO/orbsvcs/tests/AVStreams/Component_Switching/distributer.cpp
+++ /dev/null
@@ -1,512 +0,0 @@
-// $Id$
-
-#include "distributer.h"
-#include "tao/debug.h"
-#include "ace/Get_Opt.h"
-#include "orbsvcs/AV/Protocol_Factory.h"
-#include "orbsvcs/AV/FlowSpec_Entry.h"
-
-#include "tao/Strategies/advanced_resource.h"
-
-typedef ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> DISTRIBUTER;
-
-// constructor.
-Signal_Handler::Signal_Handler (void)
-{
-}
-
-int
-Signal_Handler::handle_signal (int signum, siginfo_t *, ucontext_t*)
-{
- if (signum == SIGINT)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "In the signal handler\n"));
-
- DISTRIBUTER::instance ()->done (1);
-
- }
- return 0;
-}
-
-int
-Distributer_Sender_StreamEndPoint::get_callback (const char *flow_name,
- TAO_AV_Callback *&callback)
-{
- /// Create and return the sender application callback to AVStreams
- /// for further upcalls.
- callback = &this->callback_;
-
- ACE_CString fname = flow_name;
-
- this->callback_.flowname (fname);
-
- return 0;
-}
-
-int
-Distributer_Sender_StreamEndPoint::set_protocol_object (const char *flowname,
- TAO_AV_Protocol_Object *object)
-{
- Connection_Manager &connection_manager =
- DISTRIBUTER::instance ()->connection_manager ();
-
- /// Add to the map of protocol objects.
- connection_manager.protocol_objects ().bind (flowname,
- object);
-
- /// Store the related streamctrl.
- connection_manager.add_streamctrl (flowname,
- this);
-
- return 0;
-}
-
-int
-Distributer_Receiver_StreamEndPoint::get_callback (const char *flow_name,
- TAO_AV_Callback *&callback)
-{
- /// Create and return the receiver application callback to AVStreams
- /// for further upcalls.
- callback = &this->callback_;
-
- ACE_CString flowname (flow_name);
- this->callback_.flowname (flowname);
-
- return 0;
-}
-
-int
-Distributer_Receiver_StreamEndPoint::set_protocol_object (const char *,
- TAO_AV_Protocol_Object *)
-{
- /// Increment the stream count.
- DISTRIBUTER::instance ()->stream_created ();
-
- return 0;
-}
-
-CORBA::Boolean
-Distributer_Receiver_StreamEndPoint::handle_connection_requested (AVStreams::flowSpec &flowspec
- ACE_ENV_ARG_DECL_NOT_USED)
-{
- //if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Distributer_Receiver_StreamEndPoint::handle_connection_requested\n"));
-
- Connection_Manager &connection_manager =
- DISTRIBUTER::instance ()->connection_manager ();
-
- /// Check to see if the flow already exists. If it does then close the
- /// old connection and setup a new one with the new sender.
-
- for (CORBA::ULong i = 0;
- i < flowspec.length ();
- i++)
- {
- TAO_Forward_FlowSpec_Entry entry;
- entry.parse (flowspec[i]);
-
- //if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- "Handle Conection Requested flowname %s \n",
- entry.flowname ()));
-
- ACE_CString flowname (entry.flowname ());
-
- int result =
- connection_manager.streamctrls ().find (flowname);
-
- /// If the flowname is found.
- if (result == 0)
- {
- ACE_DEBUG ((LM_DEBUG, "\nDistributer switching senders handle connection requested\n\n"));
-
- ///Destroy old stream with the same flowname.
- connection_manager.destroy (flowname);
-
- }
-
- /// Store the related streamctrl.
- connection_manager.add_streamctrl (flowname.c_str (),
- this);
-
- }
- return 1;
-
-}
-
-
-Distributer_Receiver_Callback::Distributer_Receiver_Callback (void)
- : frame_count_ (1)
-{
-}
-
-ACE_CString &
-Distributer_Receiver_Callback::flowname (void)
-{
- return this->flowname_;
-}
-
-void
-Distributer_Receiver_Callback::flowname (const ACE_CString &flowname)
-{
- this->flowname_ = flowname;
-}
-
-
-int
-Distributer_Receiver_Callback::receive_frame (ACE_Message_Block *frame,
- TAO_AV_frame_info *,
- const ACE_Addr &)
-{
- /// Upcall from the AVStreams when there is data to be received from
- /// the sender.
- ACE_DEBUG ((LM_DEBUG,
- "Distributer_Callback::receive_frame for frame %d\n",
- this->frame_count_++));
-
- Connection_Manager::Protocol_Objects &protocol_objects =
- DISTRIBUTER::instance ()->connection_manager ().protocol_objects ();
-
- /// Send frame to all receivers.
- for (Connection_Manager::Protocol_Objects::iterator iterator = protocol_objects.begin ();
- iterator != protocol_objects.end ();
- ++iterator)
- {
- int result =
- (*iterator).int_id_->send_frame (frame);
-
- if (result < 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "send failed:%p",
- "Sender::pace_data send\n"),
- -1);
- }
-
- return 0;
-}
-
-int
-Distributer_Receiver_Callback::handle_destroy (void)
-{
- /// Called when the sender requests the stream to be shutdown.
- ACE_DEBUG ((LM_DEBUG,
- "Distributer_Receiver_Callback::end_stream\n"));
-
- DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
-
- /// Decrement the stream count.
- DISTRIBUTER::instance ()->stream_destroyed ();
-
- return 0;
-}
-
-ACE_CString &
-Distributer_Sender_Callback::flowname (void)
-{
- return this->flowname_;
-}
-
-void
-Distributer_Sender_Callback::flowname (const ACE_CString &flowname)
-{
- this->flowname_ = flowname;
-}
-
-int
-Distributer_Sender_Callback::handle_destroy (void)
-{
- /// Called when the sender requests the stream to be shutdown.
-
- ACE_DEBUG ((LM_DEBUG,
- "Distributer_Sender_Callback::end_stream\n"));
-
- DISTRIBUTER::instance ()->connection_manager ().protocol_objects ().unbind (this->flowname_.c_str ());
-
- DISTRIBUTER::instance ()->connection_manager ().streamctrls ().unbind (this->flowname_.c_str ());
-
- DISTRIBUTER::instance ()->connection_manager ().receivers ().unbind (this->flowname_.c_str ());
-
- return 0;
-}
-
-Distributer::Distributer (void)
- : sender_name_ ("sender"),
- distributer_name_ ("distributer"),
- done_ (0),
- stream_count_ (0)
-{
-}
-
-Distributer::~Distributer (void)
-{
-}
-
-void
-Distributer::stream_created (void)
-{
- this->stream_count_++;
-}
-
-void
-Distributer::stream_destroyed (void)
-{
- this->stream_count_--;
-
- if (this->stream_count_ == 0)
- this->done_ = 1;
-}
-
-
-Connection_Manager &
-Distributer::connection_manager (void)
-{
- return this->connection_manager_;
-}
-
-int
-Distributer::parse_args (int argc,
- char **argv)
-{
- /// Parse command line arguments
- ACE_Get_Opt opts (argc, argv, "s:r:");
-
- int c;
- while ((c= opts ()) != -1)
- {
- switch (c)
- {
- case 's':
- this->sender_name_ = opts.opt_arg ();
- break;
- case 'r':
- this->distributer_name_ = opts.opt_arg ();
- break;
- default:
- ACE_DEBUG ((LM_DEBUG,"Unknown Option\n"));
- return -1;
- }
- }
- return 0;
-}
-
-
-int
-Distributer::init (int argc,
- char ** argv
- ACE_ENV_ARG_DECL)
-{
- /// Initialize the connection class.
- int result =
- this->connection_manager_.init (TAO_AV_CORE::instance ()->orb ());
- if (result != 0)
- return result;
-
- /// Initialize the endpoint strategy with the orb and poa.
- result =
- this->sender_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
- TAO_AV_CORE::instance ()->poa ());
- if (result != 0)
- return result;
-
- result =
- this->receiver_endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
- TAO_AV_CORE::instance ()->poa ());
- if (result != 0)
- return result;
-
- /// Parse the command line arguments
- result =
- this->parse_args (argc,
- argv);
- if (result != 0)
- return result;
-
- ACE_Reactor *reactor =
- TAO_AV_CORE::instance ()->reactor ();
-
- if (reactor->register_handler (SIGINT,
- &this->signal_handler_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error in handler register\n"),
- -1);
- /// Register the signal handler for clean termination of the process.
-
- ACE_NEW_RETURN (this->distributer_sender_mmdevice_,
- TAO_MMDevice (&this->sender_endpoint_strategy_),
- -1);
-
- /// Servant Reference Counting to manage lifetime
- PortableServer::ServantBase_var safe_sender_mmdevice =
- this->distributer_sender_mmdevice_;
-
- AVStreams::MMDevice_var distributer_sender_mmdevice =
- this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
- ACE_NEW_RETURN (this->distributer_receiver_mmdevice_,
- TAO_MMDevice (&this->receiver_endpoint_strategy_),
- -1);
-
- /// Servant Reference Counting to manage lifetime
- PortableServer::ServantBase_var safe_receiver_mmdevice =
- this->distributer_receiver_mmdevice_;
-
- AVStreams::MMDevice_var distributer_receiver_mmdevice =
- this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
-
- /// Bind to sender.
- this->connection_manager_.bind_to_sender (this->sender_name_,
- this->distributer_name_,
- distributer_receiver_mmdevice.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
- /// Connect to sender.
- this->connection_manager_.connect_to_sender (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
- /// Bind to receivers.
- this->connection_manager_.bind_to_receivers (this->distributer_name_,
- distributer_sender_mmdevice.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
- /// Connect to receivers
- this->connection_manager_.connect_to_receivers (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK_RETURN (-1);
-
- return 0;
-}
-
-int
-Distributer::done (void) const
-{
- return this->done_;
-}
-
-void
-Distributer::shut_down (ACE_ENV_SINGLE_ARG_DECL)
-{
- ACE_TRY
- {
- AVStreams::MMDevice_var receiver_mmdevice =
- this->distributer_receiver_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- DISTRIBUTER::instance ()->connection_manager ().unbind_receiver (this->sender_name_,
- this->distributer_name_,
- receiver_mmdevice.in ());
- AVStreams::MMDevice_var sender_mmdevice =
- this->distributer_sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- DISTRIBUTER::instance ()->connection_manager ().unbind_sender (this->distributer_name_,
- sender_mmdevice.in ());
-
- // DISTRIBUTER::instance ()->connection_manager ().destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
- // ACE_TRY_CHECK;
-
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"Distributer::shut_down");
- }
- ACE_ENDTRY;
-}
-
-void
-Distributer::done (int done)
-{
- this->done_ = done;
-}
-
-int
-main (int argc,
- char **argv)
-{
- ACE_DECLARE_NEW_CORBA_ENV;
- ACE_TRY
- {
- /// Initialize the ORB first.
- CORBA::ORB_var orb =
- CORBA::ORB_init (argc,
- argv,
- 0
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- CORBA::Object_var obj
- = orb->resolve_initial_references ("RootPOA"
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- /// Get the POA_var object from Object_var.
- PortableServer::POA_var root_poa =
- PortableServer::POA::_narrow (obj.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- PortableServer::POAManager_var mgr
- = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- /// Initialize the AVStreams components.
- TAO_AV_CORE::instance ()->init (orb.in (),
- root_poa.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- /// Initialize the Distributer
- int result =
- DISTRIBUTER::instance ()->init (argc,
- argv
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- if (result != 0)
- return result;
-
- while (!DISTRIBUTER::instance ()->done ())
- {
- CORBA::Boolean wp = orb->work_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- if (wp)
- {
- orb->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER);
-
- ACE_TRY_CHECK;
- }
- }
-
- DISTRIBUTER::instance ()->shut_down (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
-// orb->shutdown(1 ACE_ENV_ARG_PARAMETER);
-// ACE_TRY_CHECK;
-
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"main");
- return -1;
- }
- ACE_ENDTRY;
- ACE_CHECK_RETURN (-1);
-
- DISTRIBUTER::close (); // Explicitly finalize the Unmanaged_Singleton.
-
- return 0;
-}
-
-#if defined (ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION)
-template ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex> *ACE_Unmanaged_Singleton<Distributer, ACE_Null_Mutex>::singleton_;
-#endif /* ACE_HAS_EXPLICIT_STATIC_TEMPLATE_MEMBER_INSTANTIATION */