diff options
author | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-03-22 12:02:26 +0000 |
---|---|---|
committer | jxh <jxh@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-03-22 12:02:26 +0000 |
commit | 877dcde02acffdce788886d5edff52ab6169948d (patch) | |
tree | e089d634dfb81fc8a0510cf3d7f41376de86cc0e /apps | |
parent | 0f445ee28e27bef39a14cba45bc2ebf32758b164 (diff) | |
download | ATCD-877dcde02acffdce788886d5edff52ab6169948d.tar.gz |
Getting it together. Initial checkin for new framwork components.
Diffstat (limited to 'apps')
18 files changed, 463 insertions, 111 deletions
diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.cpp index e49e8b268c0..7f0cbe0f850 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.cpp +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.cpp @@ -1,10 +1,8 @@ // $Id$ #include "JAWS/Concurrency.h" - -JAWS_Dispatcher_Singleton jaws_dispatcher; -JAWS_Thread_Pool_Singleton jaws_thread_pool; -JAWS_Thread_Per_Singleton jaws_thread_per; +#include "JAWS/Pipeline.h" +#include "JAWS/Data_Block.h" JAWS_Concurrency_Base::JAWS_Concurrency_Base (void) { @@ -31,10 +29,30 @@ JAWS_Concurrency_Base::svc (void) // yourself with 0 threads. result = this->getq (mb); + + // Use a NULL message block to indicate that the thread should shut + // itself down if (result == -1 || mb == 0) break; - this->put_next (mb); + do + { + JAWS_Data_Block *db; + JAWS_IO_Handler *ioh; + JAWS_Pipeline_Task *task; + + db = ACE_dynamic_cast (JAWS_Data_Block *, mb->data_block ()); + task = db->task (); + + // Use a NULL task to make the thread recycle now + if (task == 0) + break; + + result = task->put (mb); + if (result == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Concurrency_Base::svc")); + } + while (result == 0); } return 0; } @@ -47,19 +65,28 @@ JAWS_Dispatch_Policy::~JAWS_Dispatch_Policy (void) { } -JAWS_Dispatcher::JAWS_Dispatcher (JAWS_Dispatch_Policy *policy) - : policy_(policy) +JAWS_Dispatcher::JAWS_Dispatcher (void) + : policy_(0) { } -JAWS_Thread_Pool_Task::JAWS_Thread_Pool_Task (long flags, - int nthreads, - int maxthreads) - : nthreads_ (nthreads), - maxthreads_ (maxthreads) +int +JAWS_Dispatcher::dispatch (ACE_Message_Block *mb) { - if (this->activate (flags, nthreads) == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "JAWS_Thread_Pool_Task::activate")); + return this->policy ()->concurrency ()->put (mb); +} + +JAWS_Dispatch_Policy * +JAWS_Dispatcher::policy (void) +{ + return this->policy_; +} + +JAWS_Dispatch_Policy * +JAWS_Dispatcher::policy (JAWS_Dispatch_Policy *p) +{ + this->policy_ = p; + return this->policy_; } int @@ -73,12 +100,6 @@ JAWS_Thread_Pool_Task::open (long flags, int nthreads, int maxthreads) -1); } -JAWS_Thread_Per_Task::JAWS_Thread_Per_Task (long flags, int maxthreads) - : flags_ (flags), - maxthreads_ (maxthreads) -{ -} - int JAWS_Thread_Per_Task::open (long flags, int maxthreads) { @@ -103,11 +124,11 @@ JAWS_Thread_Per_Task::put (ACE_Message_Block *mb, ACE_Time_Value *tv) } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) -template class ACE_Singleton<JAWS_Dispatcher, ACE_MT_SYNCH>; -template class ACE_Singleton<JAWS_Thread_Pool_Task, ACE_MT_SYNCH>; -template class ACE_Singleton<JAWS_Thread_Per_Task, ACE_MT_SYNCH>; +template class ACE_Singleton<JAWS_Dispatcher, ACE_SYNCH_MUTEX>; +template class ACE_Singleton<JAWS_Thread_Pool_Task, ACE_SYNCH_MUTEX>; +template class ACE_Singleton<JAWS_Thread_Per_Task, ACE_SYNCH_MUTEX>; #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) -#pragma instantiate ACE_Singleton<JAWS_Dispatcher, ACE_MT_SYNCH> -#pragma instantiate ACE_Singleton<JAWS_Thread_Pool_Task, ACE_MT_SYNCH> -#pragma instantiate ACE_Singleton<JAWS_Thread_Per_Task, ACE_MT_SYNCH> +#pragma instantiate ACE_Singleton<JAWS_Dispatcher, ACE_SYNCH_MUTEX> +#pragma instantiate ACE_Singleton<JAWS_Thread_Pool_Task, ACE_SYNCH_MUTEX> +#pragma instantiate ACE_Singleton<JAWS_Thread_Per_Task, ACE_SYNCH_MUTEX> #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.h b/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.h index 5af61e2c119..92e7cab32a0 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.h +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Concurrency.h @@ -4,7 +4,7 @@ #if !defined (JAWS_CONCURRENCY_H) #define JAWS_CONCURRENCY_H -#include "ace/Singelton.h" +#include "ace/Singleton.h" #include "ace/Synch.h" #include "ace/Task.h" @@ -38,7 +38,8 @@ class JAWS_Dispatch_Policy public: JAWS_Dispatch_Policy (void); virtual ~JAWS_Dispatch_Policy (void); - virtual JAWS_Concurrency_Base * update (void *state = 0) = 0; + virtual JAWS_Concurrency_Base * concurrency (void) = 0; + virtual JAWS_IO * io (void) = 0; }; class JAWS_Dispatcher @@ -51,9 +52,11 @@ class JAWS_Dispatcher // IO can find a thread to take care of it. { public: - JAWS_Dispatcher (JAWS_Dispatch_Policy *policy); + JAWS_Dispatcher (void); - int dispatch (JAWS_IO_Handler *ioh); + int dispatch (ACE_Message_Block *mb); + JAWS_Dispatch_Policy *policy (void); + JAWS_Dispatch_Policy *policy (JAWS_Dispatch_Policy *p); private: JAWS_Dispatch_Policy *policy_; @@ -99,17 +102,13 @@ private: int maxthreads_; }; -typedef ACE_Singleton<JAWS_Dispatcher, ACE_MT_SYNCH> +typedef ACE_Singleton<JAWS_Dispatcher, ACE_SYNCH_MUTEX> JAWS_Dispatcher_Singleton; -typedef ACE_Singleton<JAWS_Thread_Pool_Task, ACE_MT_SYNCH> +typedef ACE_Singleton<JAWS_Thread_Pool_Task, ACE_SYNCH_MUTEX> JAWS_Thread_Pool_Singleton; -typedef ACE_Singleton<JAWS_Thread_Per_Task, ACE_MT_SYNCH> +typedef ACE_Singleton<JAWS_Thread_Per_Task, ACE_SYNCH_MUTEX> JAWS_Thread_Per_Singleton; -extern JAWS_Dispatcher_Singleton jaws_dispatcher; -extern JAWS_Thread_Pool_Singleton jaws_thread_pool; -extern JAWS_Thread_Per_Singleton jaws_thread_per; - #endif /* !defined (JAWS_CONCURRENCY_H) */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Data_Block.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/Data_Block.cpp new file mode 100644 index 00000000000..dc982a11978 --- /dev/null +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Data_Block.cpp @@ -0,0 +1,73 @@ +// $Id$ + +#include "JAWS/Data_Block.h" +#include "JAWS/Policy.h" + +JAWS_Pipeline_Task * +JAWS_Data_Block::task (void) +{ + return this->task_; +} + +ACE_INET_Addr * +JAWS_Data_Block::addr (void) +{ + return this->addr_; +} + +JAWS_IO_Handler * +JAWS_Data_Block::handler (void) +{ + return this->handler_; +} + +JAWS_Dispatch_Policy * +JAWS_Data_Block::policy (void) +{ + return this->policy_; +} + +void +JAWS_Data_Block::task (JAWS_Pipeline_Handler *taskp) +{ + this->task_ = taskp; +} + +void +JAWS_Data_Block::addr (ACE_INET_Addr *addrp) +{ + this->addr_ = addrp; +} + +void +JAWS_Data_Block::handler (JAWS_IO_Handler *handlerp) +{ + this->handler_ = handlerp; +} + +void +JAWS_Data_Block::policy (JAWS_Dispatch_Policy *policyp) +{ + this->policy_ = policyp; +} + +int +JAWS_Pipeline_Accept_Task::handle_put (JAWS_Data_Block *data, + ACE_Time_Value *) +{ + /* JAWS_Data_Block should contain an INET_Addr and an IO */ + // JAWS_IO_Handler *handler = data->handler (); + JAWS_Dispatch_Policy *policy = data->policy (); + + // data->policy ()->update (handler); + + JAWS_IO *io = policy->io (); + io->accept (data->addr ()); + return 0; +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class JAWS_Pipeline_Abstract_Handler<JAWS_Data_Block>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate JAWS_Pipeline_Abstract_Handler<JAWS_Data_Block> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Data_Block.h b/apps/JAWS/server/PROTOTYPE/JAWS/Data_Block.h new file mode 100644 index 00000000000..6763233401f --- /dev/null +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Data_Block.h @@ -0,0 +1,51 @@ +/* -*- c++ -*- */ +// $Id$ + +#if !defined (JAWS_DATA_BLOCK_H) +#define JAWS_DATA_BLOCK_H + +#include "ace/Singleton.h" + +#include "JAWS/Pipeline.h" + +class JAWS_IO_Handler; +class JAWS_Dispatch_Policy; +class JAWS_Data_Block; + +typedef JAWS_Pipeline_Abstract_Handler<JAWS_Data_Block> + JAWS_Pipeline_Handler; + +class JAWS_Data_Block : public ACE_Data_Block +// = TITLE +// Defines the communication unit between pipeline components +{ +public: + JAWS_Pipeline_Task *task (void); + ACE_INET_Addr *addr (void); + JAWS_IO_Handler * handler (void); + JAWS_Dispatch_Policy * policy (void); + + void task (JAWS_Pipeline_Handler *taskp); + void addr (ACE_INET_Addr *addrp); + void handler (JAWS_IO_Handler * handlerp); + void policy (JAWS_Dispatch_Policy * policyp); + +private: + JAWS_IO_Handler *handler_; + JAWS_Dispatch_Policy *policy_; + JAWS_Pipeline_Handler *task_; + ACE_INET_Addr *addr_; +}; + +class JAWS_Pipeline_Accept_Task : public JAWS_Pipeline_Handler +{ +public: + virtual int handle_put (JAWS_Data_Block *data, ACE_Time_Value *tv); + +private: +}; + +typedef ACE_Singleton<JAWS_Pipeline_Accept_Task, ACE_SYNCH_MUTEX> + JAWS_Pipeline_Accept_Task_Singleton; + +#endif /* !defined (JAWS_DATA_BLOCK_H) */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/IO.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/IO.cpp index b820a0430d1..81027e0b8b6 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/IO.cpp +++ b/apps/JAWS/server/PROTOTYPE/JAWS/IO.cpp @@ -12,7 +12,9 @@ JAWS_IO::JAWS_IO (void) : handle_ (ACE_INVALID_HANDLE), - handler_ (0) + handler_ (0), + inet_addr_ (0), + acceptor_ (0) { } @@ -48,8 +50,11 @@ JAWS_Synch_IO::~JAWS_Synch_IO (void) } void -JAWS_Synch_IO::accept (ACE_SOCK_Stream &new_stream) +JAWS_Synch_IO::accept (ACE_INET_Addr *addr) { + // HACK + ACE_UNUSED_ARG (addr); + ACE_SOCK_Stream new_stream; if (this->acceptor_->accept (new_stream) == -1) this->handler_->accept_error (); else diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/IO.h b/apps/JAWS/server/PROTOTYPE/JAWS/IO.h index a3d55e5498b..fd440944109 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/IO.h +++ b/apps/JAWS/server/PROTOTYPE/JAWS/IO.h @@ -42,13 +42,13 @@ public: JAWS_IO (void); virtual ~JAWS_IO (void); void handler (JAWS_IO_Handler *handler); - void acceptor (JAWS_IO_Acceptor *acceptor); + // void acceptor (JAWS_IO_Acceptor *acceptor); void handle (ACE_HANDLE h); ACE_HANDLE handle (void); // James, please add documentation here. - virtual void accept (ACE_SOCK_Stream &new_stream) = 0; + virtual void accept (ACE_INET_Addr *addr) = 0; // accept a passive connection virtual void read (ACE_Message_Block& mb, int size) = 0; @@ -76,6 +76,7 @@ public: protected: ACE_HANDLE handle_; JAWS_IO_Handler *handler_; + ACE_INET_Addr *inet_addr_; JAWS_IO_Acceptor *acceptor_; }; @@ -91,7 +92,7 @@ public: ~JAWS_Synch_IO (void); - virtual void accept (ACE_SOCK_Stream &new_stream); + virtual void accept (ACE_INET_Addr *addr); void read (ACE_Message_Block& mb, int size); diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.cpp index 454f31019a4..9453c16693d 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.cpp +++ b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.cpp @@ -3,7 +3,7 @@ #include "JAWS/IO_Acceptor.h" int -JAWS_IO_Acceptor::open (const ACE_Addr &, int, int, int, int) +JAWS_IO_Acceptor::open (const ACE_INET_Addr &) { return -1; } @@ -16,27 +16,15 @@ JAWS_IO_Acceptor::accept (ACE_SOCK_Stream &, ACE_Addr *, ACE_Time_Value *, } int -JAWS_IO_Acceptor::open (const ACE_INET_Addr &, size_t, int, int, int, - ACE_Proactor *) -{ - return -1; -} - -int JAWS_IO_Acceptor::accept (size_t) { return -1; } int -JAWS_IO_Synch_Acceptor::open (const ACE_Addr &local_sap, - int reuse_addr, - int protocol_family, - int backlog, - int protocol) +JAWS_IO_Synch_Acceptor::open (const ACE_INET_Addr &local_sap) { - return this->acceptor_->open (local_sap, reuse_addr, protocol_family, - backlog, protocol); + return this->acceptor_->open (local_sap, 0, PF_INET, 5, 0); } int @@ -55,15 +43,9 @@ JAWS_IO_Synch_Acceptor::accept (ACE_SOCK_Stream &new_stream, // This only works on Win32 platforms int -JAWS_IO_Asynch_Acceptor::open (const ACE_INET_Addr &address, - size_t bytes_to_read, - int pass_addresses, - int backlog, - int reuse_addr, - ACE_Proactor *proactor) +JAWS_IO_Asynch_Acceptor::open (const ACE_INET_Addr &address); { - return this->acceptor_->open (address, bytes_to_read, pass_address, - backlog, reuse_addr, proactor); + return -1; } int @@ -72,4 +54,14 @@ JAWS_IO_Asynch_Acceptor::accept (size_t bytes_to_read) return this->acceptor_->accept (bytes_to_read); } + + #endif /* defined (ACE_WIN32) */ + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Singleton<JAWS_IO_Synch_Acceptor, ACE_SYNCH_MUTEX>; +template class ACE_Singleton<JAWS_IO_Asynch_Acceptor, ACE_SYNCH_MUTEX>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Singleton<JAWS_IO_Synch_Acceptor, ACE_SYNCH_MUTEX> +#pragma instantiate ACE_Singleton<JAWS_IO_Asynch_Acceptor, ACE_SYNCH_MUTEX> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.h b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.h index 8f2ef413ed2..36b7b4fabb0 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.h +++ b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Acceptor.h @@ -9,6 +9,7 @@ #include "ace/Asynch_Acceptor.h" #include "ace/LOCK_SOCK_Acceptor.h" +#include "ace/Singleton.h" #include "JAWS/IO.h" @@ -29,29 +30,17 @@ public: JAWS_IO_Acceptor (void); virtual ~JAWS_IO_Acceptor (void); - virtual int open (const ACE_Addr &local_sap, - int reuse_addr = 0, - int protocol_family = PF_INET, - int backlog = 5, - int protocol = 0) = 0; - // Initiate a synchronous passive mode socket. + virtual int open (const ACE_INET_Addr &address); + // Initiate a passive mode socket. virtual int accept (ACE_SOCK_Stream &new_stream, ACE_Addr *remote_addr = 0, ACE_Time_Value *timeout = 0, int restart = 1, - int reset_new_handle = 0) const = 0; + int reset_new_handle = 0) const; // Synchronously accept the connection - virtual int open (const ACE_INET_Addr &address, - size_t bytes_to_read = 0, - int pass_addresses = 0, - int backlog = 5, - int reuse_addr = 1, - ACE_Proactor *proactor = 0) = 0; - // Initiate an asynchronous passive connection - - virtual int accept (size_t bytes_to_read = 0) = 0; + virtual int accept (size_t bytes_to_read = 0); // This initiates a new asynchronous accept through the AcceptEx call. enum { ASYNC = 0, SYNCH = 1 }; @@ -66,11 +55,7 @@ class JAWS_IO_Synch_Acceptor : public JAWS_IO_Acceptor { public: - virtual int open (const ACE_Addr &local_sap, - int reuse_addr = 0, - int protocol_family = PF_INET, - int backlog = 5, - int protocol = 0); + virtual int open (const ACE_INET_Addr &local_sap); // Initiate a passive mode socket. virtual int accept (ACE_SOCK_Stream &new_stream, @@ -85,28 +70,30 @@ private: }; -#if defined (ACE_WIN32) -// This only works on Win32 platforms - class JAWS_IO_Asynch_Acceptor : public JAWS_IO_Acceptor { public: - virtual int open (const ACE_INET_Addr &address, - size_t bytes_to_read = 0, - int pass_addresses = 0, - int backlog = 5, - int reuse_addr = 1, - ACE_Proactor *proactor = 0); + virtual int open (const ACE_INET_Addr &address); // Initiate an asynchronous passive connection virtual int accept (size_t bytes_to_read = 0); // This initiates a new asynchronous accept through the AcceptEx call. private: + +#if defined (ACE_WIN32) +// This only works on Win32 platforms ACE_Asynch_Acceptor<JAWS_IO_Handler> *acceptor_; +#else + void *acceptor_; +#endif /* defined (ACE_WIN32) */ }; -#endif /* defined (ACE_WIN32) */ +typedef ACE_Singleton<JAWS_IO_Synch_Acceptor, ACE_SYNCH_MUTEX> + JAWS_IO_Synch_Acceptor_Singleton; + +typedef ACE_Singleton<JAWS_IO_Asynch_Acceptor, ACE_SYNCH_MUTEX> + JAWS_IO_Asynch_Acceptor_Singleton; #endif /* !defined (JAWS_IO_ACCEPTOR_H) */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.cpp index 52922842dce..baace4f598a 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.cpp +++ b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.cpp @@ -2,6 +2,7 @@ #include "JAWS/IO.h" #include "JAWS/IO_Handler.h" +#include "JAWS/Data_Block.h" JAWS_IO_Handler_Factory::~JAWS_IO_Handler_Factory (void) { @@ -14,7 +15,7 @@ JAWS_Synch_IO_Handler::JAWS_Synch_IO_Handler (JAWS_IO *io, pipeline_ (0), factory_ (factory) { - this->io_.handler (this); + this->io_->handler (this); } JAWS_Synch_IO_Handler::~JAWS_Synch_IO_Handler (void) @@ -39,26 +40,28 @@ JAWS_Synch_IO_Handler::accept_error (void) void JAWS_Synch_IO_Handler::read_complete (ACE_Message_Block &data) { + ACE_UNUSED_ARG (data); // We can call back into the pipeline task at this point - this->pipeline_->read_complete (data); + // this->pipeline_->read_complete (data); } void JAWS_Synch_IO_Handler::read_error (void) { - this->pipeline_->read_error (); + // this->pipeline_->read_error (); } void JAWS_Synch_IO_Handler::transmit_file_complete (void) { - this->pipeline_->transmit_file_complete (); + // this->pipeline_->transmit_file_complete (); } void JAWS_Synch_IO_Handler::transmit_file_error (int result) { - this->pipeline_->transmit_file_complete (result); + ACE_UNUSED_ARG (result); + // this->pipeline_->transmit_file_complete (result); } void @@ -75,8 +78,7 @@ JAWS_Synch_IO_Handler::receive_file_error (int result) void JAWS_Synch_IO_Handler::write_error (void) { - ACE_DEBUG ((LM_DEBUG, " (%t) %s error in writing response\n", - request_.uri ())); + ACE_DEBUG ((LM_DEBUG, " (%t) error in writing response\n")); this->done (); } @@ -100,7 +102,7 @@ JAWS_Synch_IO_Handler::factory (void) void JAWS_Synch_IO_Handler::done (void) { - this->factory()->destroy_http_handler (this, this->io_); + this->factory ()->destroy_io_handler (this, this->io_); } JAWS_IO_Handler * diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.h b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.h index 0957fcad9cf..3c2d3256607 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.h +++ b/apps/JAWS/server/PROTOTYPE/JAWS/IO_Handler.h @@ -222,4 +222,4 @@ public: }; #endif /* ACE_WIN32 */ -#endif /* JAWS_IO_H */ +#endif /* JAWS_IO_HANDLER_H */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Makefile b/apps/JAWS/server/PROTOTYPE/JAWS/Makefile index 77bc7a5c2b2..5bdf1455160 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/Makefile +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Makefile @@ -14,7 +14,10 @@ LIB = libJAWS.a MYFILES = \ Pipeline \ + Data_Block \ + Policy \ Concurrency \ + Server \ IO_Acceptor \ IO_Handler \ IO diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline.h b/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline.h index 747e0469af8..16bcc62cdf5 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline.h +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline.h @@ -14,6 +14,9 @@ typedef ACE_Stream<ACE_NULL_SYNCH> JAWS_Pipeline_Stream; typedef ACE_Module<ACE_NULL_SYNCH> JAWS_Pipeline_Module; typedef ACE_Task<ACE_NULL_SYNCH> JAWS_Pipeline_Task; +class JAWS_IO_Handler; +class JAWS_Dispatch_Policy; + class JAWS_Pipeline : public JAWS_Pipeline_Task // = TITLE // Methods that are common to pipeline components diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.cpp index 63a0c95414f..1774d459022 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.cpp +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.cpp @@ -6,17 +6,18 @@ #include "JAWS/Pipeline_Handler.h" template <class TYPE> -JAWS_Pipeline_Handler<TYPE>::JAWS_Pipeline_Handler (void) +JAWS_Pipeline_Abstract_Handler<TYPE>::JAWS_Pipeline_Abstract_Handler (void) { } template <class TYPE> int -JAWS_Pipeline_Handler<TYPE>::put (ACE_Message_Block *mb, ACE_Time_Value *tv) +JAWS_Pipeline_Abstract_Handler<TYPE>::put (ACE_Message_Block *mb, + ACE_Time_Value *tv) { - TYPE *data = ACE_dynamic_cast (TYPE *, + TYPE *data = ACE_dynamic_cast (TYPE *, mb->data_block ()); - status = this->handle_input (data, tv); + int status = this->handle_put (data, tv); return (status != -1) ? this->put_next (mb, tv) : -1; } diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.h b/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.h index db4786814f6..92eb39cd10d 100644 --- a/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.h +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Pipeline_Handler.h @@ -7,12 +7,12 @@ #include "JAWS/Pipeline.h" template <class TYPE> -class JAWS_Pipeline_Handler : public JAWS_Pipeline_Task +class JAWS_Pipeline_Abstract_Handler : public JAWS_Pipeline_Task // = TITLE // Methods that are common to pipeline components { public: - JAWS_Pipeline_Handler (void); + JAWS_Pipeline_Abstract_Handler (void); // ACE_Task hooks virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Policy.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/Policy.cpp new file mode 100644 index 00000000000..706416256fe --- /dev/null +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Policy.cpp @@ -0,0 +1,15 @@ +// $Id$ + +#include "JAWS/Policy.h" + +JAWS_Concurrency_Base * +JAWS_Synch_Dispatch_Policy::concurrency (void) +{ + return 0; +} + +JAWS_IO * +JAWS_Synch_Dispatch_Policy::io (void) +{ + return &(this->io_); +} diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Policy.h b/apps/JAWS/server/PROTOTYPE/JAWS/Policy.h new file mode 100644 index 00000000000..a637c588779 --- /dev/null +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Policy.h @@ -0,0 +1,25 @@ +/* -*- c++ -*- */ +// $Id$ + +#if !defined (JAWS_POLICY_H) +#define JAWS_POLICY_H + +#include "JAWS/Concurrency.h" + +class JAWS_IO; +class JAWS_IO_Handler; +class JAWS_IO_Handler_Factory; + +class JAWS_Synch_Dispatch_Policy : public JAWS_Dispatch_Policy +{ +public: + virtual JAWS_Concurrency_Base * concurrency (void); + virtual JAWS_IO * io (void); + +private: + JAWS_IO_Handler_Factory *factory_; + JAWS_IO_Handler *ioh_; + JAWS_IO &io_; +}; + +#endif /* !defined (JAWS_POLICY_H) */ diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Server.cpp b/apps/JAWS/server/PROTOTYPE/JAWS/Server.cpp new file mode 100644 index 00000000000..6a03222d299 --- /dev/null +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Server.cpp @@ -0,0 +1,138 @@ +// $Id$ + +#include "ace/Get_Opt.h" + +#include "JAWS/Server.h" +#include "JAWS/Data_Block.h" +#include "JAWS/Concurrency.h" +#include "JAWS/IO_Handler.h" + +JAWS_Server::JAWS_Server (void) + : port_ (5432), + concurrency_ (0), + dispatch_ (0), + nthreads_ (5), + maxthreads_ (20), + flags_ (THR_NEW_LWP) +{ +} + +JAWS_Server::JAWS_Server (int argc, char *argv[]) + : port_ (5432), + concurrency_ (0), + dispatch_ (0), + nthreads_ (5), + maxthreads_ (20), + flags_ (THR_NEW_LWP) +{ + this->init (argc, argv); +} + +void +JAWS_Server::init (int argc, char *argv[]) +{ + this->parse_args (argc, argv); +} + +int +JAWS_Server::open (JAWS_Pipeline_Handler *protocol) +{ + JAWS_Synch_IO_Handler_Factory synch_factory; +#if defined (ACE_WIN32) + JAWS_Asynch_IO_Handler_Factory asynch_factory; +#else + JAWS_Synch_IO_Handler_Factory &asynch_factory = synch_factory; +#endif /* defined (ACE_WIN32) */ + + JAWS_IO_Handler_Factory *factory; + JAWS_IO_Handler *handler; + JAWS_Data_Block *db; + + ACE_INET_Addr inet_addr (this->port_); + + // initialize an IO_Handler + factory = (this->dispatch_ == 0) ? &synch_factory : &asynch_factory; + handler = factory->create_io_handler (); + if (handler == 0) + { + factory->destroy_io_handler (handler, 0); + ACE_DEBUG ((LM_DEBUG, "JAWS_Server::open, can't create handler\n")); + return -1; + } + // handler->task (protocol); + + // initialize data block + db = new JAWS_Data_Block; + if (db == 0) + { + factory->destroy_io_handler (handler, 0); + ACE_DEBUG ((LM_DEBUG, "JAWS_Server::open, can't create data block\n")); + return -1; + } + + db->addr (&inet_addr); + db->handler (handler); + db->task (JAWS_Pipeline_Accept_Task_Singleton::instance ()); + + // The message block should contain an INET_Addr, and call the + // io->accept (INET_Addr) method! + + ACE_Message_Block mb (db); + + JAWS_Concurrency_Base *concurrency; + concurrency = (this->concurrency_ == 0) + ? JAWS_Thread_Pool_Singleton::instance () + : JAWS_Thread_Per_Singleton::instance () + ; + + // concurrency->open (this->flags_, this->nthreads_, this->maxthreads_); + concurrency->put (&mb); + + while (ACE_OS::thr_join (0, NULL) != -1) + ; + + return 0; +} + +void +JAWS_Server::parse_args (int argc, char *argv[]) +{ + int c; + + ACE_Get_Opt getopt (argc, argv, "p:c:d:n:m:f:"); + while ((c = getopt ()) != -1) + switch (c) + { + case 'p': + this->port_ = ACE_OS::atoi (getopt.optarg); + break; + case 'c': + if (ACE_OS::strcmp (getopt.optarg, "PER_REQUEST") == 0) + this->concurrency_ = 1; + else this->concurrency_ = 0; + break; + case 'd': + if (ACE_OS::strcmp (getopt.optarg, "ASYNCH") == 0) + this->dispatch_ = 1; + else this->dispatch_ = 0; + break; + case 'n': + this->nthreads_ = ACE_OS::atoi (getopt.optarg); + break; + case 'm': + this->maxthreads_ = ACE_OS::atoi (getopt.optarg); + break; + case 'f': + if (ACE_OS::strcmp (getopt.optarg, "THR_BOUND") == 0) + this->flags_ |= THR_BOUND; + else if (ACE_OS::strcmp (getopt.optarg, "THR_DAEMON") == 0) + this->flags_ |= THR_DAEMON; + else if (ACE_OS::strcmp (getopt.optarg, "THR_DETACHED") == 0) + this->flags_ |= THR_DETACHED; + break; + } + + if (this->port_ == 0) this->port_ = 5432; + if (this->nthreads_ == 0) this->nthreads_ = 5; + if (this->maxthreads_ == 0) this->maxthreads_ = 20; +} diff --git a/apps/JAWS/server/PROTOTYPE/JAWS/Server.h b/apps/JAWS/server/PROTOTYPE/JAWS/Server.h new file mode 100644 index 00000000000..e2405e4b360 --- /dev/null +++ b/apps/JAWS/server/PROTOTYPE/JAWS/Server.h @@ -0,0 +1,36 @@ +/* -*- c++ -*- */ +// $Id$ + +#if !defined (JAWS_SERVER_H) +#define JAWS_SERVER_H + +#include "JAWS/Data_Block.h" + +class JAWS_IO_Handler_Factory; + +class JAWS_Server +{ +public: + JAWS_Server (void); + JAWS_Server (int argc, char *argv[]); + + void init (int argc, char *argv[]); + int open (JAWS_Pipeline_Handler *ph); + +private: + void parse_args (int argc, char *argv[]); + // Parse arguments + +private: + int port_; // port to listen on + int concurrency_; // 0 => pool, 1 => per request + int dispatch_; // 0 => synch, 1 => asynch + int nthreads_; // number of threads + int maxthreads_; // maximum number of threads + long flags_; // thread creation flags + + JAWS_IO_Handler_Factory *factory_; +}; + + +#endif /* !defined (JAWS_SERVER_H) */ |