diff options
Diffstat (limited to 'ACE/ace/TP_Reactor.h')
-rw-r--r-- | ACE/ace/TP_Reactor.h | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/ACE/ace/TP_Reactor.h b/ACE/ace/TP_Reactor.h new file mode 100644 index 00000000000..5c39225c1e7 --- /dev/null +++ b/ACE/ace/TP_Reactor.h @@ -0,0 +1,370 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file TP_Reactor.h + * + * $Id$ + * + * The ACE_TP_Reactor (aka, Thread Pool Reactor) uses the + * Leader/Followers pattern to demultiplex events among a pool of + * threads. When using a thread pool reactor, an application + * pre-spawns a _fixed_ number of threads. When these threads + * invoke the ACE_TP_Reactor's <handle_events> method, one thread + * will become the leader and wait for an event. The other + * follower threads will queue up waiting for their turn to become + * the leader. When an event occurs, the leader will pick a + * follower to become the leader and go on to handle the event. + * The consequence of using ACE_TP_Reactor is the amortization of + * the costs used to creating threads. The context switching cost + * will also reduce. More over, the total resources used by + * threads are bounded because there are a fixed number of threads. + * + * @author Irfan Pyarali <irfan@cs.wustl.edu> + * @author Nanbor Wang <nanbor@cs.wustl.edu> + */ +//============================================================================= + + +#ifndef ACE_TP_REACTOR_H +#define ACE_TP_REACTOR_H + +#include /**/ "ace/pre.h" + +#include "ace/Select_Reactor.h" +#include "ace/Timer_Queue.h" /* Simple forward decl won't work... */ + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +/** + * @class ACE_EH_Dispatch_Info + * + * @brief This structure contains information of the activated event + * handler. + */ +class ACE_EH_Dispatch_Info +{ +public: + ACE_EH_Dispatch_Info (void); + + void set (ACE_HANDLE handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask, + ACE_EH_PTMF callback); + + bool dispatch (void) const; + + ACE_HANDLE handle_; + ACE_Event_Handler *event_handler_; + ACE_Reactor_Mask mask_; + ACE_EH_PTMF callback_; + int resume_flag_; + bool reference_counting_required_; + +private: + bool dispatch_; + + // Disallow copying and assignment. + ACE_EH_Dispatch_Info (const ACE_EH_Dispatch_Info &); + ACE_EH_Dispatch_Info &operator= (const ACE_EH_Dispatch_Info &); +}; + + +/** + * @class ACE_TP_Token_Guard + * + * @brief A helper class that helps grabbing, releasing and waiting + * on tokens for a thread that tries calling handle_events (). + * + * In short, this class will be owned by one thread by creating on the + * stack. This class gives the status of the ownership of the token + * and manages the ownership + */ + +class ACE_TP_Token_Guard +{ +public: + + /// Constructor that will grab the token for us + ACE_TP_Token_Guard (ACE_Select_Reactor_Token &token); + + /// Destructor. This will release the token if it hasnt been + /// released till this point + ~ACE_TP_Token_Guard (void); + + /// Release the token .. + void release_token (void); + + /// Returns whether the thread that created this object ownes the + /// token or not. + int is_owner (void); + + /// A helper method that grabs the token for us, after which the + /// thread that owns that can do some actual work. + int acquire_read_token (ACE_Time_Value *max_wait_time = 0); + + /** + * A helper method that grabs the token for us, after which the + * thread that owns that can do some actual work. This differs from + * acquire_read_token() as it uses acquire () to get the token instead of + * acquire_read () + */ + int acquire_token (ACE_Time_Value *max_wait_time = 0); + +private: + + // Disallow default construction. + ACE_TP_Token_Guard (void); + + // Disallow copying and assignment. + ACE_TP_Token_Guard (const ACE_TP_Token_Guard &); + ACE_TP_Token_Guard &operator= (const ACE_TP_Token_Guard &); + +private: + + /// The Select Reactor token. + ACE_Select_Reactor_Token &token_; + + /// Flag that indicate whether the thread that created this object + /// owns the token or not. A value of 0 indicates that this class + /// hasnt got the token (and hence the thread) and a value of 1 + /// vice-versa. + int owner_; + +}; + +/** + * @class ACE_TP_Reactor + * + * @brief Specialization of Select Reactor to support thread-pool + * based event dispatching. + * + * One of the short comings of the Select_Reactor in ACE is that it + * did not support a thread pool based event dispatching model, + * similar to the one in WFMO_Reactor. In Select_Reactor, only thread + * can be blocked in <handle_events> at any given time. + * + * A new Reactor has been added to ACE that removes this short-coming. + * TP_Reactor is a specialization of Select Reactor to support + * thread-pool based event dispatching. This Reactor takes advantage + * of the fact that events reported by <select> are persistent if not + * acted upon immediately. It works by remembering the event handler + * that just got activated, releasing the internal lock (so that some + * other thread can start waiting in the event loop) and then + * dispatching the event handler outside the context of the Reactor + * lock. After the event handler has been dispatched the event handler is + * resumed again. Don't call remove_handler() from the handle_x methods, + * instead return -1. + * + * This Reactor is best suited for situations when the callbacks to + * event handlers can take arbitrarily long and/or a number of threads + * are available to run the event loops. Note that callback code in + * Event Handlers (e.g. Event_Handler::handle_input) does not have to + * be modified or made thread-safe for this Reactor. This is because + * an activated Event Handler is suspended in the Reactor before the + * upcall is made and resumed after the upcall completes. Therefore, + * one Event Handler cannot be called by multiple threads + * simultaneously. + */ +class ACE_Export ACE_TP_Reactor : public ACE_Select_Reactor +{ +public: + + // = Initialization and termination methods. + + /// Initialize ACE_TP_Reactor with the default size. + ACE_TP_Reactor (ACE_Sig_Handler * = 0, + ACE_Timer_Queue * = 0, + int mask_signals = 1, + int s_queue = ACE_Select_Reactor_Token::FIFO); + + /** + * Initialize the ACE_TP_Reactor to manage + * @a max_number_of_handles. If @a restart is non-0 then the + * ACE_Reactor's <handle_events> method will be restarted + * automatically when <EINTR> occurs. If <signal_handler> or + * <timer_queue> are non-0 they are used as the signal handler and + * timer queue, respectively. + */ + ACE_TP_Reactor (size_t max_number_of_handles, + int restart = 0, + ACE_Sig_Handler *sh = 0, + ACE_Timer_Queue *tq = 0, + int mask_signals = 1, + int s_queue = ACE_Select_Reactor_Token::FIFO); + + // = Event loop drivers. + + /** + * This event loop driver that blocks for <max_wait_time> before + * returning. It will return earlier if timer events, I/O events, + * or signal events occur. Note that <max_wait_time> can be 0, in + * which case this method blocks indefinitely until events occur. + * + * <max_wait_time> is decremented to reflect how much time this call + * took. For instance, if a time value of 3 seconds is passed to + * handle_events and an event occurs after 2 seconds, + * <max_wait_time> will equal 1 second. This can be used if an + * application wishes to handle events for some fixed amount of + * time. + * + * Returns the total number of ACE_Event_Handlers that were + * dispatched, 0 if the <max_wait_time> elapsed without dispatching + * any handlers, or -1 if something goes wrong. + */ + virtual int handle_events (ACE_Time_Value *max_wait_time = 0); + + virtual int handle_events (ACE_Time_Value &max_wait_time); + + /* + * @todo The following methods are not supported. Support for + * signals is not available in the TP_Reactor. These methods will be + * supported once signal handling is supported. + */ + virtual int register_handler (int signum, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp = 0, + ACE_Event_Handler **old_sh = 0, + ACE_Sig_Action *old_disp = 0); + + virtual int register_handler (const ACE_Sig_Set &sigset, + ACE_Event_Handler *new_sh, + ACE_Sig_Action *new_disp = 0); + + /** + * The following template methods have been declared here to avoid + * some compilers complaining that we have hidden some of the other + * virtual functions. We need to override functions with signal + * handlers and return -1 since the TP_Reactor does not support + * signals. The definition of the following functions is just a + * side-effect. The actual definitions will just call the base class + * method. For detailed documentation of these methods please see + * Select_Reactor_T.h. + */ +//@{ + + virtual int register_handler (ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + + virtual int register_handler (ACE_HANDLE handle, + ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + +#if defined (ACE_WIN32) + + + + virtual int register_handler (ACE_Event_Handler *event_handler, + ACE_HANDLE event_handle = ACE_INVALID_HANDLE); + +#endif /* ACE_WIN32 */ + + virtual int register_handler (ACE_HANDLE event_handle, + ACE_HANDLE io_handle, + ACE_Event_Handler *event_handler, + ACE_Reactor_Mask mask); + + virtual int register_handler (const ACE_Handle_Set &handles, + ACE_Event_Handler *eh, + ACE_Reactor_Mask mask); + + //@} + + /// Does the reactor allow the application to resume the handle on + /// its own ie. can it pass on the control of handle resumption to + /// the application. The TP reactor has can allow applications to + /// resume handles. So return a positive value. + virtual int resumable_handler (void); + + /// Called from handle events + static void no_op_sleep_hook (void *); + + // = Any thread can perform a <handle_events>, override the owner() + // methods to avoid the overhead of setting the owner thread. + + /// Set the new owner of the thread and return the old owner. + virtual int owner (ACE_thread_t n_id, ACE_thread_t *o_id = 0); + + /// Return the current owner of the thread. + virtual int owner (ACE_thread_t *t_id); + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +protected: + // = Internal methods that do the actual work. + + /// Template method from the base class. + virtual void clear_dispatch_mask (ACE_HANDLE handle, + ACE_Reactor_Mask mask); + + /// Dispatch just 1 signal, timer, notification handlers + int dispatch_i (ACE_Time_Value *max_wait_time, + ACE_TP_Token_Guard &guard); + + /// Get the event that needs dispatching. It could be either a + /// signal, timer, notification handlers or return possibly 1 I/O + /// handler for dispatching. In the most common use case, this would + /// return 1 I/O handler for dispatching + int get_event_for_dispatching (ACE_Time_Value *max_wait_time); + + /// Method to handle signals + /// @note It is just busted at this point in time. + int handle_signals (int &event_count, + ACE_TP_Token_Guard &g); + + /// Handle timer events + int handle_timer_events (int &event_count, + ACE_TP_Token_Guard &g); + + /// Handle notify events + int handle_notify_events (int &event_count, + ACE_TP_Token_Guard &g); + + /// handle socket events + int handle_socket_events (int &event_count, + ACE_TP_Token_Guard &g); + + /// This method shouldn't get called. + virtual void notify_handle (ACE_HANDLE handle, + ACE_Reactor_Mask mask, + ACE_Handle_Set &, + ACE_Event_Handler *eh, + ACE_EH_PTMF callback); +private: + + /// Get the handle of the notify pipe from the ready set if there is + /// an event in the notify pipe. + ACE_HANDLE get_notify_handle (void); + + /// Get socket event dispatch information. + int get_socket_event_info (ACE_EH_Dispatch_Info &info); + + /// Notify the appropriate <callback> in the context of the <eh> + /// associated with <handle> that a particular event has occurred. + int dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info); + + /// Clear the @a handle from the read_set + void clear_handle_read_set (ACE_HANDLE handle); + + int post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,int status); + +private: + /// Deny access since member-wise won't work... + ACE_TP_Reactor (const ACE_TP_Reactor &); + ACE_TP_Reactor &operator = (const ACE_TP_Reactor &); +}; + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "ace/TP_Reactor.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" + +#endif /* ACE_TP_REACTOR_H */ |