diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 14:51:23 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2008-03-04 14:51:23 +0000 |
commit | 99aa8c60282c7b8072eb35eb9ac815702f5bf586 (patch) | |
tree | bda96bf8c3a4c2875a083d7b16720533c8ffeaf4 /ACE/tests/MT_Reactor_Upcall_Test.cpp | |
parent | c4078c377d74290ebe4e66da0b4975da91732376 (diff) | |
download | ATCD-99aa8c60282c7b8072eb35eb9ac815702f5bf586.tar.gz |
undoing accidental deletion
Diffstat (limited to 'ACE/tests/MT_Reactor_Upcall_Test.cpp')
-rw-r--r-- | ACE/tests/MT_Reactor_Upcall_Test.cpp | 384 |
1 files changed, 384 insertions, 0 deletions
diff --git a/ACE/tests/MT_Reactor_Upcall_Test.cpp b/ACE/tests/MT_Reactor_Upcall_Test.cpp new file mode 100644 index 00000000000..e446707dbdc --- /dev/null +++ b/ACE/tests/MT_Reactor_Upcall_Test.cpp @@ -0,0 +1,384 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// MT_Reactor_Upcall_Test.cpp +// +// = DESCRIPTION +// This is a test that shows how to handle upcalls from the +// TP_Reactor and the WFMO_Reactor when the event loop is being run +// by multiple threads. +// +// = AUTHOR +// Irfan Pyarali <irfan@cs.wustl.edu> +// +// ============================================================================ + +#include "test_config.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/Reactor.h" +#include "ace/TP_Reactor.h" +#include "ace/WFMO_Reactor.h" +#include "ace/Dev_Poll_Reactor.h" +#include "ace/Pipe.h" +#include "ace/Task.h" +#include "ace/Get_Opt.h" +#include "ace/ACE.h" + +ACE_RCSID(tests, MT_Reactor_Upcall_Test, "$Id$") + +int number_of_event_loop_threads = 3; +int number_of_messages = 10; +int sleep_time_in_msec = 100; +int lock_upcall = 1; +static const char *message = "Hello there!"; + +class Guard +{ +public: + Guard (ACE_SYNCH_MUTEX &lock) + : lock_ (lock) + { + if (lock_upcall) + lock.acquire (); + } + + ~Guard (void) + { + if (lock_upcall) + this->lock_.release (); + } + + ACE_SYNCH_MUTEX &lock_; +}; + +struct Message +{ + enum Type + { + DATA, + SHUTDOWN + }; + + Type type_; + size_t size_; + char data_[BUFSIZ]; +}; + +class Handler : public ACE_Event_Handler +{ +public: + Handler (ACE_Reactor &reactor); + int handle_input (ACE_HANDLE fd); + + ACE_Pipe pipe_; + int number_of_messages_read_; + ACE_SYNCH_MUTEX lock_; + int shutdown_; +}; + +Handler::Handler (ACE_Reactor &reactor) + : ACE_Event_Handler (&reactor), + number_of_messages_read_ (0), + shutdown_ (1) +{ + // Create the pipe. + int result = this->pipe_.open (); + if (result != 0) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("pipe open"))); + else + { + // Register for input events. + result = + this->reactor ()->register_handler (this->pipe_.read_handle (), + this, + ACE_Event_Handler::READ_MASK); + if (result != 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Can't register pipe for READ"))); + else + this->shutdown_ = 0; + } +} + +int +Handler::handle_input (ACE_HANDLE fd) +{ + Guard monitor (this->lock_); + + // If we have been shutdown, return. + if (this->shutdown_) + return 0; + + // Read fixed parts of message. + Message message; + ssize_t result = + ACE::recv_n (fd, + &message.type_, + sizeof (message.type_)); + if (result != static_cast<ssize_t> (sizeof (message.type_))) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"), + result, + ACE_TEXT ("recv 1"))); + result = + ACE::recv_n (fd, + &message.size_, + sizeof (message.size_)); + if (result != static_cast<ssize_t> (sizeof (message.size_))) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"), + result, + ACE_TEXT ("recv 2"))); + + // On shutdown message, stop the event loop. + if (message.type_ == Message::SHUTDOWN) + { + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Shutdown message\n"))); + + this->shutdown_ = 1; + + this->reactor ()->end_reactor_event_loop (); + + // Remove self from Reactor. + return -1; + } + + // Else it is a data message: read the data. + result = + ACE::recv_n (fd, + &message.data_, + message.size_); + if (result != static_cast<ssize_t> (message.size_)) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"), + result, + ACE_TEXT ("recv 3"))); + else + { + message.data_[result] = '\0'; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Starting to handle message %d: %s\n"), + this->number_of_messages_read_ + 1, + message.data_)); + } + + // Process message (sleep). + ACE_OS::sleep (ACE_Time_Value (0, + sleep_time_in_msec * 1000)); + + // Keep count. + this->number_of_messages_read_++; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Completed handling message %d\n"), + this->number_of_messages_read_)); + + return 0; +} + +class Event_Loop_Task : public ACE_Task_Base +{ +public: + Event_Loop_Task (ACE_Reactor &reactor); + int svc (void); + +private: + ACE_Reactor &reactor_; +}; + +Event_Loop_Task::Event_Loop_Task (ACE_Reactor &reactor) + : reactor_ (reactor) +{ +} + +int +Event_Loop_Task::svc (void) +{ + return this->reactor_.run_reactor_event_loop (); +} + +void +test_reactor_upcall (ACE_Reactor &reactor) +{ + Handler handler (reactor); + if (handler.shutdown_) + { + ACE_ERROR ((LM_ERROR, ACE_TEXT ("Error initializing test; abort.\n"))); + return; + } + + Event_Loop_Task event_loop_task (reactor); + + // Start up the event loop threads. + int result = + event_loop_task.activate (THR_NEW_LWP | THR_JOINABLE, + number_of_event_loop_threads); + if (result != 0) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("test_reactor_upcall, activate"))); + + // Data message. + Message data_message; + data_message.type_ = + Message::DATA; + data_message.size_ = + ACE_OS::strlen (message); + ACE_OS::strcpy (data_message.data_, message); + + // Send in three pieces because the struct members may not be adjacent + // in memory. + for (int i = 0; + i < number_of_messages; + ++i) + { + // This should trigger a call to <handle_input>. + ssize_t sent = + ACE::send_n (handler.pipe_.write_handle (), + &data_message.type_, + sizeof (data_message.type_)); + if (sent == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 1"))); + sent = + ACE::send_n (handler.pipe_.write_handle (), + &data_message.size_, + sizeof (data_message.size_)); + if (sent == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 2"))); + sent = + ACE::send_n (handler.pipe_.write_handle (), + &data_message.data_, + data_message.size_); + if (sent == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 3"))); + } + + // We are done: send shutdown message. + Message shutdown_message; + shutdown_message.type_ = + Message::SHUTDOWN; + shutdown_message.size_ = 0; + + // This should trigger a call to <handle_input>. + ssize_t sent = ACE::send_n (handler.pipe_.write_handle (), + &shutdown_message.type_, + sizeof (shutdown_message.type_)); + if (sent == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 4"))); + sent = ACE::send_n (handler.pipe_.write_handle (), + &shutdown_message.size_, + sizeof (shutdown_message.size_)); + if (sent == -1) + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 5"))); + + // Wait for the event loop tasks to exit. + event_loop_task.wait (); +} + +int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("t:m:s:l:")); + + int c; + + while ((c = get_opt ()) != -1) + switch (c) + { + case 't': + number_of_event_loop_threads = + ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'm': + number_of_messages = + ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 's': + sleep_time_in_msec = + ACE_OS::atoi (get_opt.opt_arg ()); + break; + case 'l': + lock_upcall = + ACE_OS::atoi (get_opt.opt_arg ()); + break; + default: + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_TEXT ("usage: %s\n") + ACE_TEXT ("\t-m <number of messages> (defaults to %d)\n") + ACE_TEXT ("\t-t <number of event loop threads> (defaults to %d)\n") + ACE_TEXT ("\t-s <sleep time in msec> (defaults to %d)\n") + ACE_TEXT ("\t-l <lock upcall> (defaults to %d)\n") + ACE_TEXT ("\n"), + argv [0], + number_of_messages, + number_of_event_loop_threads, + sleep_time_in_msec, + lock_upcall), + -1); + } + + return 0; +} + +int +run_main (int argc, ACE_TCHAR *argv[]) +{ + ACE_START_TEST (ACE_TEXT ("MT_Reactor_Upcall_Test")); + +#if defined (ACE_HAS_THREADS) + + // ACE_LOG_MSG->clr_flags (ACE_Log_Msg::VERBOSE_LITE); + + int result = + parse_args (argc, argv); + + if (result != 0) + return result; + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing TP Reactor\n"))); + + ACE_TP_Reactor tp_reactor_impl; + ACE_Reactor tp_reactor (&tp_reactor_impl); + + test_reactor_upcall (tp_reactor); + +#if defined (ACE_HAS_EVENT_POLL) + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing Dev Poll Reactor\n"))); + + ACE_Dev_Poll_Reactor dev_poll_reactor_impl; + ACE_Reactor dev_poll_reactor (&dev_poll_reactor_impl); + + test_reactor_upcall (dev_poll_reactor); + +#endif /* ACE_HAS_EVENT_POLL */ + +#if defined (ACE_WIN32) && \ + (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) + + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing WFMO Reactor\n"))); + + ACE_WFMO_Reactor wfmo_reactor_impl; + ACE_Reactor wfmo_reactor (&wfmo_reactor_impl); + + test_reactor_upcall (wfmo_reactor); + +#endif /* ACE_WIN32 && ACE_HAS_WINSOCK2 */ + +#else /* ACE_HAS_THREADS */ + ACE_UNUSED_ARG(argc); + ACE_UNUSED_ARG(argv); + + ACE_ERROR ((LM_INFO, + ACE_TEXT ("threads not supported on this platform\n"))); + +#endif /* ACE_HAS_THREADS */ + + ACE_END_TEST; + return 0; +} |