summaryrefslogtreecommitdiff
path: root/ace/Proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Proactor.cpp')
-rw-r--r--ace/Proactor.cpp260
1 files changed, 64 insertions, 196 deletions
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 8801109ffef..6e16958df67 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -34,9 +34,6 @@ int ACE_Proactor::delete_proactor_ = 0;
// Terminate the eventloop.
sig_atomic_t ACE_Proactor::end_event_loop_ = 0;
-// Number of threads in the event loop.
-sig_atomic_t ACE_Proactor::event_loop_thread_count_ = 0;
-
class ACE_Export ACE_Proactor_Timer_Handler : public ACE_Task <ACE_NULL_SYNCH>
{
// = TITLE
@@ -59,14 +56,9 @@ public:
ACE_Proactor_Timer_Handler (ACE_Proactor &proactor);
// Constructor.
- virtual ~ACE_Proactor_Timer_Handler (void);
+ ~ACE_Proactor_Timer_Handler (void);
// Destructor.
- int destroy (void);
- // Proactor calls this to shut down the timer handler
- // gracefully. Just calling the destructor alone doesnt do what
- // <destroy> does. <destroy> make sure the thread exits properly.
-
protected:
virtual int svc (void);
// Run by a daemon thread to handle deferred processing. In other
@@ -97,9 +89,6 @@ ACE_Proactor_Timer_Handler::~ACE_Proactor_Timer_Handler (void)
// Signal timer event.
this->timer_event_.signal ();
-
- // Wait for the Timer Handler thread to exit.
- this->thr_mgr ()->wait ();
}
int
@@ -107,28 +96,35 @@ ACE_Proactor_Timer_Handler::svc (void)
{
ACE_Time_Value absolute_time;
int empty_flag = 0;
- int result = 0;
-
+
while (this->shutting_down_ == 0)
{
- // Is the timer queue empty?
+ // If the timer queue is not empty
empty_flag = this->proactor_.timer_queue ()->is_empty ();
-
if (!empty_flag)
{
// Get the earliest absolute time.
- absolute_time = this->proactor_.timer_queue ()->earliest_time ();
-
- // Block for absolute time.
- result = this->timer_event_.wait (&absolute_time);
+ absolute_time =
+ this->proactor_.timer_queue ()->earliest_time () -
+ this->proactor_.timer_queue ()->gettimeofday ();
+#if 0
+ ACE_DEBUG ((LM_DEBUG,
+ "%N%l:(%t):Earliest Time %d sec, %d msec time\n",
+ absolute_time.sec (),
+ absolute_time.msec ()));
+#endif
+ // Make it zero if it is negative.
+ if (absolute_time < ACE_Time_Value::zero)
+ absolute_time = ACE_Time_Value::zero;
}
+
+ // Wait for event upto <absolute_time>.
+ int result = 0;
+ if (empty_flag)
+ result = this->timer_event_.wait (0);
else
- {
- // Wait for ever.
- result = this->timer_event_.wait ();
- }
+ result = this->timer_event_.wait (&absolute_time);
- // Check for timer expiries.
if (result == -1)
{
switch (errno)
@@ -146,6 +142,7 @@ ACE_Proactor_Timer_Handler::svc (void)
}
}
}
+
return 0;
}
@@ -169,21 +166,19 @@ ACE_Proactor_Handle_Timeout_Upcall::timeout (TIMER_QUEUE &timer_queue,
ASYS_TEXT ("(%t) No Proactor set in ACE_Proactor_Handle_Timeout_Upcall,")
ASYS_TEXT (" no completion port to post timeout to?!@\n")),
-1);
-
+
// Create the Asynch_Timer.
ACE_Asynch_Result_Impl *asynch_timer = this->proactor_->create_asynch_timer (*handler,
- act,
- time,
- ACE_INVALID_HANDLE,
- 0,
- -1);
+ act,
+ time,
+ 0);
if (asynch_timer == 0)
ACE_ERROR_RETURN ((LM_ERROR,
"%N:%l:(%P | %t):%p\n",
"ACE_Proactor_Handle_Timeout_Upcall::timeout:"
"create_asynch_timer failed"),
-1);
-
+
// Post a completion.
if (asynch_timer->post_completion (this->proactor_->implementation ()) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
@@ -244,7 +239,7 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation,
delete_timer_queue_ (0)
{
this->implementation (implementation);
-
+
if (this->implementation () == 0)
{
#if defined (ACE_HAS_AIO_CALLS)
@@ -253,8 +248,8 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation,
ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor);
#elif defined (ACE_POSIX_SIG_PROACTOR)
ACE_NEW (implementation, ACE_POSIX_SIG_Proactor);
- #else /* Default is to use the SIG one */
- ACE_NEW (implementation, ACE_POSIX_SIG_Proactor);
+ #else /* Default is to use the AIOCB one */
+ ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor);
#endif
#elif (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
// WIN_Proactor.
@@ -284,7 +279,7 @@ ACE_Proactor::~ACE_Proactor (void)
}
ACE_Proactor *
-ACE_Proactor::instance (size_t /* threads */)
+ACE_Proactor::instance (size_t threads)
{
ACE_TRACE ("ACE_Proactor::instance");
@@ -342,60 +337,20 @@ ACE_Proactor::close_singleton (void)
int
ACE_Proactor::run_event_loop (void)
{
- int result = 0;
-
- // Declaring the lock variable.
-#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
- ACE_Thread_Mutex *lock =
- ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object
- (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK);
-#endif /* ACE_MT_SAFE */
-
- // Early check. It is ok to do this without lock, since we care just
- // whether it is zero or non-zero.
- if (ACE_Proactor::end_event_loop_ != 0)
- return 0;
-
- // First time you are in. Increment the thread count.
- {
- // Obtain the lock in the MT environments.
-#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
- ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
-#endif /* ACE_MT_SAFE */
-
- // Increment the thread count.
- ACE_Proactor::event_loop_thread_count_ ++;
- }
-
- // Run the event loop.
- while (1)
+ ACE_TRACE ("ACE_Proactor::run_event_loop");
+
+ while (ACE_Proactor::end_event_loop_ == 0)
{
- // Check the end loop flag. It is ok to do this without lock,
- // since we care just whether it is zero or non-zero.
- if (ACE_Proactor::end_event_loop_ != 0)
- break;
-
- // <end_event_loop> is not set. Ready to do <handle_events>.
- result = ACE_Proactor::instance ()->handle_events ();
+ int result = ACE_Proactor::instance ()->handle_events ();
if (ACE_Service_Config::reconfig_occurred ())
ACE_Service_Config::reconfigure ();
-
+
else if (result == -1)
- break;
+ return -1;
}
-
- // Leaving the event loop. Decrement the thread count.
-
- // Obtain the lock in the MT environments.
-#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
- ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
-#endif /* ACE_MT_SAFE */
-
- // Decrement the thread count.
- ACE_Proactor::event_loop_thread_count_ --;
- return result;
+ return 0;
}
// Handle events for -tv- time. handle_events updates -tv- to reflect
@@ -405,93 +360,29 @@ ACE_Proactor::run_event_loop (ACE_Time_Value &tv)
{
ACE_TRACE ("ACE_Proactor::run_event_loop");
- int result = 0;
-
- // Declaring the lock variable.
-#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
- ACE_Thread_Mutex *lock =
- ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object
- (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK);
-#endif /* ACE_MT_SAFE */
-
- // Early check. It is ok to do this without lock, since we care just
- // whether it is zero or non-zero.
- if (ACE_Proactor::end_event_loop_ != 0 ||
- tv == ACE_Time_Value::zero)
- return 0;
-
- // First time you are in. Increment the thread count.
- {
- // Obtain the lock in the MT environments.
-#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
- ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
-#endif /* ACE_MT_SAFE */
-
- // Increment the thread count.
- ACE_Proactor::event_loop_thread_count_ ++;
- }
-
- // Run the event loop.
- while (1)
+ while (ACE_Proactor::end_event_loop_ == 0
+ && tv != ACE_Time_Value::zero)
{
- // Check for end of loop. It is ok to do this without lock,
- // since we care just whether it is zero or non-zero.
- if (ACE_Proactor::end_event_loop_ != 0 ||
- tv == ACE_Time_Value::zero)
- break;
-
- // <end_event_loop> is not set. Ready to do <handle_events>.
- result = ACE_Proactor::instance ()->handle_events (tv);
+ int result = ACE_Proactor::instance ()->handle_events (tv);
if (ACE_Service_Config::reconfig_occurred ())
ACE_Service_Config::reconfigure ();
-
+
// An error has occurred.
else if (result == -1)
- break;
+ return result;
}
- // Leaving the event loop. Decrement the thread count.
-
- // Obtain the lock in the MT environments.
-#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
- ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
-#endif /* ACE_MT_SAFE */
-
- // Decrement the thread count.
- ACE_Proactor::event_loop_thread_count_ --;
-
- return result;
+ return 0;
}
int
ACE_Proactor::end_event_loop (void)
{
ACE_TRACE ("ACE_Proactor::end_event_loop");
-
- // Obtain the lock, set the end flag and post the wakeup
- // completions.
-
- // Obtain the lock in the MT environments.
-#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
- ACE_Thread_Mutex *lock =
- ACE_Managed_Object<ACE_Thread_Mutex>::get_preallocated_object
- (ACE_Object_Manager::ACE_PROACTOR_EVENT_LOOP_LOCK);
- ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, *lock, -1);
-#endif /* ACE_MT_SAFE */
-
- // Set the end flag.
ACE_Proactor::end_event_loop_ = 1;
-
- // Number of completions to post.
- int how_many = ACE_Proactor::event_loop_thread_count_;
-
- // Reset the thread count.
- ACE_Proactor::event_loop_thread_count_ = 0;
-
- // Post completions to all the threads so that they will all wake
- // up.
- return ACE_Proactor::post_wakeup_completions (how_many);
+ // ACE_Proactor::instance()->notify ();
+ return 0;
}
int
@@ -517,15 +408,15 @@ ACE_Proactor::close (void)
delete this->implementation ();
this->implementation_ = 0;
}
-
- // Delete the timer handler.
+
+ // Take care of the timer handler
if (this->timer_handler_)
{
delete this->timer_handler_;
this->timer_handler_ = 0;
}
- // Delete the timer queue.
+ // Take care of the timer queue
if (this->delete_timer_queue_)
{
delete this->timer_queue_;
@@ -577,10 +468,7 @@ ACE_Proactor::schedule_timer (ACE_Handler &handler,
this->timer_queue_->gettimeofday () + time;
// Only one guy goes in here at a time
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
- ace_mon,
- this->timer_queue_->mutex (),
- -1);
+ ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, this->timer_queue_->mutex (), -1);
// Schedule the timer
long result = this->timer_queue_->schedule (&handler,
@@ -749,8 +637,7 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler,
u_long bytes_to_read,
const void* act,
ACE_HANDLE event,
- int priority,
- int signal_number)
+ int priority)
{
return this->implementation ()->create_asynch_read_stream_result (handler,
handle,
@@ -758,8 +645,7 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler,
bytes_to_read,
act,
event,
- priority,
- signal_number);
+ priority);
}
@@ -770,8 +656,7 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler,
u_long bytes_to_write,
const void* act,
ACE_HANDLE event,
- int priority,
- int signal_number)
+ int priority)
{
return this->implementation ()->create_asynch_write_stream_result (handler,
@@ -780,8 +665,7 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler,
bytes_to_write,
act,
event,
- priority,
- signal_number);
+ priority);
}
@@ -795,8 +679,7 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler,
u_long offset,
u_long offset_high,
ACE_HANDLE event,
- int priority,
- int signal_number)
+ int priority)
{
return this->implementation ()->create_asynch_read_file_result (handler,
@@ -807,8 +690,7 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler,
offset,
offset_high,
event,
- priority,
- signal_number);
+ priority);
}
@@ -822,8 +704,7 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler,
u_long offset,
u_long offset_high,
ACE_HANDLE event,
- int priority,
- int signal_number)
+ int priority)
{
return this->implementation ()->create_asynch_write_file_result (handler,
@@ -834,8 +715,7 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler,
offset,
offset_high,
event,
- priority,
- signal_number);
+ priority);
}
@@ -847,8 +727,7 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler,
u_long bytes_to_read,
const void* act,
ACE_HANDLE event,
- int priority,
- int signal_number)
+ int priority)
{
return this->implementation ()->create_asynch_accept_result (handler,
@@ -858,8 +737,7 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler,
bytes_to_read,
act,
event,
- priority,
- signal_number);
+ priority);
}
ACE_Asynch_Transmit_File_Result_Impl *
@@ -874,8 +752,7 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
u_long flags,
const void *act,
ACE_HANDLE event,
- int priority,
- int signal_number)
+ int priority)
{
return this->implementation ()->create_asynch_transmit_file_result (handler,
@@ -889,8 +766,7 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
flags,
act,
event,
- priority,
- signal_number);
+ priority);
}
ACE_Asynch_Result_Impl *
@@ -898,21 +774,13 @@ ACE_Proactor::create_asynch_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &tv,
ACE_HANDLE event,
- int priority,
- int signal_number)
+ int priority)
{
return this->implementation ()->create_asynch_timer (handler,
act,
tv,
event,
- priority,
- signal_number);
-}
-
-int
-ACE_Proactor::post_wakeup_completions (int how_many)
-{
- return ACE_Proactor::instance ()->implementation ()->post_wakeup_completions (how_many);
+ priority);
}
void