diff options
Diffstat (limited to 'ACE/ace/Select_Reactor_T.cpp')
-rw-r--r-- | ACE/ace/Select_Reactor_T.cpp | 1600 |
1 files changed, 1600 insertions, 0 deletions
diff --git a/ACE/ace/Select_Reactor_T.cpp b/ACE/ace/Select_Reactor_T.cpp new file mode 100644 index 00000000000..1dd2fc26b2a --- /dev/null +++ b/ACE/ace/Select_Reactor_T.cpp @@ -0,0 +1,1600 @@ +// $Id$ + +#ifndef ACE_SELECT_REACTOR_T_CPP +#define ACE_SELECT_REACTOR_T_CPP + +#include "ace/Select_Reactor_T.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/ACE.h" +#include "ace/Guard_T.h" +#include "ace/Log_Msg.h" +#include "ace/Signal.h" +#include "ace/Sig_Handler.h" +#include "ace/Thread.h" +#include "ace/Timer_Heap.h" +#include "ace/OS_NS_errno.h" +#include "ace/OS_NS_sys_select.h" +#include "ace/OS_NS_sys_stat.h" + +// For timer_queue_ +#include "ace/Recursive_Thread_Mutex.h" + +/* + * ACE Reactor specialization hook. + */ +//@@ REACTOR_SPL_INCLUDE_FORWARD_DECL_ADD_HOOK + +#if !defined (__ACE_INLINE__) +#include "ace/Select_Reactor_T.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID (ace, + Select_Reactor_T, + "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_T) + +#if defined (ACE_WIN32) +#define ACE_SELECT_REACTOR_HANDLE(H) (this->event_handlers_[(H)].handle_) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)].event_handler_) +#else +#define ACE_SELECT_REACTOR_HANDLE(H) (H) +#define ACE_SELECT_REACTOR_EVENT_HANDLER(THIS,H) ((THIS)->event_handlers_[(H)]) +#endif /* ACE_WIN32 */ + + template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::any_ready + (ACE_Select_Reactor_Handle_Set &wait_set) +{ + ACE_TRACE ("ACE_Select_Reactor_T::any_ready"); + + if (this->mask_signals_) + { +#if !defined (ACE_WIN32) + // Make this call signal safe. + ACE_Sig_Guard sb; +#endif /* ACE_WIN32 */ + + return this->any_ready_i (wait_set); + } + return this->any_ready_i (wait_set); +} + + template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::any_ready_i + (ACE_Select_Reactor_Handle_Set &wait_set) +{ + ACE_TRACE ("ACE_Select_Reactor_T::any_ready_i"); + + int number_ready = this->ready_set_.rd_mask_.num_set () + + this->ready_set_.wr_mask_.num_set () + + this->ready_set_.ex_mask_.num_set (); + + // number_ready > 0 meaning there are handles in the ready_set + // &wait_set != &(this->ready_set_) means that we need to copy + // the handles from the ready_set to the wait set because the + // wait_set_ doesn't contain all the handles in the ready_set_ + if (number_ready > 0 && &wait_set != &(this->ready_set_)) + { + wait_set.rd_mask_ = this->ready_set_.rd_mask_; + wait_set.wr_mask_ = this->ready_set_.wr_mask_; + wait_set.ex_mask_ = this->ready_set_.ex_mask_; + + this->ready_set_.rd_mask_.reset (); + this->ready_set_.wr_mask_.reset (); + this->ready_set_.ex_mask_.reset (); + } + + return number_ready; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler_i (int signum, + ACE_Event_Handler **eh) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler_i"); + ACE_Event_Handler *handler = this->signal_handler_->handler (signum); + + if (handler == 0) + return -1; + else if (eh != 0) + *eh = handler; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::initialized (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::initialized"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, 0)); + return this->initialized_; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::owner (ACE_thread_t tid, + ACE_thread_t *o_id) +{ + ACE_TRACE ("ACE_Select_Reactor_T::owner"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (o_id) + *o_id = this->owner_; + + this->owner_ = tid; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::owner (ACE_thread_t *t_id) +{ + ACE_TRACE ("ACE_Select_Reactor_T::owner"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + *t_id = this->owner_; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::restart (void) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->restart_; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::restart (int r) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + int current_value = this->restart_; + this->restart_ = r; + return current_value; +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::requeue_position (int rp) +{ + ACE_TRACE ("ACE_Select_Reactor_T::requeue_position"); + ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_)); +#if defined (ACE_WIN32) + ACE_UNUSED_ARG (rp); + // Must always requeue ourselves "next" on Win32. + this->requeue_position_ = 0; +#else + this->requeue_position_ = rp; +#endif /* ACE_WIN32 */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::requeue_position (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::requeue_position"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->requeue_position_; +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::max_notify_iterations (int iterations) +{ + ACE_TRACE ("ACE_Select_Reactor_T::max_notify_iterations"); + ACE_MT (ACE_GUARD (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_)); + + this->notify_handler_->max_notify_iterations (iterations); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::max_notify_iterations (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::max_notify_iterations"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->notify_handler_->max_notify_iterations (); +} + +// Enqueue ourselves into the list of waiting threads. +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::renew (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::renew"); +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + if (this->supress_notify_renew () == 0) + this->token_.renew (this->requeue_position_); +#endif /* defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Select_Reactor_T::notify"); + + ssize_t n = 0; + + // Pass over both the Event_Handler *and* the mask to allow the + // caller to dictate which Event_Handler method the receiver + // invokes. Note that this call can timeout. + + n = this->notify_handler_->notify (eh, mask, timeout); + return n == -1 ? -1 : 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handler (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->resume_i (handle); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handler (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->suspend_i (handle); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_handlers (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_handlers"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + ACE_Event_Handler *eh = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); + iter.next (eh) != 0; + iter.advance ()) + this->suspend_i (eh->get_handle ()); + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_handlers (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_handlers"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + ACE_Event_Handler *eh = 0; + + for (ACE_Select_Reactor_Handler_Repository_Iterator iter (&this->handler_rep_); + iter.next (eh) != 0; + iter.advance ()) + this->resume_i (eh->get_handle ()); + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler + (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->register_handler_i (handler->get_handle (), handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler + (ACE_HANDLE handle, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->register_handler_i (handle, handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler + (const ACE_Handle_Set &handles, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->register_handler_i (handles, handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_Event_Handler * +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::find_handler + (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::find_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, 0)); + return this->find_handler_i (handle); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler + (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Event_Handler **handler) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->handler_i (handle, mask, handler); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler + (const ACE_Handle_Set &handles, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->remove_handler_i (handles, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler + (ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->remove_handler_i (handler->get_handle (), mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler + (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->remove_handler_i (handle, mask); +} + +// Performs operations on the "ready" bits. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ready_ops + (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::ready_ops"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + return this->bit_ops (handle, + mask, + this->ready_set_, + ops); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::open + (size_t size, + int restart, + ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int disable_notify_pipe, + ACE_Reactor_Notify *notify) +{ + ACE_TRACE ("ACE_Select_Reactor_T::open"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + // Can't initialize ourselves more than once. + if (this->initialized_) + return -1; + + this->owner_ = ACE_Thread::self (); + this->restart_ = restart; + this->signal_handler_ = sh; + this->timer_queue_ = tq; + this->notify_handler_ = notify; + + int result = 0; + + // Allows the signal handler to be overridden. + if (this->signal_handler_ == 0) + { + ACE_NEW_RETURN (this->signal_handler_, + ACE_Sig_Handler, + -1); + + if (this->signal_handler_ == 0) + result = -1; + else + this->delete_signal_handler_ = 1; + } + + // Allows the timer queue to be overridden. + if (result != -1 && this->timer_queue_ == 0) + { + ACE_NEW_RETURN (this->timer_queue_, + ACE_Timer_Heap, + -1); + + if (this->timer_queue_ == 0) + result = -1; + else + this->delete_timer_queue_ = 1; + } + + // Allows the Notify_Handler to be overridden. + if (result != -1 && this->notify_handler_ == 0) + { + ACE_NEW_RETURN (this->notify_handler_, + ACE_Select_Reactor_Notify, + -1); + + if (this->notify_handler_ == 0) + result = -1; + else + this->delete_notify_handler_ = 1; + } + + if (result != -1 && this->handler_rep_.open (size) == -1) + result = -1; + else if (this->notify_handler_->open (this, + 0, + disable_notify_pipe) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("notification pipe open failed"))); + result = -1; + } + + if (result != -1) + // We're all set to go. + this->initialized_ = 1; + else + // This will close down all the allocated resources properly. + this->close (); + + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::set_sig_handler + (ACE_Sig_Handler *signal_handler) +{ + if (this->signal_handler_ != 0 && this->delete_signal_handler_ != 0) + delete this->signal_handler_; + this->signal_handler_ = signal_handler; + this->delete_signal_handler_ = 0; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_Timer_Queue * +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::timer_queue (void) const +{ + return this->timer_queue_; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::timer_queue + (ACE_Timer_Queue *tq) +{ + if (this->timer_queue_ != 0 && this->delete_timer_queue_ != 0) + delete this->timer_queue_; + this->timer_queue_ = tq; + this->delete_timer_queue_ = 0; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ACE_Select_Reactor_T + (ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int disable_notify_pipe, + ACE_Reactor_Notify *notify, + int mask_signals, + int s_queue) + : ACE_Select_Reactor_Impl (mask_signals) + , token_ (*this, s_queue) + , lock_adapter_ (token_) + , deactivated_ (0) +{ + ACE_TRACE ("ACE_Select_Reactor_T::ACE_Select_Reactor_T"); + + // First try to open the Reactor with the hard-coded default. + if (this->open (ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::DEFAULT_SIZE, + 0, + sh, + tq, + disable_notify_pipe, + notify) == -1) + { + // The hard-coded default Reactor size failed, so attempt to + // determine the size at run-time by checking the process file + // descriptor limit on platforms that support this feature. + + // There is no need to deallocate resources from previous open() + // call since the open() method deallocates any resources prior + // to exiting if an error was encountered. + + // Set the default reactor size to be the current limit on the + // number of file descriptors available to the process. This + // size is not necessarily the maximum limit. + if (this->open (ACE::max_handles (), + 0, + sh, + tq, + disable_notify_pipe, + notify) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Select_Reactor_T::open ") + ACE_LIB_TEXT ("failed inside ") + ACE_LIB_TEXT ("ACE_Select_Reactor_T::CTOR"))); + } +} + +// Initialize ACE_Select_Reactor_T. + +template <class ACE_SELECT_REACTOR_TOKEN> +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::ACE_Select_Reactor_T + (size_t size, + int rs, + ACE_Sig_Handler *sh, + ACE_Timer_Queue *tq, + int disable_notify_pipe, + ACE_Reactor_Notify *notify, + int mask_signals, + int s_queue) + : ACE_Select_Reactor_Impl (mask_signals) + , token_ (*this, s_queue) + , lock_adapter_ (token_) + , deactivated_ (0) +{ + ACE_TRACE ("ACE_Select_Reactor_T::ACE_Select_Reactor_T"); + + if (this->open (size, + rs, + sh, + tq, + disable_notify_pipe, + notify) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Select_Reactor_T::open ") + ACE_LIB_TEXT ("failed inside ACE_Select_Reactor_T::CTOR"))); +} + +// Close down the ACE_Select_Reactor_T instance, detaching any +// remaining Event_Handers. This had better be called from the main +// event loop thread... + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::close (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::close"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (this->delete_signal_handler_) + { + delete this->signal_handler_; + this->signal_handler_ = 0; + this->delete_signal_handler_ = 0; + } + + this->handler_rep_.close (); + + if (this->delete_timer_queue_) + { + delete this->timer_queue_; + this->timer_queue_ = 0; + this->delete_timer_queue_ = 0; + } + + if (this->notify_handler_ != 0) + this->notify_handler_->close (); + + if (this->delete_notify_handler_) + { + delete this->notify_handler_; + this->notify_handler_ = 0; + this->delete_notify_handler_ = 0; + } + + this->initialized_ = 0; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::current_info + (ACE_HANDLE, size_t &) +{ + return -1; +} + +template <class ACE_SELECT_REACTOR_TOKEN> +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::~ACE_Select_Reactor_T (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::~ACE_Select_Reactor_T"); + this->close (); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i + (const ACE_Handle_Set &handles, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i"); + ACE_HANDLE h; + + ACE_Handle_Set_Iterator handle_iter (handles); + + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->remove_handler_i (h, mask) == -1) + return -1; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i + (const ACE_Handle_Set &handles, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i"); + ACE_HANDLE h; + + ACE_Handle_Set_Iterator handle_iter (handles); + while ((h = handle_iter ()) != ACE_INVALID_HANDLE) + if (this->register_handler_i (h, handler, mask) == -1) + return -1; + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler + (const ACE_Sig_Set &sigset, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler"); + + int result = 0; + +#if (ACE_NSIG > 0) + for (int s = 1; s < ACE_NSIG; ++s) + if ((sigset.is_member (s) == 1) + && this->signal_handler_->register_handler (s, + new_sh, + new_disp) == -1) + result = -1; +#else /* ACE_NSIG <= 0 */ + ACE_UNUSED_ARG (sigset); + ACE_UNUSED_ARG (new_sh); + ACE_UNUSED_ARG (new_disp); +#endif /* ACE_NSIG <= 0 */ + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler + (const ACE_Sig_Set &sigset) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler"); + int result = 0; + +#if (ACE_NSIG > 0) + for (int s = 1; s < ACE_NSIG; ++s) + if ((sigset.is_member (s) == 1) + && this->signal_handler_->remove_handler (s) == -1) + result = -1; +#else /* ACE_NSIG <= 0 */ + ACE_UNUSED_ARG (sigset); +#endif /* ACE_NSIG <= 0 */ + + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_timer (ACE_Event_Handler *handler, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (this->timer_queue_ != 0) + return this->timer_queue_->cancel (handler, dont_call_handle_close); + else + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::cancel_timer (long timer_id, + const void **arg, + int dont_call_handle_close) +{ + ACE_TRACE ("ACE_Select_Reactor_T::cancel_timer"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (this->timer_queue_ != 0) + return this->timer_queue_->cancel (timer_id, + arg, + dont_call_handle_close); + else + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> long +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::schedule_timer + (ACE_Event_Handler *handler, + const void *arg, + const ACE_Time_Value &delay_time, + const ACE_Time_Value &interval) +{ + ACE_TRACE ("ACE_Select_Reactor_T::schedule_timer"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (0 != this->timer_queue_) + return this->timer_queue_->schedule + (handler, + arg, + timer_queue_->gettimeofday () + delay_time, + interval); + + errno = ESHUTDOWN; + return -1; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::reset_timer_interval + (long timer_id, + const ACE_Time_Value &interval) +{ + ACE_TRACE ("ACE_Select_Reactor_T::reset_timer_interval"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + if (0 != this->timer_queue_) + return this->timer_queue_->reset_interval (timer_id, interval); + + errno = ESHUTDOWN; + return -1; +} + +// Main event loop driver that blocks for <max_wait_time> before +// returning (will return earlier if I/O or signal events occur). + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events + (ACE_Time_Value &max_wait_time) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handle_events"); + + return this->handle_events (&max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_error (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handle_error"); +#if defined (linux) && defined (ERESTARTNOHAND) + if (errno == EINTR || errno == ERESTARTNOHAND) + return this->restart_; +#else + if (errno == EINTR) + return this->restart_; +#endif /* linux && ERESTARTNOHAND */ +#if defined (__MVS__) || defined (ACE_WIN32) || defined (ACE_VXWORKS) + // On MVS Open Edition and Win32, there can be a number of failure + // codes on a bad socket, so check_handles on anything other than + // EINTR. VxWorks doesn't even bother to always set errno on error + // in select (specifically, it doesn't return EBADF for bad FDs). + else + return this->check_handles (); +#else + else if (errno == EBADF) + return this->check_handles (); + else + return -1; +#endif /* __MVS__ || ACE_WIN32 */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::notify_handle + (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Handle_Set &ready_mask, + ACE_Event_Handler *event_handler, + ACE_EH_PTMF ptmf) +{ + ACE_TRACE ("ACE_Select_Reactor_T::notify_handle"); + // Check for removed handlers. + if (event_handler == 0) + return; + + int reference_counting_required = + event_handler->reference_counting_policy ().value () == + ACE_Event_Handler::Reference_Counting_Policy::ENABLED; + + // Call add_reference() if needed. + if (reference_counting_required) + { + event_handler->add_reference (); + } + + int status = (event_handler->*ptmf) (handle); + + if (status < 0) + this->remove_handler_i (handle, mask); + else if (status > 0) + ready_mask.set_bit (handle); + + // Call remove_reference() if needed. + if (reference_counting_required) + { + event_handler->remove_reference (); + } +} + +// Perform GET, CLR, SET, and ADD operations on the select() +// Handle_Sets. +// +// GET = 1, Retrieve current value +// SET = 2, Set value of bits to new mask (changes the entire mask) +// ADD = 3, Bitwise "or" the value into the mask (only changes +// enabled bits) +// CLR = 4 Bitwise "and" the negation of the value out of the mask +// (only changes enabled bits) +// +// Returns the original mask. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::mask_ops + (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + int ops) +{ + ACE_TRACE ("ACE_Select_Reactor_T::mask_ops"); + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1)); + + // If the handle is not suspended, then set the ops on the + // <wait_set_>, otherwise set the <suspend_set_>. + + if (this->is_suspended_i (handle)) + return this->bit_ops (handle, mask, + this->suspend_set_, + ops); + else + return this->bit_ops (handle, mask, + this->wait_set_, + ops); +} + +template <class ACE_SELECT_REACTOR_TOKEN> ACE_Event_Handler * +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::find_handler_i + (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::find_handler_i"); + + ACE_Event_Handler *event_handler = + this->handler_rep_.find (handle); + + if (event_handler) + event_handler->add_reference (); + + return event_handler; +} + +// Must be called with locks held. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handler_i + (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Event_Handler **eh) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handler_i"); + ACE_Event_Handler *event_handler = + this->handler_rep_.find (handle); + + if (event_handler == 0) + return -1; + else + { + if ((ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) + || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)) + && this->wait_set_.rd_mask_.is_set (handle) == 0) + return -1; + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK) + && this->wait_set_.wr_mask_.is_set (handle) == 0) + return -1; + if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK) + && this->wait_set_.ex_mask_.is_set (handle) == 0) + return -1; + } + + if (eh != 0) + { + *eh = event_handler; + event_handler->add_reference (); + } + + return 0; +} + +// Must be called with locks held + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::resume_i (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::resume_i"); + if (this->handler_rep_.find (handle) == 0) + return -1; + + if (this->suspend_set_.rd_mask_.is_set (handle)) + { + this->wait_set_.rd_mask_.set_bit (handle); + this->suspend_set_.rd_mask_.clr_bit (handle); + } + if (this->suspend_set_.wr_mask_.is_set (handle)) + { + this->wait_set_.wr_mask_.set_bit (handle); + this->suspend_set_.wr_mask_.clr_bit (handle); + } + if (this->suspend_set_.ex_mask_.is_set (handle)) + { + this->wait_set_.ex_mask_.set_bit (handle); + this->suspend_set_.ex_mask_.clr_bit (handle); + } + return 0; +} + +// Must be called with locks held + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::suspend_i (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::suspend_i"); + if (this->handler_rep_.find (handle) == 0) + return -1; + + if (this->wait_set_.rd_mask_.is_set (handle)) + { + this->suspend_set_.rd_mask_.set_bit (handle); + this->wait_set_.rd_mask_.clr_bit (handle); + } + if (this->wait_set_.wr_mask_.is_set (handle)) + { + this->suspend_set_.wr_mask_.set_bit (handle); + this->wait_set_.wr_mask_.clr_bit (handle); + } + if (this->wait_set_.ex_mask_.is_set (handle)) + { + this->suspend_set_.ex_mask_.set_bit (handle); + this->wait_set_.ex_mask_.clr_bit (handle); + } + + // Kobi: we need to remove that handle from the + // dispatch set as well. We use that function with all the relevant + // masks - rd/wr/ex - all the mask. it is completely suspended + this->clear_dispatch_mask (handle, ACE_Event_Handler::RWE_MASK); + return 0; +} + +// Must be called with locks held + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::is_suspended_i (ACE_HANDLE handle) +{ + ACE_TRACE ("ACE_Select_Reactor_T::is_suspended_i"); + if (this->handler_rep_.find (handle) == 0) + return 0; + + return this->suspend_set_.rd_mask_.is_set (handle) || + this->suspend_set_.wr_mask_.is_set (handle) || + this->suspend_set_.ex_mask_.is_set (handle) ; + +} + +// Must be called with locks held + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::register_handler_i + (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::register_handler_i"); + + // Insert the <handle, event_handle> tuple into the Handler + // Repository. + return this->handler_rep_.bind (handle, event_handler, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::remove_handler_i + (ACE_HANDLE handle, + ACE_Reactor_Mask mask) +{ + ACE_TRACE ("ACE_Select_Reactor_T::remove_handler_i"); + + // Unbind this handle. + return this->handler_rep_.unbind (handle, mask); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::work_pending + (const ACE_Time_Value &max_wait_time) +{ + ACE_TRACE ("ACE_Select_Reactor_T::work_pending"); + + ACE_Time_Value mwt (max_wait_time); + ACE_MT (ACE_Countdown_Time countdown (&mwt)); + + ACE_MT (ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, + ace_mon, + this->token_, + -1)); + + if (this->deactivated_) + return 0; + + // Update the countdown to reflect time waiting for the mutex. + ACE_MT (countdown.update ()); + + ACE_Time_Value timer_buf (0); + ACE_Time_Value *this_timeout = + this->timer_queue_->calculate_timeout (&mwt, &timer_buf); + + // Check if we have timers to fire. + int timers_pending = + (this_timeout != 0 && *this_timeout != mwt ? 1 : 0); + + u_long width = (u_long) this->handler_rep_.max_handlep1 (); + + ACE_Select_Reactor_Handle_Set fd_set; + fd_set.rd_mask_ = this->wait_set_.rd_mask_; + fd_set.wr_mask_ = this->wait_set_.wr_mask_; + fd_set.ex_mask_ = this->wait_set_.ex_mask_; + + int nfds = ACE_OS::select (int (width), + fd_set.rd_mask_, + fd_set.wr_mask_, + fd_set.ex_mask_, + this_timeout); + + // If timers are pending, override any timeout from the select() + // call. + return (nfds == 0 && timers_pending != 0 ? 1 : nfds); +} + +// Must be called with lock held. + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::wait_for_multiple_events + (ACE_Select_Reactor_Handle_Set &dispatch_set, + ACE_Time_Value *max_wait_time) +{ + ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events"); + u_long width = 0; + ACE_Time_Value timer_buf (0); + ACE_Time_Value *this_timeout; + + int number_of_active_handles = this->any_ready (dispatch_set); + + // If there are any bits enabled in the <ready_set_> then we'll + // handle those first, otherwise we'll block in <select>. + + if (number_of_active_handles == 0) + { + do + { + this_timeout = + this->timer_queue_->calculate_timeout (max_wait_time, + &timer_buf); + width = (u_long) this->handler_rep_.max_handlep1 (); + + dispatch_set.rd_mask_ = this->wait_set_.rd_mask_; + dispatch_set.wr_mask_ = this->wait_set_.wr_mask_; + dispatch_set.ex_mask_ = this->wait_set_.ex_mask_; + number_of_active_handles = ACE_OS::select (int (width), + dispatch_set.rd_mask_, + dispatch_set.wr_mask_, + dispatch_set.ex_mask_, + this_timeout); + } + while (number_of_active_handles == -1 && this->handle_error () > 0); + + if (number_of_active_handles > 0) + { +#if !defined (ACE_WIN32) + // Resynchronize the fd_sets so their "max" is set properly. + dispatch_set.rd_mask_.sync (this->handler_rep_.max_handlep1 ()); + dispatch_set.wr_mask_.sync (this->handler_rep_.max_handlep1 ()); + dispatch_set.ex_mask_.sync (this->handler_rep_.max_handlep1 ()); +#endif /* ACE_WIN32 */ + } + else if (number_of_active_handles == -1) + { + // Normally, select() will reset the bits in dispatch_set + // so that only those filed descriptors that are ready will + // have bits set. However, when an error occurs, the bit + // set remains as it was when the select call was first made. + // Thus, we now have a dispatch_set that has every file + // descriptor that was originally waited for, which is not + // correct. We must clear all the bit sets because we + // have no idea if any of the file descriptors is ready. + // + // NOTE: We dont have a test case to reproduce this + // problem. But pleae dont ignore this and remove it off. + dispatch_set.rd_mask_.reset (); + dispatch_set.wr_mask_.reset (); + dispatch_set.ex_mask_.reset (); + } + } + + // Return the number of events to dispatch. + return number_of_active_handles; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_timer_handlers + (int &number_of_handlers_dispatched) +{ + number_of_handlers_dispatched += this->timer_queue_->expire (); + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_notification_handlers + (ACE_Select_Reactor_Handle_Set &dispatch_set, + int &number_of_active_handles, + int &number_of_handlers_dispatched) +{ + // Check to see if the ACE_HANDLE associated with the + // Select_Reactor's notify hook is enabled. If so, it means that + // one or more other threads are trying to update the + // ACE_Select_Reactor_T's internal tables or the notify pipe is + // enabled. We'll handle all these threads and notifications, and + // then break out to continue the event loop. + int n = + this->notify_handler_->dispatch_notifications (number_of_active_handles, + dispatch_set.rd_mask_); + + if (n == -1) + return -1; + else + { + number_of_handlers_dispatched += n; + number_of_active_handles -= n; + } + + // Same as dispatch_timer_handlers + // No need to do anything with the state changed. That is because + // unbind already handles the case where someone unregister some + // kind of handle and unbind it. (::unbind calls the function + // state_changed () to reflect ant change with that) + // return this->state_changed_ ? -1 : 0; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_set + (int number_of_active_handles, + int &number_of_handlers_dispatched, + int mask, + ACE_Handle_Set &dispatch_mask, + ACE_Handle_Set &ready_mask, + ACE_EH_PTMF callback) +{ + ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_set"); + ACE_HANDLE handle; + + ACE_Handle_Set_Iterator handle_iter (dispatch_mask); + + while ((handle = handle_iter ()) != ACE_INVALID_HANDLE && + number_of_handlers_dispatched < number_of_active_handles) + { + ++number_of_handlers_dispatched; + + this->notify_handle (handle, + mask, + ready_mask, + this->handler_rep_.find (handle), + callback); + + // clear the bit from that dispatch mask, + // so when we need to restart the iteration (rebuilding the iterator...) + // we will not dispatch the already dispatched handlers + this->clear_dispatch_mask (handle, mask); + + if (this->state_changed_) + { + + handle_iter.reset_state (); + this->state_changed_ = false; + } + } + + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_io_handlers + (ACE_Select_Reactor_Handle_Set &dispatch_set, + int &number_of_active_handles, + int &number_of_handlers_dispatched) +{ + ACE_TRACE ("ACE_Select_Reactor_T::dispatch_io_handlers"); + + // Handle output events (this code needs to come first to handle the + // obscure case of piggy-backed data coming along with the final + // handshake message of a nonblocking connection). + + if (this->dispatch_io_set (number_of_active_handles, + number_of_handlers_dispatched, + ACE_Event_Handler::WRITE_MASK, + dispatch_set.wr_mask_, + this->ready_set_.wr_mask_, + &ACE_Event_Handler::handle_output) == -1) + { + number_of_active_handles -= number_of_handlers_dispatched; + return -1; + } + + // ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Select_Reactor_T::dispatch - EXCEPT\n"))); + if (this->dispatch_io_set (number_of_active_handles, + number_of_handlers_dispatched, + ACE_Event_Handler::EXCEPT_MASK, + dispatch_set.ex_mask_, + this->ready_set_.ex_mask_, + &ACE_Event_Handler::handle_exception) == -1) + { + number_of_active_handles -= number_of_handlers_dispatched; + return -1; + } + + // ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Select_Reactor_T::dispatch - READ\n"))); + if (this->dispatch_io_set (number_of_active_handles, + number_of_handlers_dispatched, + ACE_Event_Handler::READ_MASK, + dispatch_set.rd_mask_, + this->ready_set_.rd_mask_, + &ACE_Event_Handler::handle_input) == -1) + { + number_of_active_handles -= number_of_handlers_dispatched; + return -1; + } + + number_of_active_handles -= number_of_handlers_dispatched; + return 0; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch + (int active_handle_count, + ACE_Select_Reactor_Handle_Set &dispatch_set) +{ + ACE_TRACE ("ACE_Select_Reactor_T::dispatch"); + + int io_handlers_dispatched = 0; + int other_handlers_dispatched = 0; + int signal_occurred = 0; + // The following do/while loop keeps dispatching as long as there + // are still active handles. Note that the only way we should ever + // iterate more than once through this loop is if signals occur + // while we're dispatching other handlers. + + do + { + // Note that we keep track of changes to our state. If any of + // the dispatch_*() methods below return -1 it means that the + // <wait_set_> state has changed as the result of an + // <ACE_Event_Handler> being dispatched. This means that we + // need to bail out and rerun the select() loop since our + // existing notion of handles in <dispatch_set> may no longer be + // correct. + // + // In the beginning, our state starts out unchanged. After + // every iteration (i.e., due to signals), our state starts out + // unchanged again. + + this->state_changed_ = false; + + // Perform the Template Method for dispatching all the handlers. + + // First check for interrupts. + if (active_handle_count == -1) + { + // Bail out -- we got here since <select> was interrupted. + if (ACE_Sig_Handler::sig_pending () != 0) + { + ACE_Sig_Handler::sig_pending (0); + + // If any HANDLES in the <ready_set_> are activated as a + // result of signals they should be dispatched since + // they may be time critical... + active_handle_count = this->any_ready (dispatch_set); + + // Record the fact that the Reactor has dispatched a + // handle_signal() method. We need this to return the + // appropriate count below. + signal_occurred = 1; + } + else + return -1; + } + + // Handle timers early since they may have higher latency + // constraints than I/O handlers. Ideally, the order of + // dispatching should be a strategy... + else if (this->dispatch_timer_handlers (other_handlers_dispatched) == -1) + // State has changed or timer queue has failed, exit loop. + break; + + // Check to see if there are no more I/O handles left to + // dispatch AFTER we've handled the timers... + else if (active_handle_count == 0) + return io_handlers_dispatched + + other_handlers_dispatched + + signal_occurred; + + // Next dispatch the notification handlers (if there are any to + // dispatch). These are required to handle multi-threads that + // are trying to update the <Reactor>. + + else if (this->dispatch_notification_handlers + (dispatch_set, + active_handle_count, + other_handlers_dispatched) == -1) + // State has changed or a serious failure has occured, so exit + // loop. + break; + + // Finally, dispatch the I/O handlers. + else if (this->dispatch_io_handlers + (dispatch_set, + active_handle_count, + io_handlers_dispatched) == -1) + // State has changed, so exit loop. + break; + + // if state changed, we need to re-eval active_handle_count, + // so we will not end with an endless loop + if (this->state_changed_) + { + active_handle_count = this->dispatch_set_.rd_mask_.num_set () + + this->dispatch_set_.wr_mask_.num_set () + + this->dispatch_set_.ex_mask_.num_set (); + } + } + while (active_handle_count > 0); + + return io_handlers_dispatched + other_handlers_dispatched + signal_occurred; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::release_token (void) +{ +#if defined (ACE_WIN32) + this->token_.release (); + return (int) EXCEPTION_CONTINUE_SEARCH; +#else + return 0; +#endif /* ACE_WIN32 */ +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events + (ACE_Time_Value *max_wait_time) +{ + ACE_TRACE ("ACE_Select_Reactor_T::handle_events"); + +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + + // Stash the current time -- the destructor of this object will + // automatically compute how much time elapsed since this method was + // called. + ACE_Countdown_Time countdown (max_wait_time); + + ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1); + + if (ACE_OS::thr_equal (ACE_Thread::self (), + this->owner_) == 0 || this->deactivated_) + return -1; + + // Update the countdown to reflect time waiting for the mutex. + countdown.update (); +#else + if (this->deactivated_) + return -1; +#endif /* ACE_MT_SAFE */ + + return this->handle_events_i (max_wait_time); +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events_i + (ACE_Time_Value *max_wait_time) +{ + int result = -1; + + ACE_SEH_TRY + { + // We use the data member dispatch_set_ as the current dispatch + // set. + + // We need to start from a clean dispatch_set + this->dispatch_set_.rd_mask_.reset (); + this->dispatch_set_.wr_mask_.reset (); + this->dispatch_set_.ex_mask_.reset (); + + int number_of_active_handles = + this->wait_for_multiple_events (this->dispatch_set_, + max_wait_time); + + result = + this->dispatch (number_of_active_handles, + this->dispatch_set_); + } + ACE_SEH_EXCEPT (this->release_token ()) + { + // As it stands now, we catch and then rethrow all Win32 + // structured exceptions so that we can make sure to release the + // <token_> lock correctly. + } + + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> int +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::check_handles (void) +{ + ACE_TRACE ("ACE_Select_Reactor_T::check_handles"); + +#if defined (ACE_WIN32) || defined (__MVS__) || defined (ACE_VXWORKS) + ACE_Time_Value time_poll = ACE_Time_Value::zero; + ACE_Handle_Set rd_mask; +#endif /* ACE_WIN32 || MVS || ACE_VXWORKS */ + + int result = 0; + + /* + * It's easier to run through the handler repository iterator, but that + * misses handles that are registered on a handler that doesn't implement + * get_handle(). So, build a handle set that's the union of the three + * wait_sets (rd, wrt, ex) and run through that. Bad handles get cleared + * out of all sets. + */ + ACE_HANDLE h; + ACE_Handle_Set check_set (this->wait_set_.rd_mask_); + ACE_Handle_Set_Iterator wr_iter (this->wait_set_.wr_mask_); + while ((h = wr_iter ()) != ACE_INVALID_HANDLE) + check_set.set_bit (h); + ACE_Handle_Set_Iterator ex_iter (this->wait_set_.ex_mask_); + while ((h = ex_iter ()) != ACE_INVALID_HANDLE) + check_set.set_bit (h); + + ACE_Handle_Set_Iterator check_iter (check_set); + while ((h = check_iter ()) != ACE_INVALID_HANDLE) + { + +#if defined (ACE_WIN32) || defined (__MVS__) || defined (ACE_VXWORKS) + // Win32 needs to do the check this way because fstat won't work on + // a socket handle. MVS Open Edition needs to do it this way because, + // even though the docs say to check a handle with either select or + // fstat, the fstat method always says the handle is ok. + // pSOS needs to do it this way because file handles and socket handles + // are maintained by separate pieces of the system. VxWorks needs the select + // variant since fstat always returns an error on socket FDs. + rd_mask.set_bit (h); + + int select_width; +# if defined (ACE_WIN32) + // This arg is ignored on Windows and causes pointer truncation + // warnings on 64-bit compiles. + select_width = 0; +# else + select_width = int (h) + 1; +# endif /* ACE_WIN32 */ + + if (ACE_OS::select (select_width, + rd_mask, 0, 0, + &time_poll) < 0) + { + result = 1; + this->remove_handler_i (h, ACE_Event_Handler::ALL_EVENTS_MASK); + } + rd_mask.clr_bit (h); +#else /* !ACE_WIN32 && !MVS && !VXWORKS */ + struct stat temp; + + if (ACE_OS::fstat (h, &temp) == -1) + { + result = 1; + this->remove_handler_i (h, ACE_Event_Handler::ALL_EVENTS_MASK); + } +#endif /* ACE_WIN32 || MVS */ + } + + return result; +} + +template <class ACE_SELECT_REACTOR_TOKEN> void +ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_Select_Reactor_T::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + this->timer_queue_->dump (); + this->handler_rep_.dump (); + this->signal_handler_->dump (); + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("delete_signal_handler_ = %d\n"), + this->delete_signal_handler_)); + + ACE_HANDLE h; + + for (ACE_Handle_Set_Iterator handle_iter_wr (this->wait_set_.wr_mask_); + (h = handle_iter_wr ()) != ACE_INVALID_HANDLE; + ++handle_iter_wr) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("write_handle = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_rd (this->wait_set_.rd_mask_); + (h = handle_iter_rd ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("read_handle = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_ex (this->wait_set_.ex_mask_); + (h = handle_iter_ex ()) != ACE_INVALID_HANDLE; + ++handle_iter_ex) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("except_handle = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_wr_ready (this->ready_set_.wr_mask_); + (h = handle_iter_wr_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_wr_ready) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("write_handle_ready = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_rd_ready (this->ready_set_.rd_mask_); + (h = handle_iter_rd_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_rd_ready) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("read_handle_ready = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_ex_ready (this->ready_set_.ex_mask_); + (h = handle_iter_ex_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_ex_ready) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("except_handle_ready = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_su_ready (this->suspend_set_.wr_mask_); + (h = handle_iter_su_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_su_ready) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("write_handle_suspend = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_su_ready (this->suspend_set_.rd_mask_); + (h = handle_iter_su_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_su_ready) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("read_handle_suspend = %d\n"), h)); + + for (ACE_Handle_Set_Iterator handle_iter_su_ready (this->suspend_set_.ex_mask_); + (h = handle_iter_su_ready ()) != ACE_INVALID_HANDLE; + ++handle_iter_su_ready) + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("except_handle_suspend = %d\n"), h)); + + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("restart_ = %d\n"), this->restart_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("requeue_position_ = %d\n"), this->requeue_position_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("initialized_ = %d\n"), this->initialized_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("owner_ = %d\n"), this->owner_)); + +#if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) + this->notify_handler_->dump (); + this->token_.dump (); +#endif /* ACE_MT_SAFE */ + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#endif /* ACE_HAS_DUMP */ +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#endif /* ACE_SELECT_REACTOR_T_CPP */ |