diff options
author | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-10-21 21:41:34 +0000 |
---|---|---|
committer | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1996-10-21 21:41:34 +0000 |
commit | a5fdebc5f6375078ec1763850a4ca23ec7fe6458 (patch) | |
tree | bcf0a25c3d45a209a6e3ac37b233a4812f29c732 /examples/ASX | |
download | ATCD-a5fdebc5f6375078ec1763850a4ca23ec7fe6458.tar.gz |
Initial revision
Diffstat (limited to 'examples/ASX')
40 files changed, 5308 insertions, 0 deletions
diff --git a/examples/ASX/CCM_App/CCM_App.cpp b/examples/ASX/CCM_App/CCM_App.cpp new file mode 100644 index 00000000000..218b3037be2 --- /dev/null +++ b/examples/ASX/CCM_App/CCM_App.cpp @@ -0,0 +1,123 @@ +#define ACE_BUILD_SVC_DLL +// @(#)CCM_App.cpp 1.1 10/18/96 + +#include "ace/Stream.h" +#include "ace/Task.h" +#include "ace/Module.h" + +typedef ACE_Task<ACE_SYNCH> MT_Task; +typedef ACE_Stream<ACE_SYNCH> MT_Stream; +typedef ACE_Module<ACE_SYNCH> MT_Module; + +class ACE_Svc_Export Test_Task : public MT_Task +{ +public: + virtual int open (void *); + virtual int close (u_long); + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + virtual int svc (void); + virtual int info (char **, size_t) const; + virtual int init (int, char *[]); + virtual int fini (void); + virtual int suspend (void); + virtual int resume (void); +}; + +int +Test_Task::open (void *) +{ + ACE_DEBUG ((LM_DEBUG, "opening %s\n", this->name () ? this->name () : "task")); + return 0; +} + +int +Test_Task::close (u_long) +{ + ACE_DEBUG ((LM_DEBUG, "closing %s\n", this->name () ? this->name () : "task")); + return 0; +} + +int +Test_Task::suspend (void) +{ + ACE_DEBUG ((LM_DEBUG, "suspending in %s\n", this->name () ? this->name () : "task")); + return 0; +} + +int +Test_Task::resume (void) +{ + ACE_DEBUG ((LM_DEBUG, "resuming in %s\n", this->name () ? this->name () : "task")); + return 0; +} + +int +Test_Task::put (ACE_Message_Block *, ACE_Time_Value *) +{ + return 0; +} + +int +Test_Task::svc (void) +{ + return 0; +} + +int +Test_Task::info (char **, size_t) const +{ + return 0; +} + +int +Test_Task::init (int, char *[]) +{ + ACE_DEBUG ((LM_DEBUG, "initializing %s\n", this->name () ? this->name () : "task")); + + return 0; +} + +int +Test_Task::fini (void) +{ + ACE_DEBUG ((LM_DEBUG, "finalizing %s\n", this->name () ? this->name () : "task")); + return 0; +} + +// Dynamically linked functions used to control configuration. + +extern "C" ACE_Svc_Export MT_Stream *make_stream (void); +extern "C" ACE_Svc_Export MT_Module *make_da (void); +extern "C" ACE_Svc_Export MT_Module *make_ea (void); +extern "C" ACE_Svc_Export MT_Module *make_mr (void); +extern "C" ACE_Svc_Export ACE_Service_Object *make_task (void); + +ACE_Service_Object * +make_task (void) +{ + return new Test_Task; +} + +MT_Stream * +make_stream (void) +{ + return new MT_Stream; +} + +MT_Module * +make_da (void) +{ + return new MT_Module ("Device_Adapter", new Test_Task, new Test_Task); +} + +MT_Module * +make_ea (void) +{ + return new MT_Module ("Event_Analyzer", new Test_Task, new Test_Task); +} + +MT_Module * +make_mr (void) +{ + return new MT_Module ("Multicast_Router", new Test_Task, new Test_Task); +} diff --git a/examples/ASX/CCM_App/Makefile b/examples/ASX/CCM_App/Makefile new file mode 100644 index 00000000000..3902aef1235 --- /dev/null +++ b/examples/ASX/CCM_App/Makefile @@ -0,0 +1,176 @@ +#---------------------------------------------------------------------------- +# @(#)Makefile 1.1 10/18/96 +# +# Makefile for CCM tests +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = SC_Client \ + SC_Server + +LSRC = $(addsuffix .cpp,$(BIN)) \ + CCM_App.cpp + +VLDLIBS = $(LDLIBS:%=%$(VAR)) + +BUILD = $(VSHLIB) $(SHLIBA) $(VBIN) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + +.obj/SC_Client.o .shobj/SC_Client.so: SC_Client.cpp +.obj/SC_Server.o .shobj/SC_Server.so: SC_Server.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h +.obj/CCM_App.o .shobj/CCM_App.so: CCM_App.cpp \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/examples/ASX/CCM_App/SC_Client.cpp b/examples/ASX/CCM_App/SC_Client.cpp new file mode 100644 index 00000000000..498ff89ba9b --- /dev/null +++ b/examples/ASX/CCM_App/SC_Client.cpp @@ -0,0 +1,9 @@ +// Pretty simple, eh? ;-) +// @(#)SC_Client.cpp 1.1 10/18/96 + + +int +main (int, char *[]) +{ + return 0; +} diff --git a/examples/ASX/CCM_App/SC_Server.cpp b/examples/ASX/CCM_App/SC_Server.cpp new file mode 100644 index 00000000000..49a11179dcd --- /dev/null +++ b/examples/ASX/CCM_App/SC_Server.cpp @@ -0,0 +1,63 @@ +// Simple driver program for the server. +// @(#)SC_Server.cpp 1.1 10/18/96 + + +#include "ace/Service_Config.h" +#include "ace/Synch.h" +#include "ace/Signal.h" + +class Event_Handler : public ACE_Event_Handler +{ +public: + virtual int handle_input (ACE_HANDLE handle); + virtual int handle_close (ACE_HANDLE, + ACE_Reactor_Mask); +}; + +int +Event_Handler::handle_input (ACE_HANDLE handle) +{ + char buf[BUFSIZ]; + + ssize_t n = ACE_OS::read (handle, buf, sizeof buf); + + if (n == -1) + return -1; + else if (ACE_OS::write (ACE_STDOUT, buf, n) != n) + return -1; + else + return 0; +} + +int +Event_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, "closing Event_Handler\n")); + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config loggerd; + Event_Handler handler; + ACE_Sig_Adapter shutdown_handler ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop); + + if (ACE::register_stdin_handler (&handler, + ACE_Service_Config::reactor (), + ACE_Service_Config::thr_mgr ()) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler")); + + if (loggerd.open (argc, argv) == -1 && errno != ENOENT) + ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1)); + + else if (ACE_Service_Config::reactor ()->register_handler + (SIGINT, &shutdown_handler) == -1) + ACE_ERROR ((LM_ERROR, "%p\n%a", "register_handler", 1)); + + // Perform logging service until we receive SIGINT. + + loggerd.run_reactor_event_loop (); + + return 0; +} diff --git a/examples/ASX/CCM_App/svc.conf b/examples/ASX/CCM_App/svc.conf new file mode 100644 index 00000000000..4737cc6312e --- /dev/null +++ b/examples/ASX/CCM_App/svc.conf @@ -0,0 +1,21 @@ +static ACE_Service_Manager "-d -p 3911" + +dynamic My_Task Service_Object *.shobj/CCM_App.so:make_task() "-p 3000" + +stream dynamic CCM_App STREAM *.shobj/CCM_App.so:make_stream() active +{ + dynamic Device_Adapter Module *.shobj/CCM_App.so:make_da() + dynamic Event_Analyzer Module *.shobj/CCM_App.so:make_ea() + dynamic Multicast_Router Module *.shobj/CCM_App.so:make_mr() "-p 3001" +} + +stream CCM_App +{ + remove Device_Adapter + remove Event_Analyzer + remove Multicast_Router +} + +remove CCM_App +remove My_Task + diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp new file mode 100644 index 00000000000..aad0adf313e --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.cpp @@ -0,0 +1,132 @@ +#include "ace/Log_Msg.h" +// @(#)Consumer_Router.cpp 1.1 10/18/96 + +#include "Consumer_Router.h" +#include "Options.h" + +#if defined (ACE_HAS_THREADS) + +typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY; + +int +Consumer_Handler::open (void *a) +{ + CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a; + this->router_task_ = af->router (); + return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a); +} + +Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm) + : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm) +{ +} + +// Create a new handler that will interact with a consumer and point +// its ROUTER_TASK_ data member to the CONSUMER_ROUTER. + +Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm) + : CONSUMER_ROUTER (tm) +{ +} + +// Initialize the Router. + +int +Consumer_Router::open (void *) +{ + assert (this->is_reader ()); + + char *argv[4]; + + argv[0] = (char *) this->name (); + argv[1] = "-p"; + argv[2] = options.consumer_port (); + argv[3] = 0; + + if (this->init (2, &argv[1]) == -1) + return -1; + + // Make this an active object. + return this->activate (options.t_flags ()); +} + +int +Consumer_Router::close (u_long) +{ + assert (this->is_reader ()); + ACE_DEBUG ((LM_DEBUG, "(%t) closing Consumer_Router\n")); + this->peer_map_.close (); + + // Inform the thread to shut down. + this->msg_queue ()->deactivate (); + return 0; +} + +// Handle incoming messages in a separate thread. + +int +Consumer_Router::svc (void) +{ + ACE_Thread_Control tc (this->thr_mgr ()); + ACE_Message_Block *mb = 0; + + assert (this->is_reader ()); + + ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Consumer_Router\n")); + + while (this->getq (mb) >= 0) + { + ACE_DEBUG ((LM_DEBUG, "Consumer_Router is routing via send_peers\n")); + if (this->send_peers (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) send_peers failed in Consumer_Router\n"), + -1); + } + ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Consumer_Router\n")); + return 0; + // Note the implicit ACE_OS::thr_exit() via destructor. +} + +// Send a MESSAGE_BLOCK to the supplier(s). + +int +Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + assert (this->is_reader ()); + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + else + // Queue up the message, which will be processed by + // Consumer_Router::svc(). + return this->putq (mb); +} + +// Return information about the Client_Router ACE_Module. + +int +Consumer_Router::info (char **strp, size_t length) const +{ + char buf[BUFSIZ]; + ACE_INET_Addr addr; + const char *mod_name = this->name (); + ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor (); + + if (sa.get_local_addr (addr) == -1) + return -1; + + ACE_OS::sprintf (buf, "%s\t %d/%s %s", + mod_name, addr.get_port_number (), "tcp", + "# consumer router\n"); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Consumer_Router.h b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h new file mode 100644 index 00000000000..efc5acf9e3d --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Consumer_Router.h @@ -0,0 +1,46 @@ +/* -*- C++ -*- */ +// @(#)Consumer_Router.h 1.1 10/18/96 + +/* The interface between one or more consumers and an Event Server ACE_Stream */ + +#if !defined (_CONSUMER_ROUTER_H) +#define _CONSUMER_ROUTER_H + +#include "ace/Thread_Manager.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/UPIPE_Acceptor.h" +#include "ace/Svc_Handler.h" +#include "Peer_Router.h" + +#if defined (ACE_HAS_THREADS) + +class Consumer_Handler; /* Forward declaration... */ + +typedef ACE_HANDLE CONSUMER_KEY; + +typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER; + +class Consumer_Handler : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> +{ +public: + Consumer_Handler (ACE_Thread_Manager *tm = 0); + virtual int open (void *); +}; + +class Consumer_Router : public CONSUMER_ROUTER +{ +public: + Consumer_Router (ACE_Thread_Manager *thr_manager); + +protected: + /* ACE_Task hooks. */ + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void); + + /* Dynamic linking hooks */ + virtual int info (char **info_string, size_t length) const; +}; +#endif /* ACE_HAS_THREADS */ +#endif /* _CONSUMER_ROUTER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp new file mode 100644 index 00000000000..977e5c4af9d --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.cpp @@ -0,0 +1,68 @@ +#include "Event_Analyzer.h" +// @(#)Event_Analyzer.cpp 1.1 10/18/96 + + +#if defined (ACE_HAS_THREADS) + +int +Event_Analyzer::open (void *) +{ + return 0; +} + +int +Event_Analyzer::close (u_long) +{ + return 0; +} + +int +Event_Analyzer::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd; + + switch (cmd = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ()); + break; + } + return 0; +} + +int +Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + this->control (mb); + + return this->put_next (mb); +} + +int +Event_Analyzer::init (int, char *[]) +{ + return 0; +} + +int +Event_Analyzer::fini (void) +{ + return 0; +} + +int +Event_Analyzer::info (char **strp, size_t length) const +{ + const char *mod_name = this->name (); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h new file mode 100644 index 00000000000..9d38611e7aa --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h @@ -0,0 +1,34 @@ +/* -*- C++ -*- */ +// @(#)Event_Analyzer.h 1.1 10/18/96 + +/* Signal router */ + +#if !defined (_EVENT_ANALYZER_H) +#define _EVENT_ANALYZER_H + +#include "ace/Stream.h" +#include "ace/Module.h" +#include "ace/Task.h" +#include "ace/Synch.h" + +#if defined (ACE_HAS_THREADS) + +class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH> +{ +public: + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void) { return 0; } + + /* Dynamic linking hooks */ + virtual int init (int argc, char *argv[]); + virtual int fini (void); + virtual int info (char **info_string, size_t length) const; + +private: + virtual int control (ACE_Message_Block *); +}; + +#endif /* ACE_HAS_THREADS */ +#endif /* _EVENT_ANALYZER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Makefile b/examples/ASX/Event_Server/Event_Server/Makefile new file mode 100644 index 00000000000..947f50c5d7a --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Makefile @@ -0,0 +1,433 @@ +#---------------------------------------------------------------------------- +# @(#)Makefile 1.1 10/18/96 +# +# Makefile for the Event Server test +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = event_server + +FILES = Options \ + Supplier_Router \ + Event_Analyzer \ + Consumer_Router \ + Peer_Router + +LSRC = $(addsuffix .cpp,$(FILES)) +LOBJ = $(addsuffix .o,$(FILES)) +SHOBJ = $(addsuffix .so,$(FILES)) + +LDLIBS = $(addprefix .shobj/,$(SHOBJ)) + +VLDLIBS = $(LDLIBS:%=%$(VAR)) + +BUILD = $(VBIN) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + +.obj/Options.o .shobj/Options.so: Options.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i +.obj/Supplier_Router.o .shobj/Supplier_Router.so: Supplier_Router.cpp Supplier_Router.h \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.cpp \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \ + $(WRAPPER_ROOT)/ace/Dynamic.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.i \ + Peer_Router.h \ + $(WRAPPER_ROOT)/ace/Acceptor.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies.cpp \ + $(WRAPPER_ROOT)/ace/Acceptor.i \ + $(WRAPPER_ROOT)/ace/Acceptor.cpp \ + Peer_Router.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i +.obj/Event_Analyzer.o .shobj/Event_Analyzer.so: Event_Analyzer.cpp Event_Analyzer.h \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i +.obj/Consumer_Router.o .shobj/Consumer_Router.so: Consumer_Router.cpp \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + Consumer_Router.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i \ + $(WRAPPER_ROOT)/ace/SPIPE.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/SPIPE.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \ + $(WRAPPER_ROOT)/ace/Dynamic.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.i \ + Peer_Router.h \ + $(WRAPPER_ROOT)/ace/Acceptor.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies.cpp \ + $(WRAPPER_ROOT)/ace/Acceptor.i \ + $(WRAPPER_ROOT)/ace/Acceptor.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.i \ + Peer_Router.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i +.obj/Peer_Router.o .shobj/Peer_Router.so: Peer_Router.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i Peer_Router.h \ + $(WRAPPER_ROOT)/ace/Acceptor.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \ + $(WRAPPER_ROOT)/ace/Dynamic.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.i \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies.cpp \ + $(WRAPPER_ROOT)/ace/Acceptor.i \ + $(WRAPPER_ROOT)/ace/Acceptor.cpp \ + $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.i \ + Peer_Router.cpp + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/examples/ASX/Event_Server/Event_Server/Options.cpp b/examples/ASX/Event_Server/Event_Server/Options.cpp new file mode 100644 index 00000000000..41658639a77 --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Options.cpp @@ -0,0 +1,186 @@ +#include "ace/Get_Opt.h" +// @(#)Options.cpp 1.1 10/18/96 + +#include "ace/Synch.h" +#include "ace/Thread.h" +#include "ace/Log_Msg.h" +#include "Options.h" + +#if defined (ACE_HAS_THREADS) + +Options::Options (void) + : debugging_ (0), + verbosity_ (0), + low_water_mark_ (1024), + high_water_mark_ (8 * 1024), + message_size_ (128), + thr_count_ (4), + initial_queue_length_ (0), + iterations_ (100000), + consumer_port_ ("-p " ACE_ITOA (10000)), + supplier_port_ ("-p " ACE_ITOA (10001)), + t_flags_ (THR_DETACHED) +{ +} + +Options::~Options (void) +{ +} + +void Options::print_results (void) +{ +#if !defined (ACE_WIN32) + ACE_Profile_Timer::ACE_Elapsed_Time et; + ACE_Profile_Timer::Rusage rusage; + + this->itimer_.elapsed_time (et); + this->itimer_.get_rusage (rusage); + + if (options.verbose ()) + { +#if defined (ACE_HAS_PRUSAGE_T) + ACE_OS::printf ("final concurrency hint = %d\n", ACE_Thread::getconcurrency ()); + ACE_OS::printf ("%8d = lwpid\n" + "%8d = lwp count\n" + "%8d = minor page faults\n" + "%8d = major page faults\n" + "%8d = input blocks\n" + "%8d = output blocks\n" + "%8d = messages sent\n" + "%8d = messages received\n" + "%8d = signals received\n" + "%8ds, %dms = wait-cpu (latency) time\n" + "%8ds, %dms = user lock wait sleep time\n" + "%8ds, %dms = all other sleep time\n" + "%8d = voluntary context switches\n" + "%8d = involuntary context switches\n" + "%8d = system calls\n" + "%8d = chars read/written\n", + rusage.pr_lwpid, + rusage.pr_count, + rusage.pr_minf, + rusage.pr_majf, + rusage.pr_inblk, + rusage.pr_oublk, + rusage.pr_msnd, + rusage.pr_mrcv, + rusage.pr_sigs, + rusage.pr_wtime.tv_sec, rusage.pr_wtime.tv_nsec / 1000000, + rusage.pr_ltime.tv_sec, rusage.pr_ltime.tv_nsec / 1000000, + rusage.pr_slptime.tv_sec, rusage.pr_slptime.tv_nsec / 1000000, + rusage.pr_vctx, + rusage.pr_ictx, + rusage.pr_sysc, + rusage.pr_ioch); +#else + /* Someone needs to write the corresponding dump for rusage... */ +#endif /* ACE_HAS_PRUSAGE_T */ + } + + ACE_OS::printf ("---------------------\n" + "real time = %.3f\n" + "user time = %.3f\n" + "system time = %.3f\n" + "---------------------\n", + et.real_time, et.user_time, et.system_time); +#endif /* ACE_WIN32 */ +} + +/* Manages the options */ +Options options; + +void +Options::parse_args (int argc, char *argv[]) +{ + ACE_LOG_MSG->open (argv[0]); + + ACE_Get_Opt get_opt (argc, argv, "c:bdH:i:L:l:M:ns:t:T:v"); + int c; + + while ((c = get_opt ()) != -1) + switch (c) + { + case 'b': + this->t_flags (THR_BOUND); + break; + case 'c': + this->consumer_port (get_opt.optarg); + break; + case 'd': + this->debugging_ = 1; + break; + case 'H': + this->high_water_mark (ACE_OS::atoi (get_opt.optarg)); + break; + case 'i': + this->iterations (ACE_OS::atoi (get_opt.optarg)); + break; + case 'L': + this->low_water_mark (ACE_OS::atoi (get_opt.optarg)); + break; + case 'l': + this->initial_queue_length (ACE_OS::atoi (get_opt.optarg)); + break; + case 'M': + this->message_size (ACE_OS::atoi (get_opt.optarg)); + break; + case 'n': + this->t_flags (THR_NEW_LWP); + break; + case 's': + this->supplier_port (get_opt.optarg); + break; + case 'T': + if (ACE_OS::strcasecmp (get_opt.optarg, "ON") == 0) + ACE_Trace::start_tracing (); + else if (ACE_OS::strcasecmp (get_opt.optarg, "OFF") == 0) + ACE_Trace::stop_tracing (); + break; + case 't': + this->thr_count (ACE_OS::atoi (get_opt.optarg)); + break; + case 'v': + this->verbosity_ = 1; + break; + default: + ::fprintf (stderr, "%s\n" + "\t[-b] (THR_BOUND)\n" + "\t[-c consumer port]\n" + "\t[-d] (enable debugging)\n" + "\t[-H high water mark]\n" + "\t[-i number of test iterations]\n" + "\t[-L low water mark]\n" + "\t[-M] message size \n" + "\t[-n] (THR_NEW_LWP)\n" + "\t[-q max queue size]\n" + "\t[-s supplier port]\n" + "\t[-t number of threads]\n" + "\t[-v] (verbose) \n", + argv[0]); + ::exit (1); + /* NOTREACHED */ + break; + } + + if (this->verbose ()) + ACE_OS::printf ("%8d = initial concurrency hint\n" + "%8d = total iterations\n" + "%8d = thread count\n" + "%8d = low water mark\n" + "%8d = high water mark\n" + "%8d = message_size\n" + "%8d = initial queue length\n" + "%8d = THR_BOUND\n" + "%8d = THR_NEW_LWP\n", + ACE_Thread::getconcurrency (), + this->iterations (), + this->thr_count (), + this->low_water_mark (), + this->high_water_mark (), + this->message_size (), + this->initial_queue_length (), + (this->t_flags () & THR_BOUND) != 0, + (this->t_flags () & THR_NEW_LWP) != 0); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Options.h b/examples/ASX/Event_Server/Event_Server/Options.h new file mode 100644 index 00000000000..054f0d834f6 --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Options.h @@ -0,0 +1,75 @@ +/* -*- C++ -*- */ +// @(#)Options.h 1.1 10/18/96 + +/* Option manager for Event Server */ + +#if !defined (DEVICE_OPTIONS_H) +#define DEVICE_OPTIONS_H + +#include "ace/OS.h" +#include "ace/Profile_Timer.h" + +#if defined (ACE_HAS_THREADS) + +class Options +{ +public: + Options (void); + ~Options (void); + void parse_args (int argc, char *argv[]); + + void stop_timer (void); + void start_timer (void); + + void thr_count (size_t count); + size_t thr_count (void); + + void initial_queue_length (size_t length); + size_t initial_queue_length (void); + + void high_water_mark (size_t size); + size_t high_water_mark (void); + + void low_water_mark (size_t size); + size_t low_water_mark (void); + + void message_size (size_t size); + size_t message_size (void); + + void iterations (size_t n); + size_t iterations (void); + + void t_flags (long flag); + long t_flags (void); + + void supplier_port (char *port); + char *supplier_port (void); + + void consumer_port (char *port); + char *consumer_port (void); + + int debug (void); + int verbose (void); + + void print_results (void); + +private: + ACE_Profile_Timer itimer_; /* Time the process */ + size_t thr_count_; /* Number of threads to spawn */ + long t_flags_; /* Flags to thr_create() */ + size_t high_water_mark_; /* ACE_Task high water mark */ + size_t low_water_mark_; /* ACE_Task low water mark */ + size_t message_size_; /* Size of a message */ + size_t initial_queue_length_; /* Initial number of items in the queue */ + size_t iterations_; /* Number of iterations to run the test program */ + int debugging_; /* Extra debugging info */ + int verbosity_; /* Extra verbose messages */ + char *consumer_port_; /* Port that the Consumer_Router is using */ + char *supplier_port_; /* Port that the Supplier_Router is using */ +}; + +extern Options options; + +#include "Options.i" +#endif /* ACE_HAS_THREADS */ +#endif /* DEVICE_OPTIONS_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Options.i b/examples/ASX/Event_Server/Event_Server/Options.i new file mode 100644 index 00000000000..c538b94b46a --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Options.i @@ -0,0 +1,137 @@ +/* -*- C++ -*- */ +// @(#)Options.i 1.1 10/18/96 + +/* Option manager for ustreams */ + +inline void +Options::supplier_port (char *port) +{ + this->supplier_port_ = port; +} + +inline char * +Options::supplier_port (void) +{ + return this->supplier_port_; +} + +inline void +Options::consumer_port (char *port) +{ + this->consumer_port_ = port; +} + +inline char * +Options::consumer_port (void) +{ + return this->consumer_port_; +} + +inline void +Options::start_timer (void) +{ + this->itimer_.start (); +} + +inline void +Options::stop_timer (void) +{ + this->itimer_.stop (); +} + +inline void +Options::thr_count (size_t count) +{ + this->thr_count_ = count; +} + +inline size_t +Options::thr_count (void) +{ + return this->thr_count_; +} + +inline void +Options::initial_queue_length (size_t length) +{ + this->initial_queue_length_ = length; +} + +inline size_t +Options::initial_queue_length (void) +{ + return this->initial_queue_length_; +} + +inline void +Options::high_water_mark (size_t size) +{ + this->high_water_mark_ = size; +} + +inline size_t +Options::high_water_mark (void) +{ + return this->high_water_mark_; +} + +inline void +Options::low_water_mark (size_t size) +{ + this->low_water_mark_ = size; +} + +inline size_t +Options::low_water_mark (void) +{ + return this->low_water_mark_; +} + +inline void +Options::message_size (size_t size) +{ + this->message_size_ = size; +} + +inline size_t +Options::message_size (void) +{ + return this->message_size_; +} + +inline void +Options::iterations (size_t n) +{ + this->iterations_ = n; +} + +inline size_t +Options::iterations (void) +{ + return this->iterations_; +} + +inline void +Options::t_flags (long flag) +{ + this->t_flags_ |= flag; +} + +inline long +Options::t_flags (void) +{ + return this->t_flags_; +} + +inline int +Options::debug (void) +{ + return this->debugging_; +} + +inline int +Options::verbose (void) +{ + return this->verbosity_; +} + diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp new file mode 100644 index 00000000000..f0a77ed7103 --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp @@ -0,0 +1,279 @@ +#if !defined (_PEER_ROUTER_C) +// @(#)Peer_Router.cpp 1.1 10/18/96 + +#define _PEER_ROUTER_C + +#include "ace/Service_Config.h" +#include "ace/Get_Opt.h" +#include "ace/Log_Msg.h" +#include "Options.h" +#include "Peer_Router.h" + +#if defined (ACE_HAS_THREADS) + +// Define some short-hand macros to deal with verbose templates +// names... + +#define PH PEER_HANDLER +#define PA PEER_ACCEPTOR +#define PAD PEER_ADDR +#define PK PEER_KEY +#define PM PEER_MAP + +template <class PH, class PK> int +Acceptor_Factory<PH, PK>::init (int argc, char *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, "dp:", 0); + ACE_INET_Addr addr; + + for (int c; (c = get_opt ()) != -1; ) + switch (c) + { + case 'p': + addr.set (ACE_OS::atoi (get_opt.optarg)); + break; + case 'd': + break; + default: + break; + } + + if (this->open (addr, ACE_Service_Config::reactor ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); + return 0; +} + +template <class PH, class PK> +Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr) + : pr_ (pr) +{ +} + +template <class PH, class PK> Peer_Router<PH, PK> * +Acceptor_Factory<PH, PK>::router (void) +{ + return this->pr_; +} + +template <class ROUTER, class KEY> +Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm) + : inherited (tm) +{ +} + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::svc (void) +{ +#if 0 + ACE_Thread_Control thread_control (tm); + // Just a try !! we're just reading from our Message_Queue + ACE_Message_Block *db, *hb; + int n; + + // Do an endless loop + for (;;) + { + db = new Message_Block (BUFSIZ); + hb = new Message_Block (sizeof (KEY), Message_Block::MB_PROTO, db); + + if ((n = this->peer_.recv (db->rd_ptr (), db->size ())) == -1) + LM_ERROR_RETURN ((LOG_ERROR, "%p", "recv failed"), -1); + else if (n == 0) // Client has closed down the connection. + { + if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + LM_ERROR_RETURN ((LOG_ERROR, "%p", "unbind failed"), -1); + LM_DEBUG ((LOG_DEBUG, "(%t) shutting down \n")); + return -1; // We do not need to be deregistered by reactor + // as we were not registered at all + } + else + // Transform incoming buffer into a Message and pass + // downstream. + { + db->wr_ptr (n); + *(long *) hb->rd_ptr () = this->get_handle (); // Structure assignment. + hb->wr_ptr (sizeof (long)); + + if (this->router_task_->reply (hb) == -1) + { + cout << "Peer_Handler.svc : router_task->reply failed" << endl ; + return -1; + } + } + } + return 0; +#else + return -1; +#endif +} + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + return this->peer ().send_n (mb->rd_ptr (), mb->length ()); +} + +// Create a new handler and point its ROUTER_TASK_ data member to the +// corresponding router. Note that this router is extracted out of +// the Acceptor_Factory * that is passed in via the +// ACE_Acceptor::handle_input() method. + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::open (void *a) +{ + char buf[BUFSIZ], *p = buf; + + if (this->router_task_->info (&p, sizeof buf) != -1) + ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n", + buf, this->get_handle (), a)); + else + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1); +#if 0 + if (this->activate (options.t_flags ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1); +#endif + ACE_DEBUG ((LM_DEBUG, + "Peer_Handler::open registering with Reactor for handle_input\n")); + + if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); + else if (this->router_task_->bind_peer (this->get_handle (), this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1); + else if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "disable non-blocking I/O"), -1); + return 0; +} + +// Receive a message from a supplier. + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h)); + + ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ); + ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), + ACE_Message_Block::MB_PROTO, db); + int n; + + if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); + else if (n == 0) // Client has closed down the connection. + { + if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h)); + return -1; // Instruct the ACE_Reactor to deregister us by returning -1. + } + else // Transform incoming buffer into a Message and pass downstream. + { + db->wr_ptr (n); + *(ACE_HANDLE *) hb->rd_ptr () = this->get_handle (); // structure assignment. + hb->wr_ptr (sizeof (ACE_HANDLE)); + return this->router_task_->reply (hb) == -1 ? -1 : 0; + } +} + +template <class PH, class PK> +Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm) + : ACE_Task<ACE_MT_SYNCH> (tm) +{ +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb) +{ + PEER_ITERATOR map_iter = this->peer_map_; + int bytes = 0; + int iterations = 0; + ACE_Message_Block *data_block = mb->cont (); + + for (ACE_Map_Entry<PK, PH *> *ss = 0; + map_iter.next (ss) != 0; + map_iter.advance ()) + { + if (options.debug ()) + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_)); + + iterations++; + bytes += ss->int_id_->put (data_block); + } + + delete mb; + return bytes == 0 ? 0 : bytes / iterations; +} + +template <class PH, class PK> +Peer_Router<PH, PK>::~Peer_Router (void) +{ +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::fini (void) +{ + delete this->acceptor_; + return 0; +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; + + switch (command = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ()); + break; + default: + return -1; + } + return 0; +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::unbind_peer (PK key) +{ + return this->peer_map_.unbind (key); +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph) +{ + PH *peer_handler = (PH *) ph; + return this->peer_map_.bind (key, peer_handler); +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::init (int argc, char *argv[]) +{ + this->acceptor_ = new ACCEPTOR (this); + + if (this->acceptor_->init (argc, argv) == -1 + || this->peer_map_.open () == -1) + return -1; + else + { + ACE_INET_Addr addr; + ACE_SOCK_Acceptor &acceptor = this->acceptor_->acceptor(); + + if (acceptor.get_local_addr (addr) != -1) + ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, port = %d, fd = %d, this = %u\n", + this->name (), addr.get_port_number (), + acceptor.get_handle (), this)); + else + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1); + } + return 0; +} + +#undef PH +#undef PA +#undef PAD +#undef PK +#undef PM +#endif /* ACE_HAS_THREADS */ +#endif /* _PEER_ROUTER_C */ diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.h b/examples/ASX/Event_Server/Event_Server/Peer_Router.h new file mode 100644 index 00000000000..5df444a985a --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.h @@ -0,0 +1,121 @@ +/* -*- C++ -*- */ +// @(#)Peer_Router.h 1.1 10/18/96 + + +#if !defined (_PEER_ROUTER_H) +#define _PEER_ROUTER_H + +#include "ace/Acceptor.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Thread_Manager.h" +#include "ace/Map_Manager.h" + +#if defined (ACE_HAS_THREADS) + +// Forward declaration. +template <class PEER_HANDLER, class KEY> +class Peer_Router; + +template <class PEER_HANDLER, class KEY> +class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_SOCK_ACCEPTOR> + // = TITLE + // Creates <PEER_HANDLERs>, which route events between peers. +{ +public: + Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr); + Peer_Router<PEER_HANDLER, KEY> *router (void); + + int init (int argc, char *argv[]); + // Initialize the acceptor when it's linked dynamically. + +private: + Peer_Router<PEER_HANDLER, KEY> *pr_; +}; + +template <class ROUTER, class KEY> +class Peer_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> + // = TITLE + // Receive input from a Peer. +{ +public: + Peer_Handler (ACE_Thread_Manager * = 0); + + virtual int open (void * = 0); + // Called by the ACE_Acceptor::handle_input() to activate this object. + + virtual int handle_input (ACE_HANDLE); + // Receive input from the peer. + + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + // Send output to a peer. + +protected: + typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> inherited; + + ROUTER *router_task_; + // Pointer to write task. + +private: + virtual int svc (void); + // Don't need this method here... +}; + +template <class PEER_HANDLER, class PEER_KEY> +class Peer_Router : public ACE_Task<ACE_MT_SYNCH> + // = TITLE + // This abstract base class provides mechanisms for routing + // messages to/from a ACE_Stream from/to one or more peers (which + // are typically running on remote hosts). + // + // = DESCRIPTION + // A subclass of Peer_Router overrides the open(), close(), and + // put() methods in order to specialize the behavior of the router + // to meet application-specific requirements. +{ +public: + // = Initialization and termination methods. + Peer_Router (ACE_Thread_Manager * = 0); + ~Peer_Router (void); + + typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER; + + virtual int unbind_peer (PEER_KEY); + // Remove a PEER_HANDLER from the PEER_MAP. + + virtual int bind_peer (PEER_KEY, HANDLER *); + // Add a PEER_HANDLER to the PEER_MAP + + int send_peers (ACE_Message_Block *mb); + // Send the message block to the peer(s). + +protected: + virtual int control (ACE_Message_Block *); + // Handle control messages arriving from adjacent Modules. + + // = Useful typedefs + typedef ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_MAP; + typedef ACE_Map_Iterator<PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> PEER_ITERATOR; + + PEER_MAP peer_map_; + // Map used to keep track of active peers. + + // = Dynamic linking initialization hooks inherited from ACE_Task + virtual int init (int argc, char *argv[]); + virtual int fini (void); + + typedef Acceptor_Factory<PEER_HANDLER, PEER_KEY> ACCEPTOR; + + ACCEPTOR *acceptor_; + // Factory for accepting new PEER_HANDLERs. + +private: + // = Prevent copies and pass-by-value. + Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {} + void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &) {} +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Peer_Router.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ +#endif /* ACE_HAS_THREADS */ +#endif /* _PEER_ROUTER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp new file mode 100644 index 00000000000..6d18dce66fa --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.cpp @@ -0,0 +1,134 @@ +#include "Supplier_Router.h" +// @(#)Supplier_Router.cpp 1.1 10/18/96 + +#include "Options.h" + +#if defined (ACE_HAS_THREADS) + +typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY; + +int +Supplier_Handler::open (void *a) +{ + SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a; + this->router_task_ = af->router (); + return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a); +} + +Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm) + : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm) +{ +} + +// Create a new router and associate it with the REACTOR parameter. + +Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm) + : SUPPLIER_ROUTER (tm) +{ +} + +// Handle incoming messages in a separate thread. + +int +Supplier_Router::svc (void) +{ + assert (this->is_writer ()); + + ACE_Thread_Control tc (this->thr_mgr ()); + ACE_Message_Block *mb = 0; + + ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in Supplier_Router\n")); + + while (this->getq (mb) >= 0) + { + ACE_DEBUG ((LM_DEBUG, "Supplier_Router is routing via send_peers\n")); + if (this->send_peers (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) send_peers failed in Supplier_Router\n"), + -1); + } + + ACE_DEBUG ((LM_DEBUG, "(%t) stopping svc in Supplier_Router\n")); + return 0; + // Note the implicit ACE_OS::thr_exit() via ACE_Thread_Control's + // destructor. +} + +// Initialize the Router. + +int +Supplier_Router::open (void *) +{ + assert (this->is_writer ()); + + char *argv[4]; + + argv[0] = (char *) this->name (); + argv[1] = "-p"; + argv[2] = options.supplier_port (); + argv[3] = 0; + + if (this->init (2, &argv[1]) == -1) + return -1; + + // Make this an active object. + return this->activate (options.t_flags ()); +} + +// Close down the router. + +int +Supplier_Router::close (u_long) +{ + assert (this->is_writer ()); + ACE_DEBUG ((LM_DEBUG, "(%t) closing Supplier_Router\n")); + this->peer_map_.close (); + + // Inform the thread to shut down. + this->msg_queue ()->deactivate (); + return 0; +} + +// Send a MESSAGE_BLOCK to the supplier(s). + +int +Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + assert (this->is_writer ()); + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + else + // Queue up the message, which will be processed by + // Supplier_Router::svc(). + return this->putq (mb); +} + +// Return information about the Supplier_Router ACE_Module. + +int +Supplier_Router::info (char **strp, size_t length) const +{ + char buf[BUFSIZ]; + ACE_INET_Addr addr; + const char *mod_name = this->name (); + ACE_SOCK_Acceptor &sa = this->acceptor_->acceptor (); + + if (sa.get_local_addr (addr) == -1) + return -1; + + ACE_OS::sprintf (buf, "%s\t %d/%s %s", + mod_name, addr.get_port_number (), "tcp", + "# supplier router\n"); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Event_Server/Supplier_Router.h b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h new file mode 100644 index 00000000000..766e08c01e3 --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/Supplier_Router.h @@ -0,0 +1,51 @@ +/* -*- C++ -*- */ +// @(#)Supplier_Router.h 1.1 10/18/96 + +/* The interface between a supplier and an Event Service ACE_Stream */ + +#if !defined (_SUPPLIER_ROUTER_H) +#define _SUPPLIER_ROUTER_H + +#include "ace/INET_Addr.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/Map_Manager.h" +#include "ace/Svc_Handler.h" +#include "Peer_Router.h" + +#if defined (ACE_HAS_THREADS) + +/* Forward declaration */ +class Supplier_Handler; + +/* Type of search key for SUPPLIER_MAP */ +typedef ACE_HANDLE SUPPLIER_KEY; + +/* Instantiated type for routing messages to suppliers */ + +typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER; + +class Supplier_Handler : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> +{ +public: + Supplier_Handler (ACE_Thread_Manager *tm = 0); + virtual int open (void *); +}; + +class Supplier_Router : public SUPPLIER_ROUTER +{ +public: + Supplier_Router (ACE_Thread_Manager *); + +protected: + /* ACE_Task hooks. */ + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void); + + /* Dynamic linking hooks inherited from Peer_Router */ + virtual int info (char **info_string, size_t length) const; +}; + +#endif /* ACE_HAS_THREADS */ +#endif /* _SUPPLIER_ROUTER_H */ diff --git a/examples/ASX/Event_Server/Event_Server/event_server.cpp b/examples/ASX/Event_Server/Event_Server/event_server.cpp new file mode 100644 index 00000000000..e54bb845d84 --- /dev/null +++ b/examples/ASX/Event_Server/Event_Server/event_server.cpp @@ -0,0 +1,125 @@ +// Test the event server. +// @(#)event_server.cpp 1.1 10/18/96 + +#include "ace/Log_Msg.h" +#include "ace/Stream.h" +#include "ace/Service_Config.h" +#include "Options.h" +#include "Consumer_Router.h" +#include "Event_Analyzer.h" +#include "Supplier_Router.h" + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; +typedef ACE_Module<ACE_MT_SYNCH> MT_Module; + +// Handle SIGINT and terminate the entire application. + +class Quit_Handler : public ACE_Sig_Adapter +{ +public: + Quit_Handler (void); + virtual int handle_input (ACE_HANDLE fd); +}; + +Quit_Handler::Quit_Handler (void) + : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Service_Config::end_reactor_event_loop)) +{ + // Register to trap input from the user. + if (ACE::register_stdin_handler (this, + ACE_Service_Config::reactor (), + ACE_Service_Config::thr_mgr ()) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler")); + // Register to trap the SIGINT signal. + else if (ACE_Service_Config::reactor ()->register_handler + (SIGINT, this) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_handler")); +} + +int +Quit_Handler::handle_input (ACE_HANDLE) +{ + options.stop_timer (); + ACE_DEBUG ((LM_INFO, "(%t) closing down the test\n")); + options.print_results (); + + ACE_Service_Config::end_reactor_event_loop (); + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon; + + options.parse_args (argc, argv); + + { + // Primary ACE_Stream for EVENT_SERVER application. + MT_Stream event_server; + + // Enable graceful shutdowns... + Quit_Handler quit_handler; + + // Create the Supplier Router module. + + MT_Module *sr = new MT_Module ("Supplier_Router", + new Supplier_Router (ACE_Service_Config::thr_mgr ())); + + // Create the Event Analyzer module. + + MT_Module *ea = new MT_Module ("Event_Analyzer", + new Event_Analyzer, + new Event_Analyzer); + + // Create the Consumer Router module. + + MT_Module *cr = new MT_Module ("Consumer_Router", + 0, // 0 triggers the creation of a ACE_Thru_Task... + new Consumer_Router (ACE_Service_Config::thr_mgr ())); + + // Push the Modules onto the event_server stream. + + if (event_server.push (sr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1); + + if (event_server.push (ea) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1); + + if (event_server.push (cr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1); + + // Set the high and low water marks appropriately. + + int wm = options.low_water_mark (); + + if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1); + + wm = options.high_water_mark (); + if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1); + + options.start_timer (); + + // Perform the main event loop waiting for the user to type ^C or to + // enter a line on the ACE_STDIN. + + daemon.run_reactor_event_loop (); + // The destructor of event_server will close down the stream and + // call the close() hooks on all the ACE_Tasks. + } + + // Wait for the threads to exit. + ACE_Service_Config::thr_mgr ()->wait (); + ACE_DEBUG ((LM_DEBUG, "exiting main\n")); + return 0; +} +#else +int +main (void) +{ + ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1); +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Event_Server/Makefile b/examples/ASX/Event_Server/Makefile new file mode 100644 index 00000000000..6c8d3f443f5 --- /dev/null +++ b/examples/ASX/Event_Server/Makefile @@ -0,0 +1,23 @@ +#---------------------------------------------------------------------------- +# @(#)Makefile 1.1 10/18/96 +# +# Makefile for the Event Server tests +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +DIRS = Event_Server \ + Transceiver + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nested.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nolocal.GNU + diff --git a/examples/ASX/Event_Server/README b/examples/ASX/Event_Server/README new file mode 100644 index 00000000000..f97e767cdd8 --- /dev/null +++ b/examples/ASX/Event_Server/README @@ -0,0 +1,38 @@ +The subdirectory illustrates a number of the ACE ASX framework +features using an ACE_Stream application called the Event Server. The +Event Server works as follows: + +1. When the ./Event_Server/event_server executable is run it + creates two SOCK_Acceptors, which listen for and accept + incoming connections from consumers and suppliers. + +2. The ./Event_Server/Transceiver/transceiver application may be + started multiple times. Each call should be either: + + % transceiver -p XYZ -h hostname + + or + + % transceiver -p ABC -h hostname + + where XYZ and ABC are the consumer port and supplier port, + respectively, on the event server and "hostname" is the name of the + machine the event_server is running. I typically open up multiple + windows. + +3. Once the consumer(s) and supplier(s) are connected, you can type + data from any supplier windows. This data will be routed + through the Modules/Tasks in an event_server's Stream and + be forwarded to the consumer(s). + +4. When you want to shut down the tranceivers or event server + just type ^C (which generates a SIGINT). + +What makes this example particularly interesting is that +once you've got the hang of this basic architecture, you can +"push" new filtering Modules onto the event_server Stream + and modify the application's behavior. + +For more information on the design and use of the ACE ASX framework +please see http://www.cs.wustl.edu/~schmidt/C++-USENIX-94.ps.gz and +http://www.cs.wustl.edu/~schmidt/DSEJ-94.ps.gz diff --git a/examples/ASX/Event_Server/Transceiver/Makefile b/examples/ASX/Event_Server/Transceiver/Makefile new file mode 100644 index 00000000000..7fb95dc617d --- /dev/null +++ b/examples/ASX/Event_Server/Transceiver/Makefile @@ -0,0 +1,135 @@ +#---------------------------------------------------------------------------- +# @(#)Makefile 1.1 10/18/96 +# +# Makefile for the transceiver portion of the Event Server test +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = transceiver + +LSRC = $(addsuffix .cpp,$(BIN)) + +LDLIBS = + +VLDLIBS = $(LDLIBS:%=%$(VAR)) + +BUILD = $(VBIN) + +INSTALL = + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + +.obj/transceiver.o .shobj/transceiver.so: transceiver.cpp \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \ + $(WRAPPER_ROOT)/ace/Dynamic.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.i \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies.cpp \ + $(WRAPPER_ROOT)/ace/Connector.i \ + $(WRAPPER_ROOT)/ace/Connector.cpp \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ + $(WRAPPER_ROOT)/ace/Get_Opt.h + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/examples/ASX/Event_Server/Transceiver/transceiver.cpp b/examples/ASX/Event_Server/Transceiver/transceiver.cpp new file mode 100644 index 00000000000..7321e02dd9c --- /dev/null +++ b/examples/ASX/Event_Server/Transceiver/transceiver.cpp @@ -0,0 +1,187 @@ +// Test program for the event transceiver. This program can play the +// @(#)transceiver.cpp 1.1 10/18/96 + +// role of either Consumer or Supplier. You can terminate this +// program by typing ^C.... + +#include "ace/Log_Msg.h" +#include "ace/Service_Config.h" +#include "ace/Connector.h" +#include "ace/SOCK_Connector.h" +#include "ace/Get_Opt.h" + +#if defined (ACE_HAS_THREADS) + +class Event_Transceiver : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> + // = TITLE + // Generate and receives messages from the event server. + // + // = DESCRIPTION + // This class is both a consumer and supplier of events, i.e., + // it is a ``transceiver.'' +{ +public: + Event_Transceiver (void); + + // = Svc_Handler hook called by the <ACE_Connector>. + virtual int open (void *); + // Initialize the transceiver when we are connected. + + // = Demultplexing hooks from the <ACE_Reactor>. + virtual int handle_input (ACE_HANDLE); + // Receive data from STDIN or socket. + + virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); + // Close down via SIGINT. + +private: + int receiver (void); + // Reads data from socket and writes to ACE_STDOUT. + + int forwarder (void); + // Writes data from ACE_STDIN to socket. +}; + +// Close down via SIGINT. + +int +Event_Transceiver::handle_signal (int signum, + siginfo_t *, + ucontext_t *) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) received signal %S\n", signum)); + + ACE_Service_Config::end_reactor_event_loop (); + return 0; +} + +Event_Transceiver::Event_Transceiver (void) +{ + ACE_Sig_Set sig_set; + + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + + if (ACE_Service_Config::reactor ()->register_handler + (sig_set, this) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_handler")); +} + +int +Event_Transceiver::open (void *) +{ + if (ACE_Service_Config::reactor ()->register_handler + (this->peer ().get_handle (), + this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_handler"), -1); + else if (ACE::register_stdin_handler (this, + ACE_Service_Config::reactor (), + ACE_Service_Config::thr_mgr ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "register_stdin_handler"), -1); + return 0; +} + +int +Event_Transceiver::handle_input (ACE_HANDLE handle) +{ + if (handle == ACE_STDIN) + return this->forwarder (); + else + return this->receiver (); +} + + +int +Event_Transceiver::forwarder (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver forwarder\n")); + + char buf[BUFSIZ]; + ssize_t n = ACE_OS::read (ACE_STDIN, buf, sizeof buf); + int result = 0; + + if (n <= 0 || this->peer ().send_n (buf, n) != n) + result = -1; + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver forwarder\n")); + return result; +} + +int +Event_Transceiver::receiver (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) entering transceiver receiver\n")); + + char buf[BUFSIZ]; + + ssize_t n = this->peer ().recv (buf, sizeof buf); + int result = 0; + + if (n <= 0 || ACE_OS::write (ACE_STDOUT, buf, n) != n) + result = -1; + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) leaving transceiver receiver\n")); + return result; +} + +// Port number of event server. +static u_short port_number; + +// Name of event server. +static char *host_name; + +// Handle the command-line arguments. + +static void +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, "h:p:"); + + port_number = ACE_DEFAULT_SERVER_PORT; + host_name = ACE_DEFAULT_SERVER_HOST; + + for (int c; (c = get_opt ()) != -1; ) + switch (c) + { + case 'h': + host_name = get_opt.optarg; + break; + case 'p': + port_number = ACE_OS::atoi (get_opt.optarg); + break; + default: + ACE_ERROR ((LM_ERROR, + "usage: %n [-p portnum] [-h host_name]\n%a", 1)); + /* NOTREACHED */ + break; + } +} + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon (argv[0]); + + parse_args (argc, argv); + + // Establish the connection. + ACE_Connector<Event_Transceiver, ACE_SOCK_CONNECTOR> connector; + Event_Transceiver transceiver; + + if (connector.connect (&transceiver, ACE_INET_Addr (port_number, host_name)) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", host_name), 1); + + // Run event loop until either the event server shuts down or we get + // a SIGINT. + ACE_Service_Config::run_reactor_event_loop (); + return 0; +} +#else +int +main (void) +{ + ACE_ERROR ((LM_ERROR, "test not defined for this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Makefile b/examples/ASX/Makefile new file mode 100644 index 00000000000..38b94b51626 --- /dev/null +++ b/examples/ASX/Makefile @@ -0,0 +1,25 @@ +#---------------------------------------------------------------------------- +# @(#)Makefile 1.1 10/18/96 +# +# Makefile for the ASX test directory +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +DIRS = CCM_App \ + Event_Server \ + Message_Queue \ + UPIPE_Event_Server + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nested.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nolocal.GNU + diff --git a/examples/ASX/Message_Queue/Makefile b/examples/ASX/Message_Queue/Makefile new file mode 100644 index 00000000000..77e32813526 --- /dev/null +++ b/examples/ASX/Message_Queue/Makefile @@ -0,0 +1,218 @@ +#---------------------------------------------------------------------------- +# @(#)Makefile 1.1 10/18/96 +# +# Makefile for Message_Queue tests +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = buffer_stream \ + bounded_buffer \ + priority_buffer + +LSRC = $(addsuffix .cpp,$(BIN)) + +VLDLIBS = $(LDLIBS:%=%$(VAR)) + +BUILD = $(VBIN) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + +.obj/buffer_stream.o .shobj/buffer_stream.so: buffer_stream.cpp \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i +.obj/bounded_buffer.o .shobj/bounded_buffer.so: bounded_buffer.cpp \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h +.obj/priority_buffer.o .shobj/priority_buffer.so: priority_buffer.cpp \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Read_Buffer.h \ + $(WRAPPER_ROOT)/ace/Read_Buffer.i \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/examples/ASX/Message_Queue/bounded_buffer.cpp b/examples/ASX/Message_Queue/bounded_buffer.cpp new file mode 100644 index 00000000000..ec5abd1d7e1 --- /dev/null +++ b/examples/ASX/Message_Queue/bounded_buffer.cpp @@ -0,0 +1,130 @@ +// This short program copies stdin to stdout via the use of an ASX +// @(#)bounded_buffer.cpp 1.1 10/18/96 + +// Message_Queue. It illustrates an implementation of the classic +// "bounded buffer" program. + +#include "ace/Log_Msg.h" +#include "ace/Message_Queue.h" +#include "ace/Thread_Manager.h" + +#if defined (ACE_HAS_THREADS) + +// Global thread manager. +static ACE_Thread_Manager thr_mgr; + +// Message list. +static ACE_Message_Queue<ACE_MT_SYNCH> msg_queue; + +// The producer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +static void * +producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + // Insert thread into thr_mgr. + ACE_Thread_Control thread_control (&thr_mgr); + + // Keep reading stdin, until we reach EOF. + + for (int n; ; ) + { + // Allocate a new message. + ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + + n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ()); + + if (n <= 0) + { + // Send a shutdown message to the other thread and exit. + mb->length (0); + if (msg_queue->enqueue_tail (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + break; + } + + // Send the message to the other thread. + else + { + mb->msg_priority (n); + mb->wr_ptr (n); + if (msg_queue->enqueue_tail (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + } + } + + // The destructor of ACE_Thread_Control removes the exiting thread + // from the thr_mgr automatically. + return 0; +} + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// producer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + // Insert thread into thr_mgr. + ACE_Thread_Control thread_control (&thr_mgr); + int result = 0; + + // Keep looping, reading a message out of the queue, until we timeout + // or get a message with a length == 0, which signals us to quit. + + for (;;) + { + ACE_Message_Block *mb; + + ACE_Time_Value timeout (ACE_OS::time (0) + 4, 0); // Wait for upto 4 seconds + + result = msg_queue->dequeue_head (mb, &timeout); + + if (result == -1) + break; + + int length = mb->length (); + + if (length > 0) + ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); + + delete mb; + + if (length == 0) + break; + } + + if (result == -1 && errno == EWOULDBLOCK) + ACE_ERROR ((LM_ERROR, "(%t) %p\n%a", "timed out waiting for message", 1)); + + // The destructor of ACE_Thread_Control removes the exiting thread + // from the thr_mgr automatically. + return 0; +} + +/* Spawn off two threads that copy stdin to stdout. */ + +int main (int, char *[]) +{ + if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); + else if (thr_mgr.spawn (ACE_THR_FUNC (consumer), (void *) &msg_queue, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); + + // Wait for producer and consumer threads to exit. + thr_mgr.wait (); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp new file mode 100644 index 00000000000..0cc96cd16c5 --- /dev/null +++ b/examples/ASX/Message_Queue/buffer_stream.cpp @@ -0,0 +1,215 @@ +// This short program copies stdin to stdout via the use of an ASX +// @(#)buffer_stream.cpp 1.1 10/18/96 + +// STREAM. It illustrates an implementation of the classic "bounded +// buffer" program using an ASX STREAM containing two Modules. Each +// ACE_Module contains two Tasks. Each ACE_Task contains a +// ACE_Message_Queue and a pointer to a ACE_Thread_Manager. Note how +// the use of these reusable components reduces the reliance on global +// variables, as compared with the bounded_buffer.C example. + +#include "ace/Log_Msg.h" +#include "ace/Synch.h" +#include "ace/Service_Config.h" +#include "ace/Stream.h" +#include "ace/Module.h" +#include "ace/Task.h" + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; +typedef ACE_Module<ACE_MT_SYNCH> MT_Module; +typedef ACE_Task<ACE_MT_SYNCH> MT_Task; + +class Common_Task : public MT_Task + // = TITLE + // Methods that are common to the producer and consumer. +{ +public: + Common_Task (void) {} + // ACE_Task hooks + virtual int open (void * = 0); + virtual int close (u_long = 0); + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) { return 0; } + + // ACE_Service_Object hooks + virtual int init (int, char **) { return 0; } + virtual int fini (void) { return 0; } + virtual int info (char **, size_t) const { return 0; } +}; + +// Define the Producer interface. + +class Producer : public Common_Task +{ +public: + Producer (void) {} + + // Read data from stdin and pass to consumer. + virtual int svc (void); +}; + +class Consumer : public Common_Task + // = TITLE + // Define the Consumer interface. +{ +public: + Consumer (void) {} + + // Enqueue the message on the ACE_Message_Queue for subsequent + // handling in the svc() method. + virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); + + // Receive message from producer and print to stdout. + virtual int svc (void); + +private: + + ACE_Time_Value timeout_; +}; + +// Spawn off a new thread. + +int +Common_Task::open (void *) +{ + if (this->activate (THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), -1); + return 0; +} + +int +Common_Task::close (u_long exit_status) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) thread is exiting with status %d in module %s\n", + exit_status, this->name ())); + + // Can do anything here that is required when a thread exits, e.g., + // storing thread-specific information in some other storage + // location, etc. + return 0; +} + +// The Consumer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +int +Producer::svc (void) +{ + // Keep reading stdin, until we reach EOF. + + for (int n; ; ) + { + // Allocate a new message. + ACE_Message_Block *mb = new ACE_Message_Block (BUFSIZ); + + n = ACE_OS::read (ACE_STDIN, mb->rd_ptr (), mb->size ()); + + if (n <= 0) + { + // Send a shutdown message to the other thread and exit. + mb->length (0); + + if (this->put_next (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + break; + } + + // Send the message to the other thread. + else + { + mb->wr_ptr (n); + + if (this->put_next (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + } + } + + return 0; +} + +// Simply enqueue the Message_Block into the end of the queue. + +int +Consumer::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +{ + return this->putq (mb, tv); +} + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// Consumer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +int +Consumer::svc (void) +{ + int result = 0; + + // Keep looping, reading a message out of the queue, until we + // timeout or get a message with a length == 0, which signals us to + // quit. + + for (;;) + { + ACE_Message_Block *mb; + + this->timeout_.sec (ACE_OS::time (0) + 4); // Wait for upto 4 seconds + + result = this->getq (mb, &this->timeout_); + + if (result == -1) + break; + + int length = mb->length (); + + if (length > 0) + ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); + + delete mb; + + if (length == 0) + break; + } + + if (result == -1 && errno == EWOULDBLOCK) + ACE_ERROR ((LM_ERROR, "(%t) %p\n%a", "timed out waiting for message", 1)); + return 0; +} + +// Main driver function. + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon (argv[0]); + + // Control hierachically-related active objects + MT_Stream stream; + MT_Module *pm = new MT_Module ("Consumer", new Consumer); + MT_Module *cm = new MT_Module ("Producer", new Producer); + + // Create Producer and Consumer Modules and push them onto the + // STREAM. All processing is performed in the STREAM. + + if (stream.push (pm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1); + else if (stream.push (cm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push"), 1); + + // Barrier synchronization: wait for the threads to exit, then exit + // ourselves. + ACE_Service_Config::thr_mgr ()->wait (); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/Message_Queue/priority_buffer.cpp b/examples/ASX/Message_Queue/priority_buffer.cpp new file mode 100644 index 00000000000..ef59d76b355 --- /dev/null +++ b/examples/ASX/Message_Queue/priority_buffer.cpp @@ -0,0 +1,139 @@ +// This short program prints the contents of stdin to stdout sorted by +// @(#)priority_buffer.cpp 1.1 10/18/96 + +// the length of each line via the use of an ASX Message_Queue. It +// illustrates how priorities can be used for ACE Message_Queues. + +#include "ace/Log_Msg.h" +#include "ace/Message_Queue.h" +#include "ace/Read_Buffer.h" +#include "ace/Thread_Manager.h" +#include "ace/Service_Config.h" + +#if defined (ACE_HAS_THREADS) + +// Global thread manager. +static ACE_Thread_Manager thr_mgr; + +// Make the queue be capable of being *very* large. +static const long max_queue = LONG_MAX; + +// Message queue. +static ACE_Message_Queue<ACE_MT_SYNCH> msg_queue (max_queue); + +// The consumer dequeues a message from the ACE_Message_Queue, writes +// the message to the stderr stream, and deletes the message. The +// producer sends a 0-sized message to inform the consumer to stop +// reading and exit. + +static void * +consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + // Keep looping, reading a message out of the queue, until we + // timeout or get a message with a length == 0, which signals us to + // quit. + + for (;;) + { + ACE_Message_Block *mb; + + if (msg_queue->dequeue_head (mb) == -1) + break; + + int length = mb->length (); + + if (length > 0) + ACE_OS::puts (mb->rd_ptr ()); + + // Free up the buffer memory and the Message_Block. + ACE_Service_Config::allocator ()->free (mb->rd_ptr ()); + delete mb; + + if (length == 0) + break; + } + + return 0; +} + +// The producer reads data from the stdin stream, creates a message, +// and then queues the message in the message list, where it is +// removed by the consumer thread. A 0-sized message is enqueued when +// there is no more data to read. The consumer uses this as a flag to +// know when to exit. + +static void * +producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) +{ + // Insert thread into thr_mgr. + ACE_Thread_Control thread_control (&thr_mgr); + + ACE_Read_Buffer rb (ACE_STDIN); + + // Keep reading stdin, until we reach EOF. + + for (int n; ; ) + { + // Allocate a new buffer. + char *buffer = rb.read ('\n'); + + if (buffer == 0) + { + // Send a 0-sized shutdown message to the other thread and + // exit. + if (msg_queue->enqueue_tail (new ACE_Message_Block ((size_t) 0)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + break; + } + + // Enqueue the message in priority order. + else + { + // Allocate a new message, but have it "borrow" its memory + // from the buffer. + ACE_Message_Block *mb = new ACE_Message_Block (rb.size (), + ACE_Message_Block::MB_DATA, + 0, + buffer); + mb->msg_priority (rb.size ()); + mb->wr_ptr (rb.size ()); + + ACE_DEBUG ((LM_DEBUG, "enqueueing message of size %d\n", + mb->msg_priority ())); + // Enqueue in priority order. + if (msg_queue->enqueue (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); + } + } + + // Now read all the items out in priority order (i.e., ordered by + // the size of the lines!). + consumer (msg_queue); + + // The destructor of ACE_Thread_Control removes the exiting thread + // from the thr_mgr automatically. + return 0; +} + +// Spawn off one thread that copies stdin to stdout in order of the +// size of each line. + +int +main (int, char *[]) +{ + if (thr_mgr.spawn (ACE_THR_FUNC (producer), (void *) &msg_queue, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); + + // Wait for producer and consumer threads to exit. + thr_mgr.wait (); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp new file mode 100644 index 00000000000..e679de5390e --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Consumer_Router.cpp @@ -0,0 +1,126 @@ +#include "Consumer_Router.h" +// @(#)Consumer_Router.cpp 1.1 10/18/96 + +#include "Options.h" + +#if defined (ACE_HAS_THREADS) + +typedef Acceptor_Factory<Consumer_Handler, CONSUMER_KEY> CONSUMER_FACTORY; + +int +Consumer_Handler::open (void *a) +{ + CONSUMER_FACTORY *af = (CONSUMER_FACTORY *) a; + this->router_task_ = af->router (); + return this->Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY>::open (a); +} + +Consumer_Handler::Consumer_Handler (ACE_Thread_Manager *tm) + : Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> (tm) +{ +} + +// Create a new handler that will interact with a consumer and point +// its ROUTER_TASK_ data member to the CONSUMER_ROUTER. + +Consumer_Router::Consumer_Router (ACE_Thread_Manager *tm) + : CONSUMER_ROUTER (tm) +{ +} + +// Initialize the Router.. + +int +Consumer_Router::open (void *) +{ + ACE_ASSERT (this->is_reader ()); + char *argv[3]; + + argv[0] = (char *) this->name (); + argv[1] = options.consumer_file (); + argv[2] = 0; + + if (this->init (1, &argv[1]) == -1) + return -1; + + // Make this an active object. +// return this->activate (options.t_flags ()); +} + +int +Consumer_Router::close (u_long) +{ + ACE_ASSERT (this->is_reader ()); + this->peer_map_.close (); + this->msg_queue ()->deactivate(); + return 0; +} + + +// Handle incoming messages in a separate thread.. + +int +Consumer_Router::svc (void) +{ + ACE_Thread_Control tc (this->thr_mgr ()); + ACE_Message_Block *mb = 0; + + ACE_ASSERT (this->is_reader ()); + + if (options.debug ()) + ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in %s\n", this->name ())); + + while (this->getq (mb) > 0) + { + if (this->put_next (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) put_next failed in %s\n", this->name ()), -1); + + } + return 0; + // Note the implicit ACE_OS::thr_exit() via destructor. +} + +// Send a MESSAGE_BLOCK to the supplier(s).. + +int +Consumer_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + ACE_ASSERT (this->is_reader ()); + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + else +{ +//printf("consumer-Router is routing : send_peers\n"); + return this->send_peers (mb); +} +} + +// Return information about the Client_Router ACE_Module.. + +int +Consumer_Router::info (char **strp, size_t length) const +{ + char buf[BUFSIZ]; + ACE_UPIPE_Addr addr; + const char *mod_name = this->name (); + ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_; + + if (sa.get_local_addr (addr) == -1) + return -1; + + ACE_OS::sprintf (buf, "%s\t /%s %s", + mod_name, "upipe", + "# consumer router\n"); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/UPIPE_Event_Server/Consumer_Router.h b/examples/ASX/UPIPE_Event_Server/Consumer_Router.h new file mode 100644 index 00000000000..66d3f28ba07 --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Consumer_Router.h @@ -0,0 +1,48 @@ +/* -*- C++ -*- */ +// @(#)Consumer_Router.h 1.1 10/18/96 + +// The interface between one or more consumers and an Event Server +// ACE_Stream. + +#if !defined (_CONSUMER_ROUTER_H) +#define _CONSUMER_ROUTER_H + +#include "ace/Thread_Manager.h" +#include "ace/UPIPE_Acceptor.h" +#include "ace/UPIPE_Addr.h" +#include "ace/Svc_Handler.h" +#include "Peer_Router.h" + +#if defined (ACE_HAS_THREADS) + +class Consumer_Handler; // Forward declaration.... + +typedef long CONSUMER_KEY; + +typedef Peer_Router<Consumer_Handler, CONSUMER_KEY> CONSUMER_ROUTER; + +class Consumer_Handler + : public Peer_Handler<CONSUMER_ROUTER, CONSUMER_KEY> +{ +public: + Consumer_Handler (ACE_Thread_Manager *tm = 0); + virtual int open (void *); +}; + +class Consumer_Router : public CONSUMER_ROUTER +{ +public: + Consumer_Router (ACE_Thread_Manager *thr_manager); + +protected: + // ACE_Task hooks.. + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void); + + // Dynamic linking hooks. + virtual int info (char **info_string, size_t length) const; +}; +#endif /* ACE_HAS_THREADS */ +#endif /* _CONSUMER_ROUTER_H */ diff --git a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp new file mode 100644 index 00000000000..977e5c4af9d --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.cpp @@ -0,0 +1,68 @@ +#include "Event_Analyzer.h" +// @(#)Event_Analyzer.cpp 1.1 10/18/96 + + +#if defined (ACE_HAS_THREADS) + +int +Event_Analyzer::open (void *) +{ + return 0; +} + +int +Event_Analyzer::close (u_long) +{ + return 0; +} + +int +Event_Analyzer::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd; + + switch (cmd = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (cmd, *(size_t *) mb->cont ()->rd_ptr ()); + break; + } + return 0; +} + +int +Event_Analyzer::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + this->control (mb); + + return this->put_next (mb); +} + +int +Event_Analyzer::init (int, char *[]) +{ + return 0; +} + +int +Event_Analyzer::fini (void) +{ + return 0; +} + +int +Event_Analyzer::info (char **strp, size_t length) const +{ + const char *mod_name = this->name (); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h new file mode 100644 index 00000000000..30053d5f45d --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h @@ -0,0 +1,34 @@ +/* -*- C++ -*- */ +// @(#)Event_Analyzer.h 1.1 10/18/96 + +// Signal router. + +#if !defined (_EVENT_ANALYZER_H) +#define _EVENT_ANALYZER_H + +#include "ace/Stream.h" +#include "ace/Module.h" +#include "ace/Task.h" +#include "ace/Synch.h" + +#if defined (ACE_HAS_THREADS) + +class Event_Analyzer : public ACE_Task<ACE_MT_SYNCH> +{ +public: + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void) { return 0; } + + // Dynamic linking hooks. + virtual int init (int argc, char *argv[]); + virtual int fini (void); + virtual int info (char **info_string, size_t length) const; + +private: + virtual int control (ACE_Message_Block *); +}; + +#endif /* ACE_HAS_THREADS */ +#endif /* _EVENT_ANALYZER_H */ diff --git a/examples/ASX/UPIPE_Event_Server/Makefile b/examples/ASX/UPIPE_Event_Server/Makefile new file mode 100644 index 00000000000..bb0cdc00ed4 --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Makefile @@ -0,0 +1,455 @@ +#---------------------------------------------------------------------------- +# @(#)Makefile 1.1 10/18/96 +# +# Makefile for the Event Server test +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = event_server + +FILES = Options \ + Supplier_Router \ + Event_Analyzer \ + Consumer_Router \ + Peer_Router + +LSRC = $(addsuffix .cpp,$(FILES)) +LOBJ = $(addsuffix .o,$(FILES)) +SHOBJ = $(addsuffix .so,$(FILES)) + +LDLIBS = $(addprefix .shobj/,$(SHOBJ)) + +VLDLIBS = $(LDLIBS:%=%$(VAR)) + +BUILD = $(VBIN) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(WRAPPER_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/macros.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.common.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.nonested.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.lib.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.bin.GNU +include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU + +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + +.obj/Options.o .shobj/Options.so: Options.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i +.obj/Supplier_Router.o .shobj/Supplier_Router.so: Supplier_Router.cpp Supplier_Router.h \ + $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i \ + $(WRAPPER_ROOT)/ace/SPIPE.h \ + $(WRAPPER_ROOT)/ace/SPIPE.i \ + $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \ + $(WRAPPER_ROOT)/ace/Dynamic.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.i \ + Peer_Router.h \ + $(WRAPPER_ROOT)/ace/Acceptor.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies.cpp \ + $(WRAPPER_ROOT)/ace/Acceptor.i \ + $(WRAPPER_ROOT)/ace/Acceptor.cpp \ + Peer_Router.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i +.obj/Event_Analyzer.o .shobj/Event_Analyzer.so: Event_Analyzer.cpp Event_Analyzer.h \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i +.obj/Consumer_Router.o .shobj/Consumer_Router.so: Consumer_Router.cpp Consumer_Router.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i \ + $(WRAPPER_ROOT)/ace/SPIPE.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/SPIPE.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \ + $(WRAPPER_ROOT)/ace/Dynamic.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.i \ + Peer_Router.h \ + $(WRAPPER_ROOT)/ace/Acceptor.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies.cpp \ + $(WRAPPER_ROOT)/ace/Acceptor.i \ + $(WRAPPER_ROOT)/ace/Acceptor.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.i \ + Peer_Router.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i +.obj/Peer_Router.o .shobj/Peer_Router.so: Peer_Router.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Synch_T.cpp \ + $(WRAPPER_ROOT)/ace/Synch_T.i \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Set.cpp \ + $(WRAPPER_ROOT)/ace/Set.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.cpp \ + $(WRAPPER_ROOT)/ace/Malloc_T.i \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.i \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + Peer_Router.h \ + $(WRAPPER_ROOT)/ace/Acceptor.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.cpp \ + $(WRAPPER_ROOT)/ace/Message_Queue.i \ + $(WRAPPER_ROOT)/ace/Task.cpp \ + $(WRAPPER_ROOT)/ace/Module.h \ + $(WRAPPER_ROOT)/ace/Module.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.h \ + $(WRAPPER_ROOT)/ace/Stream_Modules.cpp \ + $(WRAPPER_ROOT)/ace/Stream_Modules.i \ + $(WRAPPER_ROOT)/ace/Module.i \ + $(WRAPPER_ROOT)/ace/Task.i \ + $(WRAPPER_ROOT)/ace/Svc_Handler.cpp \ + $(WRAPPER_ROOT)/ace/Dynamic.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.i \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies.cpp \ + $(WRAPPER_ROOT)/ace/Acceptor.i \ + $(WRAPPER_ROOT)/ace/Acceptor.cpp \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/UPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/Stream.h \ + $(WRAPPER_ROOT)/ace/Stream.cpp \ + $(WRAPPER_ROOT)/ace/Stream.i \ + $(WRAPPER_ROOT)/ace/SPIPE.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/SPIPE.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Addr.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Acceptor.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.h \ + $(WRAPPER_ROOT)/ace/SPIPE_Stream.i \ + $(WRAPPER_ROOT)/ace/UPIPE_Acceptor.i \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.cpp \ + $(WRAPPER_ROOT)/ace/Map_Manager.i \ + Peer_Router.cpp Options.h \ + $(WRAPPER_ROOT)/ace/Profile_Timer.h \ + Options.i + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/examples/ASX/UPIPE_Event_Server/Options.cpp b/examples/ASX/UPIPE_Event_Server/Options.cpp new file mode 100644 index 00000000000..2a334d7ff2a --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Options.cpp @@ -0,0 +1,191 @@ +#include "ace/Get_Opt.h" +// @(#)Options.cpp 1.1 10/18/96 + +#include "ace/Synch.h" +#include "ace/Log_Msg.h" +#include "Options.h" + +#if defined (ACE_HAS_THREADS) + +Options::Options (void) + : debugging_ (0), + verbosity_ (0), + low_water_mark_ (1024), + high_water_mark_ (8 * 1024), + message_size_ (128), + thr_count_ (4), + initial_queue_length_ (0), + iterations_ (100000), + consumer_port_ ("-p 10000"), + supplier_port_ ("-p 10001"), + consumer_file_ ("-f/tmp/conupipe"), + supplier_file_ ("-f/tmp/supupipe"), + t_flags_ (THR_DETACHED) +{ +} + +Options::~Options (void) +{ +} + +void Options::print_results (void) +{ + ACE_Profile_Timer::ACE_Elapsed_Time et; + this->itimer_.elapsed_time (et); + +#if defined (ACE_HAS_PRUSAGE_T) + prusage_t rusage; + this->itimer_.get_rusage (rusage); + + if (options.verbose ()) + { + ACE_OS::printf ("final concurrency hint = %d\n", ACE_OS::thr_getconcurrency ()); + ACE_OS::printf ("%8d = lwpid\n" + "%8d = lwp count\n" + "%8d = minor page faults\n" + "%8d = major page faults\n" + "%8d = input blocks\n" + "%8d = output blocks\n" + "%8d = messages sent\n" + "%8d = messages received\n" + "%8d = signals received\n" + "%8ds, %dms = wait-cpu (latency) time\n" + "%8ds, %dms = user lock wait sleep time\n" + "%8ds, %dms = all other sleep time\n" + "%8d = voluntary context switches\n" + "%8d = involuntary context switches\n" + "%8d = system calls\n" + "%8d = chars read/written\n", + rusage.pr_lwpid, + rusage.pr_count, + rusage.pr_minf, + rusage.pr_majf, + rusage.pr_inblk, + rusage.pr_oublk, + rusage.pr_msnd, + rusage.pr_mrcv, + rusage.pr_sigs, + rusage.pr_wtime.tv_sec, rusage.pr_wtime.tv_nsec / 1000000, + rusage.pr_ltime.tv_sec, rusage.pr_ltime.tv_nsec / 1000000, + rusage.pr_slptime.tv_sec, rusage.pr_slptime.tv_nsec / 1000000, + rusage.pr_vctx, + rusage.pr_ictx, + rusage.pr_sysc, + rusage.pr_ioch); + } +#endif /* ACE_HAS_PRUSAGE_T */ + + ACE_OS::printf ("---------------------\n" + "real time = %.3f\n" + "user time = %.3f\n" + "system time = %.3f\n" + "---------------------\n", + et.real_time, et.user_time, et.system_time); +} + +// Manages the options. +Options options; + +void +Options::parse_args (int argc, char *argv[]) +{ + ACE_LOG_MSG->open (argv[0]); + + ACE_Get_Opt getopt (argc, argv, "C:c:bdH:i:L:l:M:nS:s:t:T:v"); + int c; + + while ((c = getopt ()) != -1) + switch (c) + { + case 'b': + this->t_flags (THR_BOUND); + break; + case 'C': + this->consumer_file (getopt.optarg); + break; + case 'c': + this->consumer_port (getopt.optarg); + break; + case 'd': + this->debugging_ = 1; + break; + case 'H': + this->high_water_mark (ACE_OS::atoi (getopt.optarg)); + break; + case 'i': + this->iterations (ACE_OS::atoi (getopt.optarg)); + break; + case 'L': + this->low_water_mark (ACE_OS::atoi (getopt.optarg)); + break; + case 'l': + this->initial_queue_length (ACE_OS::atoi (getopt.optarg)); + break; + case 'M': + this->message_size (ACE_OS::atoi (getopt.optarg)); + break; + case 'n': + this->t_flags (THR_NEW_LWP); + break; + case 'S': + this->supplier_file (getopt.optarg); + break; + case 's': + this->supplier_port (getopt.optarg); + break; + case 'T': + if (ACE_OS::strcasecmp (getopt.optarg, "ON") == 0) + ACE_Trace::start_tracing (); + else if (ACE_OS::strcasecmp (getopt.optarg, "OFF") == 0) + ACE_Trace::stop_tracing (); + break; + case 't': + this->thr_count (ACE_OS::atoi (getopt.optarg)); + break; + case 'v': + this->verbosity_ = 1; + break; + default: + ::fprintf (stderr, "%s\n" + "\t[-b] (THR_BOUND)\n" + "\t[-C consumer file]\n" + "\t[-c consumer port]\n" + "\t[-d] (enable debugging)\n" + "\t[-H high water mark]\n" + "\t[-i number of test iterations]\n" + "\t[-L low water mark]\n" + "\t[-M] message size \n" + "\t[-n] (THR_NEW_LWP)\n" + "\t[-q max queue size]\n" + "\t[-S supplier file]\n" + "\t[-s supplier port]\n" + "\t[-t number of threads]\n" + "\t[-v] (verbose) \n", + argv[0]); + ::exit (1); + /* NOTREACHED */ + break; + } + + if (this->verbose ()) + ACE_OS::printf ("%8d = initial concurrency hint\n" + "%8d = total iterations\n" + "%8d = thread count\n" + "%8d = low water mark\n" + "%8d = high water mark\n" + "%8d = message_size\n" + "%8d = initial queue length\n" + "%8d = THR_BOUND\n" + "%8d = THR_NEW_LWP\n", + ACE_OS::thr_getconcurrency (), + this->iterations (), + this->thr_count (), + this->low_water_mark (), + this->high_water_mark (), + this->message_size (), + this->initial_queue_length (), + (this->t_flags () & THR_BOUND) != 0, + (this->t_flags () & THR_NEW_LWP) != 0); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/UPIPE_Event_Server/Options.h b/examples/ASX/UPIPE_Event_Server/Options.h new file mode 100644 index 00000000000..bd73eae2bae --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Options.h @@ -0,0 +1,83 @@ +/* -*- C++ -*- */ +// @(#)Options.h 1.1 10/18/96 + +// Option manager for Event Server. + +#if !defined (DEVICE_OPTIONS_H) +#define DEVICE_OPTIONS_H + +#include "ace/OS.h" +#include "ace/Profile_Timer.h" + +#if defined (ACE_HAS_THREADS) + +class Options +{ +public: + Options (void); + ~Options (void); + void parse_args (int argc, char *argv[]); + + void stop_timer (void); + void start_timer (void); + + void thr_count (size_t count); + size_t thr_count (void); + + void initial_queue_length (size_t length); + size_t initial_queue_length (void); + + void high_water_mark (size_t size); + size_t high_water_mark (void); + + void low_water_mark (size_t size); + size_t low_water_mark (void); + + void message_size (size_t size); + size_t message_size (void); + + void iterations (size_t n); + size_t iterations (void); + + void t_flags (long flag); + long t_flags (void); + + void supplier_port (char *port); + char *supplier_port (void); + + void consumer_port (char *port); + char *consumer_port (void); + + void supplier_file (char *file); + char *supplier_file (void); + + void consumer_file (char *file); + char *consumer_file (void); + + int debug (void); + int verbose (void); + + void print_results (void); + +private: + ACE_Profile_Timer itimer_; // Time the process. + size_t thr_count_; // Number of threads to spawn. + long t_flags_; // Flags to thr_create(). + size_t high_water_mark_; // ACE_Task high water mark. + size_t low_water_mark_; // ACE_Task low water mark. + size_t message_size_; // Size of a message. + size_t initial_queue_length_; // Initial number of items in the queue. + size_t iterations_; // Number of iterations to run the test program. + int debugging_; // Extra debugging info. + int verbosity_; // Extra verbose messages. + char *consumer_port_; // Port that the Consumer_Router is using. + char *supplier_port_; // Port that the Supplier_Router is using. + char *consumer_file_; // file that the Consumer_Router is using. + char *supplier_file_; // file that the Supplier_Router is using. +}; + +extern Options options; + +#include "Options.i" +#endif /* ACE_HAS_THREADS */ +#endif /* DEVICE_OPTIONS_H */ diff --git a/examples/ASX/UPIPE_Event_Server/Options.i b/examples/ASX/UPIPE_Event_Server/Options.i new file mode 100644 index 00000000000..226e46b1548 --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Options.i @@ -0,0 +1,161 @@ +/* -*- C++ -*- */ +// @(#)Options.i 1.1 10/18/96 + +// Option manager for ustreams. + +inline void +Options::supplier_port (char *port) +{ + this->supplier_port_ = port; +} + +inline char * +Options::supplier_port (void) +{ + return this->supplier_port_; +} + +inline void +Options::supplier_file (char *file) +{ + this->supplier_file_ = file; +} + +inline char * +Options::supplier_file (void) +{ + return this->supplier_file_; +} + +inline void +Options::consumer_file (char *file) +{ + this->consumer_file_ = file; +} + +inline char * +Options::consumer_file (void) +{ + return this->consumer_file_; +} + +inline void +Options::consumer_port (char *port) +{ + this->consumer_port_ = port; +} + +inline char * +Options::consumer_port (void) +{ + return this->consumer_port_; +} + +inline void +Options::start_timer (void) +{ + this->itimer_.start (); +} + +inline void +Options::stop_timer (void) +{ + this->itimer_.stop (); +} + +inline void +Options::thr_count (size_t count) +{ + this->thr_count_ = count; +} + +inline size_t +Options::thr_count (void) +{ + return this->thr_count_; +} + +inline void +Options::initial_queue_length (size_t length) +{ + this->initial_queue_length_ = length; +} + +inline size_t +Options::initial_queue_length (void) +{ + return this->initial_queue_length_; +} + +inline void +Options::high_water_mark (size_t size) +{ + this->high_water_mark_ = size; +} + +inline size_t +Options::high_water_mark (void) +{ + return this->high_water_mark_; +} + +inline void +Options::low_water_mark (size_t size) +{ + this->low_water_mark_ = size; +} + +inline size_t +Options::low_water_mark (void) +{ + return this->low_water_mark_; +} + +inline void +Options::message_size (size_t size) +{ + this->message_size_ = size; +} + +inline size_t +Options::message_size (void) +{ + return this->message_size_; +} + +inline void +Options::iterations (size_t n) +{ + this->iterations_ = n; +} + +inline size_t +Options::iterations (void) +{ + return this->iterations_; +} + +inline void +Options::t_flags (long flag) +{ + this->t_flags_ |= flag; +} + +inline long +Options::t_flags (void) +{ + return this->t_flags_; +} + +inline int +Options::debug (void) +{ + return this->debugging_; +} + +inline int +Options::verbose (void) +{ + return this->verbosity_; +} + diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp new file mode 100644 index 00000000000..6aba899f4ea --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp @@ -0,0 +1,273 @@ +#if !defined (_PEER_ROUTER_C) +// @(#)Peer_Router.cpp 1.1 10/18/96 + +#define _PEER_ROUTER_C + +#include "ace/Get_Opt.h" +#include "ace/Service_Config.h" +#include "ace/Log_Msg.h" +#include "Peer_Router.h" +#include "Options.h" + +#if defined (ACE_HAS_THREADS) + +// Define some short-hand macros to deal with long templates +// names... + +#define PH PEER_HANDLER +#define PA PEER_ACCEPTOR +#define PAD PEER_ADDR +#define PK PEER_KEY +#define PM PEER_MAP + +template <class PH, class PK> int +Acceptor_Factory<PH, PK>::init (int argc, char *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, "df:", 0); + ACE_UPIPE_Addr addr; + + for (int c; (c = get_opt ()) != -1; ) + switch (c) + { + case 'f': + addr.set (get_opt.optarg); + break; + case 'd': + break; + default: + break; + } + + if (this->open (addr, ACE_Service_Config::reactor ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), -1); + return 0; +} + +template <class PH, class PK> +Acceptor_Factory<PH, PK>::Acceptor_Factory (Peer_Router<PH, PK> *pr) + : pr_ (pr) +{ +} + +template <class PH, class PK> Peer_Router<PH, PK> * +Acceptor_Factory<PH, PK>::router (void) +{ + return this->pr_; +} + +template <class ROUTER, class KEY> +Peer_Handler<ROUTER, KEY>::Peer_Handler (ACE_Thread_Manager *tm) + : ACE_Svc_Handler<ACE_UPIPE_Stream, ACE_UPIPE_Addr, ACE_MT_SYNCH> (tm) +{ +} + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::svc (void) +{ + ACE_Thread_Control thread_control (tm); + // just a try !! + // we're just reading from our ACE_Message_Queue + ACE_Message_Block *db, *hb; + int n; + // do an endless loop + for (;;) + { + db = new ACE_Message_Block (BUFSIZ); + hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); + + if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); + else if (n == 0) // Client has closed down the connection. + { + + if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down \n")); + return -1; // We do not need to be deregistered by reactor + // as we were not registered at all + } + else // Transform incoming buffer into a Message and pass downstream. + { + db->wr_ptr (n); + *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment. + hb->wr_ptr (sizeof (long)); + if (this->router_task_->reply (hb) == -1) + { + cout << "Peer_Handler.svc : router_task->reply failed" << endl ; + return -1; + } + + // return this->router_task_->reply (hb) == -1 ? -1 : 0; + } + } + return 0; +} + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + return this->peer ().send_n (mb->rd_ptr (), mb->length ()); +} + +// Create a new handler and point its ROUTER_TASK_ data member to the +// corresponding router. Note that this router is extracted out of +// the Acceptor_Factory * that is passed in via the +// ACE_Acceptor::handle_input() method. + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::open (void *a) +{ + char buf[BUFSIZ], *p = buf; + + if (this->router_task_->info (&p, sizeof buf) != -1) + ACE_DEBUG ((LM_DEBUG, "(%t) creating handler for %s, fd = %d, this = %d\n", + buf, this->get_handle (), a)); + else + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "info"), -1); + + if ( this->activate (options.t_flags ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "activation of thread failed"), -1); + else if (this->router_task_->bind_peer (this->get_handle (), this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "bind_peer"), -1); + return 0; +} + +// Receive a message from a supplier.. + +template <class ROUTER, class KEY> int +Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h) +{ + + ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h)); +// ACE_Service_Config::reactor ()->remove_handler(h, +// ACE_Event_Handler::RWE_MASK +// |ACE_Event_Handler::DONT_CALL); +// this method should be called only if the peer shuts down +// so we deactivate our ACE_Message_Queue to awake our svc thread + + return 0; + +#if 0 + ACE_Message_Block *db = new ACE_Message_Block (BUFSIZ); + ACE_Message_Block *hb = new ACE_Message_Block (sizeof (KEY), ACE_Message_Block::MB_PROTO, db); + int n; + + if ((n = this->peer ().recv (db->rd_ptr (), db->size ())) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p", "recv failed"), -1); + else if (n == 0) // Client has closed down the connection. + { + if (this->router_task_->unbind_peer (this->get_handle ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p", "unbind failed"), -1); + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down %d\n", h)); + return -1; // Instruct the ACE_Reactor to deregister us by returning -1. + } + else // Transform incoming buffer into a Message and pass downstream. + { + db->wr_ptr (n); + *(long *) hb->rd_ptr () = this->get_handle (); // structure assignment. + hb->wr_ptr (sizeof (long)); + return this->router_task_->reply (hb) == -1 ? -1 : 0; + } +#endif +} + +template <class PH, class PK> +Peer_Router<PH, PK>::Peer_Router (ACE_Thread_Manager *tm) + : ACE_Task<ACE_MT_SYNCH> (tm) +{ +} + +template <class PH, class PK> int +Peer_Router<PH, PK>::send_peers (ACE_Message_Block *mb) +{ + ACE_Map_Iterator<PK, PH *, ACE_RW_Mutex> map_iter = this->peer_map_; + int bytes = 0; + int iterations = 0; + ACE_Message_Block *data_block = mb->cont (); + for (ACE_Map_Entry<PK, PH *> *ss = 0; + map_iter.next (ss) != 0; + map_iter.advance ()) + { + if (options.debug ()) + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer via sd %d\n", ss->ext_id_)); + + iterations++; + bytes += ss->int_id_->put (data_block); + } + + delete mb; + return bytes == 0 ? 0 : bytes / iterations; +} + +template <class PH, class PK> +Peer_Router<PH, PK>::~Peer_Router (void) +{ +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::fini (void) +{ + delete this->acceptor_; + return 0; +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::control (ACE_Message_Block *mb) +{ + ACE_IO_Cntl_Msg *ioc = (ACE_IO_Cntl_Msg *) mb->rd_ptr (); + ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds command; + + switch (command = ioc->cmd ()) + { + case ACE_IO_Cntl_Msg::SET_LWM: + case ACE_IO_Cntl_Msg::SET_HWM: + this->water_marks (command, *(size_t *) mb->cont ()->rd_ptr ()); + break; + default: + return -1; + } + return 0; +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::unbind_peer (PK key) +{ + return this->peer_map_.unbind (key); +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::bind_peer (PK key, Peer_Handler<Peer_Router<PH, PK>, PK> *ph) +{ + PH *peer_handler = (PH *) ph; + return this->peer_map_.bind (key, peer_handler); +} + +template <class PH, class PK> ACE_INLINE int +Peer_Router<PH, PK>::init (int argc, char *argv[]) +{ + this->acceptor_ = new Acceptor_Factory <PH, PK> (this); + + if (this->acceptor_->init (argc, argv) == -1 + || this->peer_map_.open () == -1) + return -1; + else + { + ACE_UPIPE_Addr addr; + ACE_UPIPE_Acceptor &pa = this->acceptor_->acceptor (); + + if (pa.get_local_addr (addr) != -1) + ACE_DEBUG ((LM_DEBUG, "(%t) initializing %s, file = %s, fd = %d, this = %u\n", + this->name (), addr.get_path_name (), pa.get_handle (), this)); + else + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), -1); + } + return 0; +} + +#undef PH +#undef PA +#undef PAD +#undef PK +#undef PM +#endif /* ACE_HAS_THREADS */ +#endif /* _PEER_ROUTER_C */ diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.h b/examples/ASX/UPIPE_Event_Server/Peer_Router.h new file mode 100644 index 00000000000..b344497d4b1 --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.h @@ -0,0 +1,116 @@ +/* -*- C++ -*- */ +// @(#)Peer_Router.h 1.1 10/18/96 + +// The interface between one or more peers and a stream. A peer +// typically runs remotely on another machine. + +#if !defined (_PEER_ROUTER_H) +#define _PEER_ROUTER_H + +#include "ace/Acceptor.h" +#include "ace/Svc_Handler.h" +#include "ace/UPIPE_Acceptor.h" +#include "ace/UPIPE_Addr.h" +#include "ace/Thread_Manager.h" +#include "ace/Map_Manager.h" + +#if defined (ACE_HAS_THREADS) + +// Forward declaration. +template <class PEER_HANDLER, class KEY> +class Peer_Router; + +template <class PEER_HANDLER, class KEY> +class Acceptor_Factory : public ACE_Acceptor<PEER_HANDLER, ACE_UPIPE_ACCEPTOR> +{ +public: + Acceptor_Factory (Peer_Router<PEER_HANDLER, KEY> *pr); + Peer_Router<PEER_HANDLER, KEY> *router (void); + + int init (int argc, char *argv[]); + // Initialize the acceptor when it's linked dynamically. + +private: + Peer_Router<PEER_HANDLER, KEY> *pr_; +}; + +// Receive input from a Peer.. +template <class ROUTER, class KEY> +class Peer_Handler : public ACE_Svc_Handler<ACE_UPIPE_STREAM, ACE_MT_SYNCH> +{ +public: + Peer_Handler (ACE_Thread_Manager * = 0); + + virtual int open (void * = 0); + // Called by the ACE_Acceptor::handle_input() to activate this object. + + virtual int handle_input (ACE_HANDLE); + // Receive input from the peer.. + + virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); + // Send output to a peer. + +protected: + ROUTER *router_task_; + // Pointer to write task.. + +private: + // Don't need this method here... + virtual int svc (void); +}; + +// This abstract base class provides mechanisms for routing messages +// to/from a ACE_Stream from/to one or more peers (which are typically +// running on remote hosts). A subclass of Peer_Router overrides the +// open(), close(), and put() methods in order to specialize the +// behavior of the router to meet application-specific requirements. + +template <class PEER_HANDLER, class PEER_KEY> +class Peer_Router : public ACE_Task<ACE_MT_SYNCH> +{ +public: + Peer_Router (ACE_Thread_Manager * = 0); + ~Peer_Router (void); + + typedef Peer_Handler<Peer_Router<PEER_HANDLER, PEER_KEY>, PEER_KEY> HANDLER; + + // Remove a PEER_HANDLER from the PEER_MAP. + virtual int unbind_peer (PEER_KEY); + + // Add a PEER_HANDLER to the PEER_MAP. + virtual int bind_peer (PEER_KEY, HANDLER *); + + // Send the message block to the peer(s).. + int send_peers (ACE_Message_Block *mb); + +protected: +// Handle control messages arriving from adjacent Modules. + virtual int control (ACE_Message_Block *); + + // Map used to keep track of active peers. + ACE_Map_Manager <PEER_KEY, PEER_HANDLER *, ACE_RW_Mutex> peer_map_; + + // Dynamic linking initialization hooks inherited from ACE_Task. + virtual int init (int argc, char *argv[]); + virtual int fini (void); + + // Factory for accepting new PEER_HANDLERs. + Acceptor_Factory<PEER_HANDLER, PEER_KEY> *acceptor_; + +private: +// Prevent copies and pass-by-value. + Peer_Router (const Peer_Router<PEER_HANDLER, PEER_KEY> &); + void operator= (const Peer_Router<PEER_HANDLER, PEER_KEY> &); +}; + +#if defined (__ACE_INLINE__) +#define ACE_INLINE inline +#else +#define ACE_INLINE +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Peer_Router.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ +#endif /* ACE_HAS_THREADS */ +#endif /* _PEER_ROUTER_H */ diff --git a/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp b/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp new file mode 100644 index 00000000000..414fc5c9ccf --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Supplier_Router.cpp @@ -0,0 +1,126 @@ +#include "Supplier_Router.h" +// @(#)Supplier_Router.cpp 1.1 10/18/96 + +#include "Options.h" + +#if defined (ACE_HAS_THREADS) + +typedef Acceptor_Factory<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_FACTORY; + +int +Supplier_Handler::open (void *a) +{ + SUPPLIER_FACTORY *af = (SUPPLIER_FACTORY *) a; + this->router_task_ = af->router (); + return this->Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY>::open (a); +} + +Supplier_Handler::Supplier_Handler (ACE_Thread_Manager *tm) + : Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> (tm) +{ +} + +// Create a new router and associate it with the REACTOR parameter.. + +Supplier_Router::Supplier_Router (ACE_Thread_Manager *tm) + : SUPPLIER_ROUTER (tm) +{ +} + +// Handle incoming messages in a separate thread.. + +int +Supplier_Router::svc (void) +{ + ACE_ASSERT (this->is_writer ()); + + ACE_Thread_Control tc (this->thr_mgr ()); + ACE_Message_Block *message_block = 0; + + if (options.debug ()) + ACE_DEBUG ((LM_DEBUG, "(%t) starting svc in %s\n", this->name ())); + + while (this->getq (message_block) > 0) + { + if (this->put_next (message_block) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) put_next failed in %s\n", this->name ()), -1); + } + + return 0; + // Note the implicit ACE_OS::thr_exit() via ACE_Thread_Control's destructor. +} + +// Initialize the Router.. + +int +Supplier_Router::open (void *) +{ + ACE_ASSERT (this->is_writer ()); + + char *argv[3]; + + argv[0] = (char *) this->name (); + argv[1] = options.supplier_file (); + argv[2] = 0; + + if (this->init (1, &argv[1]) == -1) + return -1; + // Make this an active object. Return this->activate + // (options.t_flags ()); +} + +// Close down the router.. + +int +Supplier_Router::close (u_long) +{ + ACE_ASSERT (this->is_writer ()); + this->peer_map_.close (); + this->msg_queue ()->deactivate(); + return 0; +} + +// Send a MESSAGE_BLOCK to the supplier(s).. + +int +Supplier_Router::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + ACE_ASSERT (this->is_writer ()); + + if (mb->msg_type () == ACE_Message_Block::MB_IOCTL) + { + this->control (mb); + return this->put_next (mb); + } + else + { +//printf("supplier-Router is routing: send_peers\n"); + return this->send_peers (mb); + } +} + +// Return information about the Supplier_Router ACE_Module.. + +int +Supplier_Router::info (char **strp, size_t length) const +{ + char buf[BUFSIZ]; + ACE_UPIPE_Addr addr; + const char *mod_name = this->name (); + ACE_UPIPE_Acceptor &sa = (ACE_UPIPE_Acceptor &) *this->acceptor_; + + if (sa.get_local_addr (addr) == -1) + return -1; + + ACE_OS::sprintf (buf, "%s\t %s/ %s", + mod_name, "upipe", + "# supplier router\n"); + + if (*strp == 0 && (*strp = ACE_OS::strdup (mod_name)) == 0) + return -1; + else + ACE_OS::strncpy (*strp, mod_name, length); + return ACE_OS::strlen (mod_name); +} + +#endif /* ACE_HAS_THREADS */ diff --git a/examples/ASX/UPIPE_Event_Server/Supplier_Router.h b/examples/ASX/UPIPE_Event_Server/Supplier_Router.h new file mode 100644 index 00000000000..bb304042586 --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/Supplier_Router.h @@ -0,0 +1,52 @@ +/* -*- C++ -*- */ +// @(#)Supplier_Router.h 1.1 10/18/96 + +// The interface between a supplier and an Event Service ACE_Stream. + +#if !defined (_SUPPLIER_ROUTER_H) +#define _SUPPLIER_ROUTER_H + +#include "ace/UPIPE_Addr.h" +#include "ace/UPIPE_Acceptor.h" +#include "ace/Map_Manager.h" +#include "ace/Svc_Handler.h" +#include "Peer_Router.h" + +#if defined (ACE_HAS_THREADS) + +// Forward declaration. +class Supplier_Handler; + +// Type of search key for SUPPLIER_MAP. +typedef long SUPPLIER_KEY; + +// Instantiated type for routing messages to suppliers. + +typedef Peer_Router<Supplier_Handler, SUPPLIER_KEY> SUPPLIER_ROUTER; + +class Supplier_Handler + : public Peer_Handler<SUPPLIER_ROUTER, SUPPLIER_KEY> +{ +public: + Supplier_Handler (ACE_Thread_Manager *tm = 0); + virtual int open (void *); +}; + +class Supplier_Router : public SUPPLIER_ROUTER +{ +public: + Supplier_Router (ACE_Thread_Manager *); + +protected: + // ACE_Task hooks.. + virtual int open (void *a = 0); + virtual int close (u_long flags = 0); + virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); + virtual int svc (void); + + // Dynamic linking hooks inherited from Peer_Router. + virtual int info (char **info_string, size_t length) const; +}; + +#endif /* ACE_HAS_THREADS */ +#endif /* _SUPPLIER_ROUTER_H */ diff --git a/examples/ASX/UPIPE_Event_Server/event_server.cpp b/examples/ASX/UPIPE_Event_Server/event_server.cpp new file mode 100644 index 00000000000..bdafdb23de7 --- /dev/null +++ b/examples/ASX/UPIPE_Event_Server/event_server.cpp @@ -0,0 +1,252 @@ +// Test the event server. +// @(#)event_server.cpp 1.1 10/18/96 + +#include "ace/Log_Msg.h" +#include "ace/Stream.h" +#include "ace/Service_Config.h" +#include "Options.h" +#include "Consumer_Router.h" +#include "Event_Analyzer.h" +#include "Supplier_Router.h" +#include "ace/UPIPE_Acceptor.h" +#include "ace/UPIPE_Connector.h" + +#if defined (ACE_HAS_THREADS) + +typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream; +typedef ACE_Module<ACE_MT_SYNCH> MT_Module; + +// Handle SIGINT and terminate the entire application. + +class Quit_Handler : public ACE_Sig_Adapter +{ +public: + Quit_Handler (void); + virtual int handle_input (ACE_HANDLE fd); +}; + +Quit_Handler::Quit_Handler (void) + : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Service_Config::end_reactor_event_loop)) +{ + // Register to trap input from the user. + if (ACE::register_stdin_handler (this, + ACE_Service_Config::reactor (), + ACE_Service_Config::thr_mgr ()) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler")); + // Register to trap the SIGINT signal. + else if (ACE_Service_Config::reactor ()->register_handler + (SIGINT, this) == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "register_handler")); +} + +int +Quit_Handler::handle_input (ACE_HANDLE) +{ + options.stop_timer (); + ACE_DEBUG ((LM_INFO, " (%t) closing down the test\n")); + options.print_results (); + + ACE_Service_Config::end_reactor_event_loop (); + return 0; +} + +static void * +consumer (void *) +{ + ACE_UPIPE_Stream c_stream; + ACE_UPIPE_Addr c_addr ("/tmp/conupipe"); + + int iter = options.iterations (); + int verb = options.verbose (); + int msiz = options.message_size (); + int secs, par1, par2, i; + time_t currsec; + + if (verb) + cout << "consumer starting connect" << endl; + + ACE_UPIPE_Connector con; + + if (con.connect (c_stream, c_addr) == -1) + ACE_DEBUG ((LM_INFO, " (%t) connect failed\n")); + else + cout << "consumer :we're connected" << endl; + + char buf[BUFSIZ]; + int n; + ACE_Message_Block *mb_p; + + int done = 0; + int cnt = 0; + ACE_OS::time (&currsec); + + par1= (time_t) currsec; + + while (done == 0 + && ((n = c_stream.recv (mb_p)) != -1)) + if (mb_p->length () > 1) + { + cnt++; + if (verb) + cout << " consumer received message !!!!!! " + << mb_p->rd_ptr () << endl; + } + else + { + if (verb) + cout << "consumer got last mb" + << (char) * (mb_p->rd_ptr ()) << endl; + c_stream.close (); + done = 1; + } + + ACE_OS::time (&currsec); + par2 = (time_t) currsec; + + secs = par2 - par1; + + if (secs <= 0) + secs=1; + + cout << "consumer got " << cnt << " messages of size " << msiz + << "within " << secs << " seconds" << endl; + + ACE_OS::sleep (2); + cout << "consumer terminating " << endl; + return 0; +} + +static void * +supplier (void *dummy) +{ + ACE_UPIPE_Stream s_stream; + ACE_UPIPE_Addr serv_addr ("/tmp/supupipe"); + ACE_UPIPE_Connector con; + + int iter = options.iterations (); + int verb = options.verbose (); + int msiz = options.message_size (); + cout << "supplier starting connect" << endl; + + if (con.connect (s_stream, serv_addr) == -1) + ACE_DEBUG ((LM_INFO, " (%t) connect failed\n")); + + cout << "supplier : we're connected" << endl; + int n; + n = 0; + ACE_Message_Block * mb_p; + + while (n < iter) + { + mb_p = new ACE_Message_Block (msiz); + strcpy (mb_p->rd_ptr (), (char *) dummy); + mb_p->length (msiz); + if (verb) + cout << "supplier sending 1 message_block" << endl; + if (s_stream.send (mb_p) == -1) + { + cout << "supplier send failed" << endl; + return (void *) -1; + } + n++; + } + + mb_p = new ACE_Message_Block (10); + mb_p->length (1); + *mb_p->rd_ptr () = 'g'; + + cout << "supplier sending last message_block" << endl; + + if (s_stream.send (mb_p) == -1) + { + cout << "supplier send last mb failed" << endl; + return (void *) -1; + } + mb_p = new ACE_Message_Block (10); + mb_p->length (0); + + if (verb) + cout << "supplier sending very last message_block" << endl; + + if (s_stream.send (mb_p) == -1) + { + cout << "supplier send very last mb failed" << endl; + return (void *) -1; + } + + ACE_OS::sleep (2); + cout << "supplier terminating" << endl; + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_Service_Config daemon; + + options.parse_args (argc, argv); + options.start_timer (); + + // Primary ACE_Stream for EVENT_SERVER application. + MT_Stream event_server; + + // Enable graceful shutdowns.... + Quit_Handler quit_handler; + + // Create the modules.. + + MT_Module *sr = new MT_Module ("Supplier_Router", + new Supplier_Router (ACE_Service_Config::thr_mgr ())); + MT_Module *ea = new MT_Module ("Event_Analyzer", + new Event_Analyzer, + new Event_Analyzer); + MT_Module *cr = new MT_Module ("Consumer_Router", + 0, // 0 triggers the creation of a ACE_Thru_Task... + new Consumer_Router (ACE_Service_Config::thr_mgr ())); + + // Push the modules onto the event_server stream. + + if (event_server.push (sr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Supplier_Router)"), -1); + + if (event_server.push (ea) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Event_Analyzer)"), -1); + + if (event_server.push (cr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "push (Consumer_Router)"), -1); + + // Set the high and low water marks appropriately. + + int wm = options.low_water_mark (); + + if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "push (setting low watermark)"), -1); + + wm = options.high_water_mark (); + if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "push (setting high watermark)"), -1); + + // spawn the two threads. + + if (ACE_Service_Config::thr_mgr ()->spawn (ACE_THR_FUNC (consumer), (void *) 0, + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); + + else if (ACE_Service_Config::thr_mgr ()->spawn (ACE_THR_FUNC (supplier), (void *) "hello", + THR_NEW_LWP | THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); + + // Perform the main event loop waiting for the user to type ^C or to + // enter a line on the ACE_STDIN. + + daemon.run_reactor_event_loop (); + + ACE_DEBUG ((LM_DEBUG, "main exiting\n")); +} +#else +int +main (void) +{ + ACE_ERROR_RETURN ((LM_ERROR, "test not defined for this platform\n"), -1); +} +#endif /* ACE_HAS_THREADS */ |