summaryrefslogtreecommitdiff
path: root/TAO/tao/CSD_ThreadPool/CSD_TP_Task.h
blob: 76d1004946df7cdb98ac49dce7813d145de434db (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// -*- C++ -*-

//=============================================================================
/**
 *  @file    CSD_TP_Task.h
 *
 *  @author  Tim Bradley <bradley_t@ociweb.com>
 */
//=============================================================================

#ifndef TAO_CSD_TP_TASK_H
#define TAO_CSD_TP_TASK_H

#include /**/ "ace/pre.h"

#include "tao/CSD_ThreadPool/CSD_TP_Export.h"

#include "tao/CSD_ThreadPool/CSD_TP_Queue.h"
#include "tao/PortableServer/PortableServer.h"
#include "tao/Condition.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/Task.h"
#include "ace/Synch.h"
#include "ace/Containers_T.h"
#include "ace/Vector_T.h"

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

namespace TAO
{
  namespace CSD
  {
    /// Unsigned integral type used to hold a count of threads
    /// (for example, the TP_Task::num_threads_ data member).
    typedef unsigned long int Thread_Counter;

    /**
     * @class TP_Task
     *
     * @brief Active Object managing a queue of request objects.
     *
     * There are two types of "users" of a TP_Task object:
     *
     *    1) The TP_Strategy object that "owns" this task object.
     *    2) The worker threads that "run" this task object as an
     *       "active object".
     *
     * The TP_Strategy object that "owns" this task object dictates
     * when the worker threads are activated and when they are shut down.  It
     * also injects requests into this task's queue via calls to the
     * add_request() method.  It is also the TP_Strategy object that
     * dictates the number of worker threads to be activated via a call to
     * the set_num_threads() method.
     *
     * NOTE(review): set_num_threads() is not declared by TP_Task itself;
     * presumably it is a TP_Strategy method -- confirm.
     *
     * The active object pattern is implemented via the use of the
     * ACE_Task_Base base class, and each worker thread will
     * invoke this task's svc() method, and when the svc() returns, the
     * worker thread will invoke this task's close() method (with the
     * flag argument equal to 0).
     *
     * @note I just wanted to document an idea...  When the pool consists
     *       of only one worker thread, we could not care less about checking
     *       if target servant objects are busy or not.  The simple fact
     *       that only one thread will be dispatching all requests means
     *       that servant objects will never be busy when the thread
     *       tests to see if a request is "ready_for_dispatch()".  I'm
     *       just wondering if this knowledge can be applied to the
     *       implementation such that the "pool with one worker thread" case
     *       performs more efficiently.  This is STP vs SSTP.
     *
     */
    class TAO_CSD_TP_Export TP_Task : public ACE_Task_Base
    {
    public:

      /// Default Constructor.
      TP_Task();

      /// Virtual Destructor.
      virtual ~TP_Task();

      /// Put a request object on to the request queue.
      /// Returns true if successful, false otherwise (it has been "rejected").
      ///
      /// @param request The request object to be placed on the queue.
      /// @return true if the request was queued, false if it was rejected.
      bool add_request(TP_Request* request);

      /// Activate the worker threads
      ///
      /// @param args Opaque argument; its meaning is not documented in this
      ///             header.  NOTE(review): presumably conveys the number
      ///             of worker threads to activate -- confirm against the
      ///             implementation.
      /// @return Status code.  NOTE(review): presumably 0 on success and
      ///         -1 on failure -- confirm against the implementation.
      virtual int open(void* args = 0);

      /// The "mainline" executed by each worker thread.
      virtual int svc();

      /// Multi-purpose: argument value is used to differentiate purpose.
      ///
      /// 0) Invoked by each worker thread after its invocation of the
      ///    svc() method has completed (ie, returned).
      /// 1) Invoked by the strategy object to shutdown all worker threads.
      ///
      /// @param flag Selects which of the two purposes above applies.
      virtual int close(u_long flag = 0);

      /// Cancel all requests that are targeted for the provided servant.
      ///
      /// @param servant The servant whose pending requests are cancelled.
      void cancel_servant (PortableServer::Servant servant);

    private:

      /// Mutex type used for lock_, and as the lock type parameter of
      /// ConditionType.
      typedef TAO_SYNCH_MUTEX         LockType;
      /// Condition type used for work_available_ and active_workers_.
      typedef TAO_Condition<LockType> ConditionType;

      /// Lock to protect the "state" (all of the data members) of this object.
      LockType lock_;

      /// Condition used to signal worker threads that they may be able to
      /// find a request in the queue_ that needs to be dispatched to a
      /// servant that is currently "not busy".
      /// This condition will be signal()'ed each time a new request is
      /// added to the queue_, and also when a servant has become "not busy".
      ConditionType work_available_;

      /// This condition will be signal()'ed each time the num_threads_
      /// data member has its value changed.  This is used to keep the
      /// close(1) invocation (ie, a shutdown request) blocked until all
      /// of the worker threads have stopped running.
      ConditionType active_workers_;

      /// Flag used to indicate when this task will (or will not) accept
      /// requests via the add_request() method.
      bool accepting_requests_;

      /// Flag used to initiate a shutdown request to all worker threads.
      bool shutdown_initiated_;

      /// Complete shutdown needed to be deferred because the thread calling
      /// close(1) was also one of the ThreadPool threads
      bool deferred_shutdown_initiated_;

      /// Flag used to avoid multiple open() calls.
      bool opened_;

      /// The number of currently active worker threads.
      Thread_Counter num_threads_;

      /// The queue of pending servant requests (a.k.a. the "request queue").
      TP_Queue queue_;

      /// Container type holding thread ids (see activated_threads_).
      typedef ACE_Vector <ACE_thread_t> Thread_Ids;

      /// The list of ids for the threads launched by this task.
      Thread_Ids activated_threads_;

      /// NOTE(review): judging by its name, this is presumably the upper
      /// bound on the number of worker threads; where it is enforced is
      /// not visible in this header -- confirm against the implementation.
      enum { MAX_THREADPOOL_TASK_WORKER_THREADS = 50 };
    };

  }
}

TAO_END_VERSIONED_NAMESPACE_DECL

#if defined (__ACE_INLINE__)
# include "tao/CSD_ThreadPool/CSD_TP_Task.inl"
#endif /* __ACE_INLINE__ */

#include /**/ "ace/post.h"

#endif /* TAO_CSD_TP_TASK_H */