diff options
Diffstat (limited to 'ACE/examples/Reactor/Misc/notification.cpp')
-rw-r--r-- | ACE/examples/Reactor/Misc/notification.cpp | 385 |
1 files changed, 385 insertions, 0 deletions
diff --git a/ACE/examples/Reactor/Misc/notification.cpp b/ACE/examples/Reactor/Misc/notification.cpp new file mode 100644 index 00000000000..a04663b28ad --- /dev/null +++ b/ACE/examples/Reactor/Misc/notification.cpp @@ -0,0 +1,385 @@ +// $Id$ + +#include "ace/OS_NS_unistd.h" +#include "ace/Service_Config.h" +#include "ace/Reactor.h" +#include "ace/Thread_Manager.h" +#include "ace/Thread.h" +#include "ace/Signal.h" + +ACE_RCSID(Misc, notification, "$Id$") + +#if defined (ACE_HAS_THREADS) +#if defined (CHORUS) +// Chorus does not have signal, so we'll stop after a number of rounds. +#define MAX_ITERATIONS 3 +#else +#define MAX_ITERATIONS 10000 +#endif /* CHORUS */ + +class Thread_Handler : public ACE_Event_Handler +{ + // = TITLE + // Illustrate how the ACE_Reactor's thread-safe event notification + // mechanism works. + // + // = DESCRIPTION + // Handle timeouts in the main thread via the ACE_Reactor and I/O + // events in a separate thread. Just before the separate I/O + // thread exits it notifies the ACE_Reactor in the main thread + // using the ACE_Reactor's notification mechanism. +public: + Thread_Handler (long delay, + long interval, + size_t n_threads, + size_t max_iterations); + // Constructor. + + Thread_Handler (size_t id, + size_t max_iterations); + + ~Thread_Handler (void); + // Destructor. + + virtual int handle_signal (int signum, + siginfo_t * = 0, + ucontext_t * = 0); + // Handle signals. + + virtual int handle_exception (ACE_HANDLE); + // Print data from main thread. + + virtual int handle_output (ACE_HANDLE); + // Print data from main thread. + + virtual int handle_timeout (const ACE_Time_Value &, + const void *); + // Handle timeout events in the main thread. + + virtual int handle_input (ACE_HANDLE); + // General notification messages to the Reactor. + + virtual int notify (ACE_Time_Value *tv = 0); + // Perform notifications. + + virtual int svc (void); + // Handle I/O events in a separate threads. + +private: + static void *svc_run (void *); + // Glues C++ to C thread library functions. + + size_t id_; + // ID passed in by Thread_Handler constructor. + + size_t iterations_; + + static sig_atomic_t shutdown_; + // Shutting down. + + // = Timing variables. + // Delay factor for timer-driven I/O. + static ACE_Time_Value delay_; + + // Interval factor for Event_Handler timer. + static ACE_Time_Value interval_; +}; + +// Shutdown flag. +sig_atomic_t Thread_Handler::shutdown_ = 0; + +// Delay factor for timer-driven I/O. +ACE_Time_Value Thread_Handler::delay_; + +// Interval factor for Event_Handler timer. +ACE_Time_Value Thread_Handler::interval_; + +Thread_Handler::Thread_Handler (size_t id, + size_t max_iterations) + : id_ (id), + iterations_ (max_iterations) +{ +} + +Thread_Handler::~Thread_Handler (void) +{ + // Cleanup resources so that we don't crash and burn when shutdown. + ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()); + ACE_Reactor::instance ()->cancel_timer (this); +} + +Thread_Handler::Thread_Handler ( + long delay, + long interval, + size_t n_threads, + size_t max_iterations) + : iterations_ (max_iterations) +{ + ACE_Sig_Set sig_set; + + sig_set.sig_add (SIGQUIT); + sig_set.sig_add (SIGINT); + + delay_.set (delay); + interval_.set (interval); + this->id_ = 0; + + if (ACE_Event_Handler::register_stdin_handler (this, + ACE_Reactor::instance (), + ACE_Thread_Manager::instance ()) == -1) + ACE_ERROR ((LM_ERROR, + "%p\n", + "register_stdin_handler")); + else if (ACE_Reactor::instance ()->register_handler (sig_set, + this) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "register_handler")); + else if (ACE_Reactor::instance ()->schedule_timer + (this, + 0, + Thread_Handler::delay_, + Thread_Handler::interval_) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "schedule_timer")); + + // Set up this thread's signal mask to block all the signal in the + // <sig_set>, which is inherited by the threads it spawns. + ACE_Sig_Guard guard (&sig_set); + + // Create N new threads of control Thread_Handlers. + + for (size_t i = 0; i < n_threads; i++) + { + Thread_Handler *th; + + ACE_NEW (th, + Thread_Handler (i + 1, + this->iterations_)); + + if (ACE_Thread::spawn (reinterpret_cast<ACE_THR_FUNC> (&Thread_Handler::svc_run), + reinterpret_cast<void *> (th), + THR_NEW_LWP | THR_DETACHED) != 0) + ACE_ERROR ((LM_ERROR, + "%p\n", + "ACE_Thread::spawn")); + } + + // The destructor of <guard> unblocks the signal set so that only + // this thread receives them! +} + +int +Thread_Handler::notify (ACE_Time_Value *timeout) +{ + // Just do something to test the ACE_Reactor's multi-thread + // capabilities... + + if (ACE_Reactor::instance ()->notify + (this, + ACE_Event_Handler::EXCEPT_MASK, + timeout) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "notification::notify:exception"), + -1); + else if (ACE_Reactor::instance ()->notify + (this, + ACE_Event_Handler::WRITE_MASK, + timeout) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) %p\n", + "notification::notify:write"), + -1); + return 0; +} + +// Test stdin handling that uses <select> to demultiplex HANDLEs. +// Input is only handled by the main thread. + +int +Thread_Handler::handle_input (ACE_HANDLE handle) +{ + char buf[BUFSIZ]; + ssize_t n = ACE_OS::read (handle, buf, sizeof buf); + + if (n > 0) + { + ACE_DEBUG ((LM_DEBUG, + "input to (%t) %*s", + n, + buf)); + + ACE_DEBUG ((LM_DEBUG, + "%d more input to kill\n", + this->iterations_)); + + // Only wait up to 10 milliseconds to notify the Reactor. + ACE_Time_Value timeout (0, + 10 * 1000); + + if (this->notify (&timeout) == -1) + ACE_ERROR ((LM_DEBUG, + "(%t), %p\n", + "notification::handle_input:notify")); + return 0; + } + else + return -1; +} + +// Perform a task that will test the ACE_Reactor's multi-threading +// capabilities in separate threads. + +int +Thread_Handler::svc (void) +{ + ACE_Time_Value sleep_timeout (0, + // Transform this into microseconds and divide by 2. + (Thread_Handler::interval_.sec () * ACE_ONE_SECOND_IN_USECS) / 2); + + for (int i = this->iterations_; + i > 0; + --i) + { + if (this->shutdown_ != 0) + break; + + // Block for delay_.secs () / 2, then notify the Reactor. + ACE_OS::sleep (sleep_timeout); + + // Wait up to 10 milliseconds to notify the Reactor. + ACE_Time_Value timeout (0, + 10 * 1000); + if (this->notify (&timeout) == -1) + ACE_ERROR ((LM_ERROR, + "(%t) %p\n", + "notify")); + } + + ACE_Reactor::instance ()->remove_handler (this, + ALL_EVENTS_MASK); + ACE_DEBUG ((LM_DEBUG, + "(%t) exiting svc\n")); + return 0; +} + +// Test signal handling. + +int +Thread_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *) +{ + // @@ Note that this code is not portable to all OS platforms since + // it uses print statements within signal handler context. + ACE_DEBUG ((LM_DEBUG, + "(%t) received signal %S\n", + signum)); + + switch (signum) + { + case SIGINT: + case SIGQUIT: + ACE_ERROR ((LM_ERROR, + "(%t) ******************** shutting down %n on signal %S\n", + signum)); + this->shutdown_ = 1; + ACE_Reactor::end_event_loop(); + } + return 0; +} + +int +Thread_Handler::handle_timeout (const ACE_Time_Value &time, const void *) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) received timeout at (%u, %u), iterations = %d\n", + time.sec (), + time.usec (), + this->iterations_)); + + if (--this->iterations_ <= 0 + || Thread_Handler::interval_.sec () == 0) + ACE_Reactor::end_event_loop (); + + return 0; +} + +// Called by the ACE_Reactor when it receives a notification. + +int +Thread_Handler::handle_exception (ACE_HANDLE) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) exception to id %d, iteration = %d\n", + this->id_, + this->iterations_)); + return 0; +} + +// Called by the ACE_Reactor when it receives a notification. + +int +Thread_Handler::handle_output (ACE_HANDLE) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) output to id %d, iteration = %d\n", + this->id_, + // This decrement must come last since + // <handle_exception> is called before <handle_output>! + this->iterations_--)); + return 0; +} + +// "Shim" function that integrates C thread API with C++. + +void * +Thread_Handler::svc_run (void *eh) +{ + Thread_Handler *this_handler = + reinterpret_cast<Thread_Handler *> (eh); + + if (this_handler->svc () == 0) + return 0; + else + return reinterpret_cast<void *> (-1); +} + +int +ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + ACE_LOG_MSG->open (argv[0]); + + if (argc < 4) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("usage: %s delay interval n_threads [iterations]\n"), + argv[0])); + ACE_OS::exit (1); + } + + int delay = ACE_OS::atoi (argv[1]); + int interval = ACE_OS::atoi (argv[2]); + size_t n_threads = ACE_OS::atoi (argv[3]); + size_t max_iterations = argc > 4 ? ACE_OS::atoi (argv[4]) : MAX_ITERATIONS; + + Thread_Handler thr_handler (delay, + interval, + n_threads, + max_iterations); + + ACE_Reactor::instance ()->run_reactor_event_loop (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("exiting from main\n"))); + return 0; +} +#else +int +main (int, char *[]) +{ + ACE_ERROR_RETURN ((LM_ERROR, + "threads must be supported to run this application\n"), -1); +} +#endif /* ACE_HAS_THREADS */ |