diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-23 16:34:11 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-23 16:34:11 +0000 |
commit | ca9104d4351b2930ed79a683823b2715c8dc4d3d (patch) | |
tree | 0ad7c4784ff6671e0f16b19e1b899b950c03656c /ace | |
parent | 85ad79e96bde9abae70376812c84bac74dfae7a7 (diff) | |
download | ATCD-ca9104d4351b2930ed79a683823b2715c8dc4d3d.tar.gz |
ChangeLog Entry: Sun May 23 11:33:07 1999 Alexander Babu Arulanthu <alex@cs.wustl.edu>
Diffstat (limited to 'ace')
-rw-r--r-- | ace/Asynch_IO.cpp | 11 | ||||
-rw-r--r-- | ace/Asynch_IO.h | 7 | ||||
-rw-r--r-- | ace/Object_Manager.cpp | 4 | ||||
-rw-r--r-- | ace/Object_Manager.h | 1 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 9 | ||||
-rw-r--r-- | ace/POSIX_Proactor.cpp | 80 | ||||
-rw-r--r-- | ace/POSIX_Proactor.h | 10 | ||||
-rw-r--r-- | ace/Proactor.cpp | 141 | ||||
-rw-r--r-- | ace/Proactor_Impl.h | 9 | ||||
-rw-r--r-- | ace/WIN32_Proactor.cpp | 76 | ||||
-rw-r--r-- | ace/WIN32_Proactor.h | 8 |
11 files changed, 323 insertions, 33 deletions
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index 9a87f63c893..373356337eb 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -969,11 +969,14 @@ ACE_Handler::handle_write_file (const ACE_Asynch_Write_File::Result &result) } void -ACE_Handler::handle_time_out (const ACE_Time_Value &tv, - const void *act) +ACE_Handler::handle_time_out (const ACE_Time_Value & /* tv */, + const void * /* act */) +{ +} + +void +ACE_Handler::handle_wakeup (void) { - ACE_UNUSED_ARG (tv); - ACE_UNUSED_ARG (act); } ACE_Proactor * diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h index 4e1c72d18b6..fb555a90610 100644 --- a/ace/Asynch_IO.h +++ b/ace/Asynch_IO.h @@ -1002,7 +1002,12 @@ public: virtual void handle_time_out (const ACE_Time_Value &tv, const void *act = 0); // Called when timer expires. <tv> was the requested time value and - // <act> is the ACT passed when scheduling the timer + // <act> is the ACT passed when scheduling the timer. + + virtual void handle_wakeup (void); + // This is method works with the <run_event_loop> of the + // ACE_Proactor. A special <Wake_Up_Completion> is used to wake up + // all the threads that are blocking for completions. ACE_Proactor *proactor (void); // Get the proactor associated with this handler. diff --git a/ace/Object_Manager.cpp b/ace/Object_Manager.cpp index 7f2c89f859d..c6f5e15b430 100644 --- a/ace/Object_Manager.cpp +++ b/ace/Object_Manager.cpp @@ -213,6 +213,8 @@ ACE_Object_Manager::init (void) ACE_TOKEN_MANAGER_CREATION_LOCK) ACE_PREALLOCATE_OBJECT (ACE_TOKEN_CONST::MUTEX, ACE_TOKEN_INVARIANTS_CREATION_LOCK) + ACE_PREALLOCATE_OBJECT (ACE_Thread_Mutex, + ACE_PROACTOR_EVENT_LOOP_LOCK) # endif /* ACE_MT_SAFE */ } @@ -664,6 +666,8 @@ ACE_Object_Manager::fini (void) ACE_TOKEN_MANAGER_CREATION_LOCK) ACE_DELETE_PREALLOCATED_OBJECT (ACE_TOKEN_CONST::MUTEX, ACE_TOKEN_INVARIANTS_CREATION_LOCK) + ACE_DELETE_PREALLOCATED_OBJECT (ACE_Thread_Mutex, + ACE_PROACTOR_EVENT_LOOP_LOCK) # endif /* ACE_MT_SAFE */ #endif /* ! ACE_HAS_STATIC_PREALLOCATION */ diff --git a/ace/Object_Manager.h b/ace/Object_Manager.h index 1dfc2cb20cc..96bd39daf79 100644 --- a/ace/Object_Manager.h +++ b/ace/Object_Manager.h @@ -256,6 +256,7 @@ public: ACE_THREAD_EXIT_LOCK, ACE_TOKEN_MANAGER_CREATION_LOCK, ACE_TOKEN_INVARIANTS_CREATION_LOCK, + ACE_PROACTOR_EVENT_LOOP_LOCK, #endif /* ACE_MT_SAFE */ // Hook for preallocated objects provided by application. diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index 573fd72ac3c..6c3683ed4fb 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -78,6 +78,7 @@ ACE_POSIX_Asynch_Result::signal_number (void) const { return this->aio_sigevent.sigev_signo; } + int ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl *proactor_impl) { @@ -2105,8 +2106,6 @@ ACE_POSIX_AIOCB_Asynch_Accept::~ACE_POSIX_AIOCB_Asynch_Accept (void) void* ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor) { - ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n")); - // Retrieve the reactor pointer from the argument. ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *, arg_reactor); @@ -2126,13 +2125,7 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor) while (result != -1) { result = reactor->handle_events (); - ACE_DEBUG ((LM_DEBUG, - "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n", - result)); } - - ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n")); - return 0; } diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index bf84e9ebff2..68c8aaf7be0 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -13,6 +13,38 @@ #include "ace/POSIX_Proactor.i" #endif /* __ACE_INLINE__ */ +class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result +{ + // = TITLE + // + // This is result object is used by the <end_event_loop> of the + // ACE_Proactor interface to wake up all the threads blocking + // for completions. + // + // = DESCRIPTION + // + +public: + ACE_POSIX_Wakeup_Completion (ACE_Handler &handler, + const void *act = 0, + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); + // Constructor. + + virtual ~ACE_POSIX_Wakeup_Completion (void); + // Destructor. + + + virtual void complete (u_long bytes_transferred = 0, + int success = 1, + const void *completion_key = 0, + u_long error = 0); + // This method calls the <handler>'s <handle_wakeup> method. +}; + +// ********************************************************************* + ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void) { this->close (); @@ -308,6 +340,23 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r } } +int +ACE_POSIX_Proactor::post_wakeup_completions (int how_many) +{ + ACE_POSIX_Wakeup_Completion *wakeup_completion = 0; + for (ssize_t ci = 0; ci < how_many; ci++) + { + ACE_NEW_RETURN (wakeup_completion, + ACE_POSIX_Wakeup_Completion (this->wakeup_handler_), + -1); + + if (wakeup_completion->post_completion (this) == -1) + return -1; + } + + return 0; +} + // ********************************************************************* class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler @@ -982,9 +1031,9 @@ ACE_POSIX_SIG_Proactor::null_handler (int signal_number, void * /* context */) { ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%s:Signal number %d\n" - "Mask all the RT signals for this thread", - "ACE_POSIX_SIG_Proactor::null_handler called", + "Error:(%P | %t):ACE_POSIX_SIG_Proactor::null_handler called," + "Signal number %d," + "Mask all the RT signals for this thread\n", signal_number)); } @@ -1156,4 +1205,29 @@ ACE_POSIX_Asynch_Timer::complete (u_long bytes_transferred, this->handler_.handle_time_out (this->time_, this->act ()); } +// ********************************************************************* + +ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion (ACE_Handler &handler, + const void *act, + ACE_HANDLE event, + int priority, + int signal_number) + : ACE_Asynch_Result_Impl (), + ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number) +{ +} + +ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void) +{ +} + +void +ACE_POSIX_Wakeup_Completion::complete (u_long /* bytes_transferred */, + int /* success */, + const void * /* completion_key */, + u_long /* error */) +{ + this->handler_.handle_wakeup (); +} + #endif /* ACE_HAS_AIO_CALLS */ diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h index 5b84bfcaa44..ee3b4af9c01 100644 --- a/ace/POSIX_Proactor.h +++ b/ace/POSIX_Proactor.h @@ -174,6 +174,16 @@ protected: // compared to <AST> that can be associated each asynchronous // operation. <completion_key> is implemented right now for the // POSIX Proators. + + virtual int post_wakeup_completions (int how_many); + // Post <how_many> completions to the completion port so that all + // threads can wake up. This is used in conjunction with the + // <run_event_loop>. + +protected: + ACE_Handler wakeup_handler_; + // Handler to handle the wakeups. This works in conjunction with the + // <ACE_Proactor::run_event_loop>. }; // Forward declarations. diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 82a8d62dc51..8801109ffef 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -34,6 +34,9 @@ int ACE_Proactor::delete_proactor_ = 0; // Terminate the eventloop. sig_atomic_t ACE_Proactor::end_event_loop_ = 0; +// Number of threads in the event loop. +sig_atomic_t ACE_Proactor::event_loop_thread_count_ = 0; + class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH> { // = TITLE @@ -339,20 +342,60 @@ ACE_Proactor::close_singleton (void) int ACE_Proactor::run_event_loop (void) { - ACE_TRACE ("ACE_Proactor::run_event_loop"); + int result = 0; - while (ACE_Proactor::end_event_loop_ == 0) + // Declaring the lock variable. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_Thread_Mutex *lock = + ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object + (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK); +#endif /* ACE_MT_SAFE */ + + // Early check. It is ok to do this without lock, since we care just + // whether it is zero or non-zero. + if (ACE_Proactor::end_event_loop_ != 0) + return 0; + + // First time you are in. Increment the thread count. + { + // Obtain the lock in the MT environments. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); +#endif /* ACE_MT_SAFE */ + + // Increment the thread count. + ACE_Proactor::event_loop_thread_count_ ++; + } + + // Run the event loop. + while (1) { - int result = ACE_Proactor::instance ()->handle_events (); + // Check the end loop flag. It is ok to do this without lock, + // since we care just whether it is zero or non-zero. + if (ACE_Proactor::end_event_loop_ != 0) + break; + + // <end_event_loop> is not set. Ready to do <handle_events>. + result = ACE_Proactor::instance ()->handle_events (); if (ACE_Service_Config::reconfig_occurred ()) ACE_Service_Config::reconfigure (); - + else if (result == -1) - return -1; + break; } + + // Leaving the event loop. Decrement the thread count. - return 0; + // Obtain the lock in the MT environments. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); +#endif /* ACE_MT_SAFE */ + + // Decrement the thread count. + ACE_Proactor::event_loop_thread_count_ --; + + return result; } // Handle events for -tv- time. handle_events updates -tv- to reflect @@ -362,29 +405,93 @@ ACE_Proactor::run_event_loop (ACE_Time_Value &tv) { ACE_TRACE ("ACE_Proactor::run_event_loop"); - while (ACE_Proactor::end_event_loop_ == 0 - && tv != ACE_Time_Value::zero) + int result = 0; + + // Declaring the lock variable. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_Thread_Mutex *lock = + ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object + (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK); +#endif /* ACE_MT_SAFE */ + + // Early check. It is ok to do this without lock, since we care just + // whether it is zero or non-zero. + if (ACE_Proactor::end_event_loop_ != 0 || + tv == ACE_Time_Value::zero) + return 0; + + // First time you are in. Increment the thread count. + { + // Obtain the lock in the MT environments. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); +#endif /* ACE_MT_SAFE */ + + // Increment the thread count. + ACE_Proactor::event_loop_thread_count_ ++; + } + + // Run the event loop. + while (1) { - int result = ACE_Proactor::instance ()->handle_events (tv); + // Check for end of loop. It is ok to do this without lock, + // since we care just whether it is zero or non-zero. + if (ACE_Proactor::end_event_loop_ != 0 || + tv == ACE_Time_Value::zero) + break; + + // <end_event_loop> is not set. Ready to do <handle_events>. + result = ACE_Proactor::instance ()->handle_events (tv); if (ACE_Service_Config::reconfig_occurred ()) ACE_Service_Config::reconfigure (); - + // An error has occurred. else if (result == -1) - return result; + break; } - return 0; + // Leaving the event loop. Decrement the thread count. + + // Obtain the lock in the MT environments. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); +#endif /* ACE_MT_SAFE */ + + // Decrement the thread count. + ACE_Proactor::event_loop_thread_count_ --; + + return result; } int ACE_Proactor::end_event_loop (void) { ACE_TRACE ("ACE_Proactor::end_event_loop"); + + // Obtain the lock, set the end flag and post the wakeup + // completions. + + // Obtain the lock in the MT environments. +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + ACE_Thread_Mutex *lock = + ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object + (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); +#endif /* ACE_MT_SAFE */ + + // Set the end flag. ACE_Proactor::end_event_loop_ = 1; - // ACE_Proactor::instance()->notify (); - return 0; + + // Number of completions to post. + int how_many = ACE_Proactor::event_loop_thread_count_; + + // Reset the thread count. + ACE_Proactor::event_loop_thread_count_ = 0; + + // Post completions to all the threads so that they will all wake + // up. + return ACE_Proactor::post_wakeup_completions (how_many); } int @@ -802,6 +909,12 @@ ACE_Proactor::create_asynch_timer (ACE_Handler &handler, signal_number); } +int +ACE_Proactor::post_wakeup_completions (int how_many) +{ + return ACE_Proactor::instance ()->implementation ()->post_wakeup_completions (how_many); +} + void ACE_Proactor::implementation (ACE_Proactor_Impl *implementation) { diff --git a/ace/Proactor_Impl.h b/ace/Proactor_Impl.h index edae3452e1a..16d5b65d08f 100644 --- a/ace/Proactor_Impl.h +++ b/ace/Proactor_Impl.h @@ -85,7 +85,7 @@ public: // methods. virtual ACE_Asynch_Read_Stream_Impl *create_asynch_read_stream (void) = 0; - // Create the correct implementation class for doing Asynch_Read_Stream. + // Create the correct implementation class for doing Asynch_Read_Stream. virtual ACE_Asynch_Write_Stream_Impl *create_asynch_write_stream (void) = 0; // Create the correct implementation class for doing Asynch_Write_Stream. @@ -186,7 +186,12 @@ public: int signal_number = 0) = 0; // Create the correct implementation object for the Timer // result. POSIX_SIG_Proactor will create a Timer object with a - // meaningful signal number, if you leave the signal number as 0. + // meaningful signal number, if you leave the signal number as 0. + + virtual int post_wakeup_completions (int how_many) = 0; + // Post <how_many> completions to the completion port so that all + // threads can wake up. This is used in conjunction with the + // <run_event_loop>. }; #endif /* (ACE_WIN32 && ACE_HAS_WINCE) || ACE_HAS_AIO_CALLS */ diff --git a/ace/WIN32_Proactor.cpp b/ace/WIN32_Proactor.cpp index 9e94e4699ec..fc87933c28b 100644 --- a/ace/WIN32_Proactor.cpp +++ b/ace/WIN32_Proactor.cpp @@ -16,6 +16,38 @@ #include "ace/WIN32_Proactor.i" #endif /* __ACE_INLINE__ */ +class ACE_Export ACE_WIN32_Wakeup_Completion : public ACE_WIN32_Asynch_Result +{ + // = TITLE + // + // This is result object is used by the <end_event_loop> of the + // ACE_Proactor interface to wake up all the threads blocking + // for completions. + // + // = DESCRIPTION + // + +public: + ACE_WIN32_Wakeup_Completion (ACE_Handler &handler, + const void *act = 0, + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); + // Constructor. + + virtual ~ACE_WIN32_Wakeup_Completion (void); + // Destructor. + + + virtual void complete (u_long bytes_transferred = 0, + int success = 1, + const void *completion_key = 0, + u_long error = 0); + // This method calls the <handler>'s <handle_wakeup> method. +}; + +// ********************************************************************* + ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads, int used_with_reactor_event_loop) : completion_port_ (0), @@ -469,6 +501,23 @@ ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result) } int +ACE_WIN32_Proactor::post_wakeup_completions (int how_many) +{ + ACE_WIN32_Wakeup_Completion *wakeup_completion = 0; + for (ssize_t ci = 0; ci < how_many; ci++) + { + ACE_NEW_RETURN (wakeup_completion, + ACE_WIN32_Wakeup_Completion (this->wakeup_handler_), + -1); + + if (wakeup_completion->post_completion (this) == -1) + return -1; + } + + return 0; +} + +int ACE_WIN32_Proactor::wake_up_dispatch_threads (void) { return 0; @@ -513,7 +562,7 @@ ACE_WIN32_Asynch_Timer::complete (u_long bytes_transferred, const void *completion_key, u_long error) { - ACE_UNUSED_ARG (error); + ACE_UNUSED_ARG (error); ACE_UNUSED_ARG (completion_key); ACE_UNUSED_ARG (success); ACE_UNUSED_ARG (bytes_transferred); @@ -521,4 +570,29 @@ ACE_WIN32_Asynch_Timer::complete (u_long bytes_transferred, this->handler_.handle_time_out (this->time_, this->act ()); } +// ********************************************************************* + +ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion (ACE_Handler &handler, + const void *act, + ACE_HANDLE event, + int priority, + int signal_number) + : ACE_Asynch_Result_Impl (), + ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority, signal_number) +{ +} + +ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion (void) +{ +} + +void +ACE_WIN32_Wakeup_Completion::complete (u_long /* bytes_transferred */, + int /* success */, + const void * /* completion_key */, + u_long /* error */) +{ + this->handler_.handle_wakeup (); +} + #endif /* ACE_WIN32 */ diff --git a/ace/WIN32_Proactor.h b/ace/WIN32_Proactor.h index f46999dbf9e..cc7dec6e120 100644 --- a/ace/WIN32_Proactor.h +++ b/ace/WIN32_Proactor.h @@ -201,6 +201,10 @@ protected: // Protect against structured exceptions caused by user code when // dispatching handles. + virtual int post_wakeup_completions (int how_many); + // Post <how_many> completions to the completion port so that all + // threads can wake up. This is used in conjunction with the + // <run_event_loop>. ACE_HANDLE completion_port_; // Handle for the completion port. Unix doesnt have completion @@ -217,6 +221,10 @@ protected: int used_with_reactor_event_loop_; // Flag that indicates whether we are used in conjunction with // Reactor. + + ACE_Handler wakeup_handler_; + // Handler to handle the wakeups. This works in conjunction with the + // <ACE_Proactor::run_event_loop>. }; class ACE_Export ACE_WIN32_Asynch_Timer : public ACE_WIN32_Asynch_Result |