#include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_string.h" #include "ace/OS_NS_time.h" #include "ace/Task.h" #include "ace/Unbounded_Queue.h" #include "ace/Synch.h" #include "ace/SString.h" #include "ace/Method_Request.h" #include "ace/Future.h" #include "ace/Activation_Queue.h" #include "ace/Condition_T.h" #define OUTSTANDING_REQUESTS 20 // Listing 2 code/ch16 class CompletionCallBack: public ACE_Future_Observer { public: virtual void update (const ACE_Future & future) { ACE_CString *result = 0; // Block for the result. future.get (result); ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ())); delete result; } }; // Listing 2 // Listing 1 code/ch16 class LongWork : public ACE_Method_Request { public: virtual int call () { ACE_TRACE ("LongWork::call"); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n"))); ACE_OS::sleep (1); char buf[1024]; ACE_OS::strcpy (buf, "Completed assigned task\n"); ACE_CString *msg; ACE_NEW_RETURN (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1); result_.set (msg); return 0; } ACE_Future &future () { ACE_TRACE ("LongWork::future"); return result_; } void attach (CompletionCallBack *cb) { result_.attach (cb); } private: ACE_Future result_; }; // Listing 1 class Exit : public ACE_Method_Request { public: virtual int call () { ACE_TRACE ("Exit::call"); return -1; } }; class Worker; class IManager { public: virtual ~IManager () { } virtual int return_to_work (Worker *worker) = 0; }; // Listing 3 code/ch16 class Worker: public ACE_Task { public: Worker (IManager *manager) : manager_(manager), queue_ (msg_queue ()) { } int perform (ACE_Method_Request *req) { ACE_TRACE ("Worker::perform"); return this->queue_.enqueue (req); } virtual int svc () { thread_id_ = ACE_Thread::self (); while (1) { ACE_Method_Request *request = this->queue_.dequeue(); if (request == 0) return -1; // Invoke the request int result = request->call (); if (result == -1) break; // Return to work. this->manager_->return_to_work (this); } return 0; } ACE_thread_t thread_id (); private: IManager *manager_; ACE_thread_t thread_id_; ACE_Activation_Queue queue_; }; // Listing 3 ACE_thread_t Worker::thread_id () { return thread_id_; } // Listing 4 code/ch16 class Manager : public ACE_Task_Base, private IManager { public: enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; Manager () : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_) { ACE_TRACE ("Manager"); } int perform (ACE_Method_Request *req) { ACE_TRACE ("perform"); return this->queue_.enqueue (req); } int svc () { ACE_TRACE ("svc"); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); // Create pool when you get in the first time. create_worker_pool (); while (!done ()) { ACE_Time_Value tv ((long)MAX_TIMEOUT); tv += ACE_OS::time (0); // Get the next message ACE_Method_Request *request = this->queue_.dequeue (&tv); if (request == 0) { shut_down (); break; } // Choose a worker. Worker *worker = choose_worker (); // Ask the worker to do the job. worker->perform (request); } return 0; } int shut_down (); virtual int return_to_work (Worker *worker) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n"))); this->workers_.enqueue_tail (worker); this->workers_cond_.signal (); return 0; } private: Worker *choose_worker () { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0) while (this->workers_.is_empty ()) workers_cond_.wait (); Worker *worker = 0; this->workers_.dequeue_head (worker); return worker; } int create_worker_pool () { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); for (int i = 0; i < POOL_SIZE; i++) { Worker *worker; ACE_NEW_RETURN (worker, Worker (this), -1); this->workers_.enqueue_tail (worker); worker->activate (); } return 0; } int done () { return (shutdown_ == 1); } ACE_thread_t thread_id (Worker *worker) { return worker->thread_id (); } private: int shutdown_; ACE_Thread_Mutex workers_lock_; ACE_Condition workers_cond_; ACE_Unbounded_Queue workers_; ACE_Activation_Queue queue_; }; // Listing 4 int Manager::shut_down () { ACE_TRACE ("Manager::shut_down"); ACE_Unbounded_Queue::ITERATOR iter = this->workers_.begin (); Worker **worker_ptr = 0; do { iter.next (worker_ptr); Worker *worker = (*worker_ptr); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"), thread_id (worker))); Exit *req; ACE_NEW_RETURN (req, Exit(), -1); // Send the hangup message worker->perform (req); // Wait for the exit. worker->wait (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker %d shut down.\n"), thread_id (worker))); delete req; delete worker; } while (iter.advance ()); shutdown_ = 1; return 0; } // Listing 5 code/ch16 int ACE_TMAIN (int, ACE_TCHAR *[]) { Manager tp; tp.activate (); ACE_Time_Value tv; tv.msec (100); // Wait for a few seconds every time you send a message. CompletionCallBack cb; LongWork workArray[OUTSTANDING_REQUESTS]; for (int i = 0; i < OUTSTANDING_REQUESTS; i++) { workArray[i].attach (&cb); ACE_OS::sleep (tv); tp.perform (&workArray[i]); } ACE_Thread_Manager::instance ()->wait (); return 0; } // Listing 5 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */