diff options
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r-- | ace/Proactor.cpp | 260 |
1 files changed, 64 insertions, 196 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 8801109ffef..6e16958df67 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -34,9 +34,6 @@ int ACE_Proactor::delete_proactor_ = 0; // Terminate the eventloop. sig_atomic_t ACE_Proactor::end_event_loop_ = 0; -// Number of threads in the event loop. -sig_atomic_t ACE_Proactor::event_loop_thread_count_ = 0; - class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH> { // = TITLE @@ -59,14 +56,9 @@ public: ACE_Proactor_Timer_Handler (ACE_Proactor &proactor); // Constructor. - virtual ~ACE_Proactor_Timer_Handler (void); + ~ACE_Proactor_Timer_Handler (void); // Destructor. - int destroy (void); - // Proactor calls this to shut down the timer handler - // gracefully. Just calling the destructor alone doesnt do what - // <destroy> does. <destroy> make sure the thread exits properly. - protected: virtual int svc (void); // Run by a daemon thread to handle deferred processing. In other @@ -97,9 +89,6 @@ ACE_Proactor_Timer_Handler::~ACE_Proactor_Timer_Handler (void) // Signal timer event. this->timer_event_.signal (); - - // Wait for the Timer Handler thread to exit. - this->thr_mgr ()->wait (); } int @@ -107,28 +96,35 @@ ACE_Proactor_Timer_Handler::svc (void) { ACE_Time_Value absolute_time; int empty_flag = 0; - int result = 0; - + while (this->shutting_down_ == 0) { - // Is the timer queue empty? + // If the timer queue is not empty empty_flag = this->proactor_.timer_queue ()->is_empty (); - if (!empty_flag) { // Get the earliest absolute time. - absolute_time = this->proactor_.timer_queue ()->earliest_time (); - - // Block for absolute time. - result = this->timer_event_.wait (&absolute_time); + absolute_time = + this->proactor_.timer_queue ()->earliest_time () - + this->proactor_.timer_queue ()->gettimeofday (); +#if 0 + ACE_DEBUG ((LM_DEBUG, + "%N%l:(%t):Earliest Time %d sec, %d msec time\n", + absolute_time.sec (), + absolute_time.msec ())); +#endif + // Make it zero if it is negative. + if (absolute_time < ACE_Time_Value::zero) + absolute_time = ACE_Time_Value::zero; } + + // Wait for event upto <absolute_time>. + int result = 0; + if (empty_flag) + result = this->timer_event_.wait (0); else - { - // Wait for ever. - result = this->timer_event_.wait (); - } + result = this->timer_event_.wait (&absolute_time); - // Check for timer expiries. if (result == -1) { switch (errno) @@ -146,6 +142,7 @@ ACE_Proactor_Timer_Handler::svc (void) } } } + return 0; } @@ -169,21 +166,19 @@ ACE_Proactor_Handle_Timeout_Upcall::timeout (TIMER_QUEUE &timer_queue, ASYS_TEXT ("(%t) No Proactor set in ACE_Proactor_Handle_Timeout_Upcall,") ASYS_TEXT (" no completion port to post timeout to?!@\n")), -1); - + // Create the Asynch_Timer. ACE_Asynch_Result_Impl *asynch_timer = this->proactor_->create_asynch_timer (*handler, - act, - time, - ACE_INVALID_HANDLE, - 0, - -1); + act, + time, + 0); if (asynch_timer == 0) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t):%p\n", "ACE_Proactor_Handle_Timeout_Upcall::timeout:" "create_asynch_timer failed"), -1); - + // Post a completion. if (asynch_timer->post_completion (this->proactor_->implementation ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -244,7 +239,7 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation, delete_timer_queue_ (0) { this->implementation (implementation); - + if (this->implementation () == 0) { #if defined (ACE_HAS_AIO_CALLS) @@ -253,8 +248,8 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation, ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor); #elif defined (ACE_POSIX_SIG_PROACTOR) ACE_NEW (implementation, ACE_POSIX_SIG_Proactor); - #else /* Default is to use the SIG one */ - ACE_NEW (implementation, ACE_POSIX_SIG_Proactor); + #else /* Default is to use the AIOCB one */ + ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor); #endif #elif (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) // WIN_Proactor. @@ -284,7 +279,7 @@ ACE_Proactor::~ACE_Proactor (void) } ACE_Proactor * -ACE_Proactor::instance (size_t /* threads */) +ACE_Proactor::instance (size_t threads) { ACE_TRACE ("ACE_Proactor::instance"); @@ -342,60 +337,20 @@ ACE_Proactor::close_singleton (void) int ACE_Proactor::run_event_loop (void) { - int result = 0; - - // Declaring the lock variable. -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_Thread_Mutex *lock = - ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object - (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK); -#endif /* ACE_MT_SAFE */ - - // Early check. It is ok to do this without lock, since we care just - // whether it is zero or non-zero. - if (ACE_Proactor::end_event_loop_ != 0) - return 0; - - // First time you are in. Increment the thread count. - { - // Obtain the lock in the MT environments. -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); -#endif /* ACE_MT_SAFE */ - - // Increment the thread count. - ACE_Proactor::event_loop_thread_count_ ++; - } - - // Run the event loop. - while (1) + ACE_TRACE ("ACE_Proactor::run_event_loop"); + + while (ACE_Proactor::end_event_loop_ == 0) { - // Check the end loop flag. It is ok to do this without lock, - // since we care just whether it is zero or non-zero. - if (ACE_Proactor::end_event_loop_ != 0) - break; - - // <end_event_loop> is not set. Ready to do <handle_events>. - result = ACE_Proactor::instance ()->handle_events (); + int result = ACE_Proactor::instance ()->handle_events (); if (ACE_Service_Config::reconfig_occurred ()) ACE_Service_Config::reconfigure (); - + else if (result == -1) - break; + return -1; } - - // Leaving the event loop. Decrement the thread count. - - // Obtain the lock in the MT environments. -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); -#endif /* ACE_MT_SAFE */ - - // Decrement the thread count. - ACE_Proactor::event_loop_thread_count_ --; - return result; + return 0; } // Handle events for -tv- time. handle_events updates -tv- to reflect @@ -405,93 +360,29 @@ ACE_Proactor::run_event_loop (ACE_Time_Value &tv) { ACE_TRACE ("ACE_Proactor::run_event_loop"); - int result = 0; - - // Declaring the lock variable. -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_Thread_Mutex *lock = - ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object - (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK); -#endif /* ACE_MT_SAFE */ - - // Early check. It is ok to do this without lock, since we care just - // whether it is zero or non-zero. - if (ACE_Proactor::end_event_loop_ != 0 || - tv == ACE_Time_Value::zero) - return 0; - - // First time you are in. Increment the thread count. - { - // Obtain the lock in the MT environments. -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); -#endif /* ACE_MT_SAFE */ - - // Increment the thread count. - ACE_Proactor::event_loop_thread_count_ ++; - } - - // Run the event loop. - while (1) + while (ACE_Proactor::end_event_loop_ == 0 + && tv != ACE_Time_Value::zero) { - // Check for end of loop. It is ok to do this without lock, - // since we care just whether it is zero or non-zero. - if (ACE_Proactor::end_event_loop_ != 0 || - tv == ACE_Time_Value::zero) - break; - - // <end_event_loop> is not set. Ready to do <handle_events>. - result = ACE_Proactor::instance ()->handle_events (tv); + int result = ACE_Proactor::instance ()->handle_events (tv); if (ACE_Service_Config::reconfig_occurred ()) ACE_Service_Config::reconfigure (); - + // An error has occurred. else if (result == -1) - break; + return result; } - // Leaving the event loop. Decrement the thread count. - - // Obtain the lock in the MT environments. -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); -#endif /* ACE_MT_SAFE */ - - // Decrement the thread count. - ACE_Proactor::event_loop_thread_count_ --; - - return result; + return 0; } int ACE_Proactor::end_event_loop (void) { ACE_TRACE ("ACE_Proactor::end_event_loop"); - - // Obtain the lock, set the end flag and post the wakeup - // completions. - - // Obtain the lock in the MT environments. -#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) - ACE_Thread_Mutex *lock = - ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object - (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK); - ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1); -#endif /* ACE_MT_SAFE */ - - // Set the end flag. ACE_Proactor::end_event_loop_ = 1; - - // Number of completions to post. - int how_many = ACE_Proactor::event_loop_thread_count_; - - // Reset the thread count. - ACE_Proactor::event_loop_thread_count_ = 0; - - // Post completions to all the threads so that they will all wake - // up. - return ACE_Proactor::post_wakeup_completions (how_many); + // ACE_Proactor::instance()->notify (); + return 0; } int @@ -517,15 +408,15 @@ ACE_Proactor::close (void) delete this->implementation (); this->implementation_ = 0; } - - // Delete the timer handler. + + // Take care of the timer handler if (this->timer_handler_) { delete this->timer_handler_; this->timer_handler_ = 0; } - // Delete the timer queue. + // Take care of the timer queue if (this->delete_timer_queue_) { delete this->timer_queue_; @@ -577,10 +468,7 @@ ACE_Proactor::schedule_timer (ACE_Handler &handler, this->timer_queue_->gettimeofday () + time; // Only one guy goes in here at a time - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, - ace_mon, - this->timer_queue_->mutex (), - -1); + ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, this->timer_queue_->mutex (), -1); // Schedule the timer long result = this->timer_queue_->schedule (&handler, @@ -749,8 +637,7 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler, u_long bytes_to_read, const void* act, ACE_HANDLE event, - int priority, - int signal_number) + int priority) { return this->implementation ()->create_asynch_read_stream_result (handler, handle, @@ -758,8 +645,7 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler, bytes_to_read, act, event, - priority, - signal_number); + priority); } @@ -770,8 +656,7 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, u_long bytes_to_write, const void* act, ACE_HANDLE event, - int priority, - int signal_number) + int priority) { return this->implementation ()->create_asynch_write_stream_result (handler, @@ -780,8 +665,7 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, bytes_to_write, act, event, - priority, - signal_number); + priority); } @@ -795,8 +679,7 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler, u_long offset, u_long offset_high, ACE_HANDLE event, - int priority, - int signal_number) + int priority) { return this->implementation ()->create_asynch_read_file_result (handler, @@ -807,8 +690,7 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler, offset, offset_high, event, - priority, - signal_number); + priority); } @@ -822,8 +704,7 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler, u_long offset, u_long offset_high, ACE_HANDLE event, - int priority, - int signal_number) + int priority) { return this->implementation ()->create_asynch_write_file_result (handler, @@ -834,8 +715,7 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler, offset, offset_high, event, - priority, - signal_number); + priority); } @@ -847,8 +727,7 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler, u_long bytes_to_read, const void* act, ACE_HANDLE event, - int priority, - int signal_number) + int priority) { return this->implementation ()->create_asynch_accept_result (handler, @@ -858,8 +737,7 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler, bytes_to_read, act, event, - priority, - signal_number); + priority); } ACE_Asynch_Transmit_File_Result_Impl * @@ -874,8 +752,7 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, u_long flags, const void *act, ACE_HANDLE event, - int priority, - int signal_number) + int priority) { return this->implementation ()->create_asynch_transmit_file_result (handler, @@ -889,8 +766,7 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, flags, act, event, - priority, - signal_number); + priority); } ACE_Asynch_Result_Impl * @@ -898,21 +774,13 @@ ACE_Proactor::create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event, - int priority, - int signal_number) + int priority) { return this->implementation ()->create_asynch_timer (handler, act, tv, event, - priority, - signal_number); -} - -int -ACE_Proactor::post_wakeup_completions (int how_many) -{ - return ACE_Proactor::instance ()->implementation ()->post_wakeup_completions (how_many); + priority); } void |