diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-01 08:00:34 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-01-01 08:00:34 +0000 |
commit | ea0d28240863caf437a18071bfd03e7b146c5ade (patch) | |
tree | 91b695852b885a5f44f9be8c3a22bbf7f5a96b8d | |
parent | a6e2ced2f5279e011b712995095a1712a29e22f0 (diff) | |
download | ATCD-ea0d28240863caf437a18071bfd03e7b146c5ade.tar.gz |
foo
46 files changed, 1569 insertions, 919 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b index 349f954ba24..64de3de46b5 100644 --- a/ChangeLog-96b +++ b/ChangeLog-96b @@ -1,3 +1,60 @@ +Wed Jan 1 00:10:47 1997 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * apps/Gateway/Gateway: Moved all of the configuration file + parsing logic *outside* of the Event_Channel into the Gateway + class so that we wouldn't have unnecessary dependencies. + + * apps/Gateway/Gateway: Redesigned the Gateway so that the + Proxy_Handlers (i.e., the Consumer_Proxy and Supplier_Proxy) + most of their work to the Event_Channel. This "lightweight + proxy" design is an improvement since it is now possible to + emulate the COS Event Channel semantics within the Event_Channel + "kernel." + + * Happy new year! + +Tue Dec 31 18:27:50 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * ace/Log_Msg.cpp (log): Added a test so that if we're + (1) not printing to stderr and (2) aborting the program we still + print a message to stderr. + + * ace/Message_Block: Added synchronization support to + ACE_Message_Block. This is necessary now that we've got + reference counting to ensure that we don't have race conditions + when incrementing and decrementing the reference count in + separate threads. The approach is very clean and uses the new + ACE_Lock mechanism to conditionally acquire()/release() the + locking strategy if concurrency control is necessary. + + * ace/Synch_T: Created a new set of ACE_Lock and + ACE_Lock_Adapter<> classes which are similar in spirit to the + ACE_Allocator and ACE_Allocator_Adapter<> classes. These make + it possible to treat polymorphically synchronization mechanisms + in ACE polymorphically, *without* creating an entire new + parallel hierarchy of locking mechanisms. + + * ace/Synch: Added the full suite of acquire_{read|write}() and + tryacquire_{read|write}() methods to ACE_Semaphore and + ACE_Process_Semaphore so they will be consist with the other + synchronization APIs. + +Tue Dec 31 00:11:56 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> + + * Changed all uses of ACE_Event_Handler::RWE_MASK to + ACE_Event_Handler::ALL_EVENTS_MASK to reflect the fact that + we will soon have more than READ, WRITE, and EXCEPT events. + However, I've kept RWE_MASK around for backwards + compatibility. + + * examples/ASX/Message_Queue: Changed the tests so that they use + the new ACE_Message_Block::release() method rather than calling + delete explicitly. + + * apps/Gateway: Revised the implementation of the Gateway and Peer + applications to take advantage of the new ACE_Message_Block + reference counting scheme. + Tue Dec 31 15:06:51 1996 David L. Levine <levine@cs.wustl.edu> * ace/Task.cpp: added comments that try to explain interaction @@ -12,6 +69,15 @@ Tue Dec 31 15:06:51 1996 David L. Levine <levine@cs.wustl.edu> Mon Dec 30 15:24:59 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> + * ace/Message_Block: Added reference counting to ACE_Message_Block + so that we no longer have to clone() messages when we want to + pass them around "by reference." + + * apps/Gateway/Peer/Peer.cpp (init): The Peer_Acceptor had gotten + out of date wrt newer ACE features, so I updated it. + +Mon Dec 30 15:24:59 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> + * ace/OS.h: Added a special case for ACE_UNUSED_ARG that works with G++. Thanks to David Levine for this. diff --git a/ace/Acceptor.h b/ace/Acceptor.h index 7071af03800..feca8eb7d2f 100644 --- a/ace/Acceptor.h +++ b/ace/Acceptor.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -109,7 +108,7 @@ protected: // = Demultiplexing hooks. virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Perform termination activities when <this> is removed from the // <reactor>. @@ -252,7 +251,7 @@ protected: // Returns the listening acceptor's <ACE_HANDLE>. virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Perform termination activities when <this> is removed from the // <Reactor>. @@ -397,7 +396,7 @@ protected: // Returns the listening acceptor's <ACE_HANDLE>. virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Perform termination activities when <this> is removed from the // <reactor>. diff --git a/ace/Connector.cpp b/ace/Connector.cpp index 0cace4e72c3..03fa643dfc8 100644 --- a/ace/Connector.cpp +++ b/ace/Connector.cpp @@ -268,7 +268,7 @@ ACE_Connector<SH, PR_CO_2>::cleanup_AST (ACE_HANDLE handle, this->reactor_->cancel_timer (ast->cancellation_id ()); // Remove ACE_HANDLE from ACE_Reactor. - this->reactor_->remove_handler (handle, ACE_Event_Handler::RWE_MASK + this->reactor_->remove_handler (handle, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); // Remove ACE_HANDLE from the map. diff --git a/ace/Connector.h b/ace/Connector.h index 8d9e7d44a5c..02a193621b4 100644 --- a/ace/Connector.h +++ b/ace/Connector.h @@ -222,7 +222,7 @@ protected: // = Demultiplexing hooks. virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Terminate the Client ACE_Connector by iterating over any // unconnected ACE_Svc_Handler's and removing them from the // ACE_Reactor. diff --git a/ace/Event_Handler.h b/ace/Event_Handler.h index 07140a82ad6..f414d930ed4 100644 --- a/ace/Event_Handler.h +++ b/ace/Event_Handler.h @@ -54,8 +54,8 @@ public: EXCEPT_MASK = 0x2, #endif /* ACE_USE_POLL */ ACCEPT_MASK = 0x8, - ALL_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK, - RWE_MASK = ALL_MASK, + ALL_EVENTS_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK, + RWE_MASK = ALL_EVENTS_MASK, DONT_CALL = 0x100 }; diff --git a/ace/Log_Msg.cpp b/ace/Log_Msg.cpp index e0bbef3abfe..8c509eee15d 100644 --- a/ace/Log_Msg.cpp +++ b/ace/Log_Msg.cpp @@ -690,9 +690,11 @@ ACE_Log_Msg::log (const char *format_str, if (abort_prog) { - // _always_ print a message to stderr if aborting, not verbose - // to help avoid recursive aborts if something is hosed - log_record.print (ACE_Log_Msg::local_host_, 0); + // *always* print a message to stderr if we're aborting (and + // have not already done so). We don't use verbose, however, to + // avoid recursive aborts if something is hosed. + if (!ACE_BIT_ENABLED (ACE_Log_Msg::flags_, ACE_Log_Msg::STDERR)) + log_record.print (ACE_Log_Msg::local_host_, 0); ACE_OS::exit (exit_value); } diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp index 9f9503ef429..d58e13a8c03 100644 --- a/ace/Message_Block.cpp +++ b/ace/Message_Block.cpp @@ -80,7 +80,10 @@ ACE_Message_Block::ACE_Message_Block (void) cont_ (0), next_ (0), prev_ (0), - allocator_ (0) + allocator_strategy_ (0), + locking_strategy_ (0), + reference_count_ (1) + { ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); } @@ -89,20 +92,24 @@ ACE_Message_Block::ACE_Message_Block (size_t sz, ACE_Message_Type msg_type, ACE_Message_Block *msg_cont, const char *msg_data, - ACE_Allocator *alloc) + ACE_Allocator *allocator_strategy, + ACE_Lock *locking_strategy) { ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); - if (this->init (sz, msg_type, msg_cont, msg_data, alloc) == -1) + + if (this->init (sz, msg_type, msg_cont, msg_data, + allocator_strategy, locking_strategy) == -1) ACE_ERROR ((LM_ERROR, "ACE_Message_Block")); } ACE_Message_Block::~ACE_Message_Block (void) { ACE_TRACE ("ACE_Message_Block::~ACE_Message_Block"); + if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE)) { - if (this->allocator_) - this->allocator_->free ((void *) this->base_); + if (this->allocator_strategy_) + this->allocator_strategy_->free ((void *) this->base_); else delete [] this->base_; } @@ -125,7 +132,9 @@ ACE_Message_Block::ACE_Message_Block (const char *data, cont_ (0), next_ (0), prev_ (0), - allocator_ (0) + allocator_strategy_ (0), + locking_strategy_ (0), + reference_count_ (1) { ACE_TRACE ("ACE_Message_Block::ACE_Message_Block"); } @@ -134,6 +143,7 @@ int ACE_Message_Block::size (size_t length) { ACE_TRACE ("ACE_Message_Block::size"); + if (length < this->max_size_) this->cur_size_ = length; else @@ -141,11 +151,11 @@ ACE_Message_Block::size (size_t length) int r_delta, w_delta; char *buf; - if (this->allocator_ == 0) + if (this->allocator_strategy_ == 0) ACE_NEW_RETURN (buf, char[length], -1); else // Use the allocator! { - buf = (char *) this->allocator_->malloc (length); + buf = (char *) this->allocator_strategy_->malloc (length); if (buf == 0) { errno = ENOMEM; @@ -155,8 +165,8 @@ ACE_Message_Block::size (size_t length) if (ACE_BIT_DISABLED (this->flags_, ACE_Message_Block::DONT_DELETE)) { - if (this->allocator_) - this->allocator_->free ((void *) this->base_); + if (this->allocator_strategy_) + this->allocator_strategy_->free ((void *) this->base_); else delete [] this->base_; } @@ -183,6 +193,7 @@ ACE_Message_Block::init (const char *data, size_t size) { ACE_TRACE ("ACE_Message_Block::init"); + // Should we also initialize all the other fields, as well? this->base_ = (char *) data; this->cur_size_ = size; this->max_size_ = size; @@ -195,22 +206,23 @@ ACE_Message_Block::init (size_t sz, ACE_Message_Type msg_type, ACE_Message_Block *msg_cont, const char *msg_data, - ACE_Allocator *alloc) + ACE_Allocator *allocator_strategy, + ACE_Lock *locking_strategy) { ACE_TRACE ("ACE_Message_Block::init"); this->flags_ = 0; if (msg_data == 0) { - if (alloc == 0) + if (allocator_strategy == 0) { - this->allocator_ = 0; + this->allocator_strategy_ = 0; ACE_NEW_RETURN (this->base_, char[sz], -1); } else // Use the allocator! { - this->allocator_ = alloc; - this->base_ = (char *) alloc->malloc (sz); + this->allocator_strategy_ = allocator_strategy; + this->base_ = (char *) this->allocator_strategy_->malloc (sz); if (this->base_ == 0) { errno = ENOMEM; @@ -233,6 +245,8 @@ ACE_Message_Block::init (size_t sz, this->cont_ = msg_cont; this->next_ = 0; this->prev_ = 0; + this->locking_strategy_ = locking_strategy; + this->reference_count_ = 1; return 0; } @@ -248,7 +262,7 @@ ACE_Message_Block::clone (Message_Flags mask) const ACE_NEW_RETURN (nb, ACE_Message_Block (this->max_size_, this->type_, - 0, 0, this->allocator_), + 0, 0, this->allocator_strategy_), 0); ACE_OS::memcpy (nb->base_, this->base_, this->max_size_); @@ -264,3 +278,59 @@ ACE_Message_Block::clone (Message_Flags mask) const nb->cont_ = this->cont_->clone (mask); return nb; } + +ACE_Message_Block * +ACE_Message_Block::release (void) +{ + ACE_TRACE ("ACE_Message_Block::release"); + + ACE_Message_Block *result = 0; + + if (this->locking_strategy_) + { + // We need to acquire the lock before decrementing the count. + this->locking_strategy_->acquire (); + this->reference_count_--; + + if (this->reference_count_ == 0) + delete this; + else if (this->reference_count_ > 0) + result = this; + // ACE_ASSERT (this->reference_count_ <= 0) + // This shouldn't happen... + + this->locking_strategy_->release (); + } + else + { + this->reference_count_--; + + if (this->reference_count_ == 0) + delete this; + else if (this->reference_count_ > 0) + result = this; + // ACE_ASSERT (this->reference_count_ <= 0) + // This shouldn't happen... + } + + return result; +} + +ACE_INLINE ACE_Message_Block * +ACE_Message_Block::duplicate (void) +{ + ACE_TRACE ("ACE_Message_Block::duplicate"); + + if (this->locking_strategy_) + { + // We need to acquire the lock before incrementing the count. + this->locking_strategy_->acquire (); + this->reference_count_++; + this->locking_strategy_->release (); + } + else + this->reference_count_++; + + return this; +} + diff --git a/ace/Message_Block.h b/ace/Message_Block.h index 599e1039813..30852f4dc8c 100644 --- a/ace/Message_Block.h +++ b/ace/Message_Block.h @@ -23,17 +23,21 @@ class ACE_Export ACE_Message_Block // = TITLE - // Object used to store messages in the ASX framework. + // Stores messages for use throughout ACE (particularly + // <ACE_Message_Queue>). // // = DESCRIPTION - // An ACE_Message_Block is modeled after the message data - // structures used in System V STREAMS. A Message_Block is - // composed of one or more Message_Blocks that are linked - // together by PREV and NEXT pointers. In addition, a - // ACE_Message_Block may also be linked to a chain of other - // Message_Blocks. This structure enables efficient + // An <ACE_Message_Block> is modeled after the message data + // structures used in System V STREAMS. An <ACE_Message_Block> + // is composed of one or more <ACE_Message_Blocks> that can be + // linked to form a ``fragment chain.'' In addition, + // <ACE_Message_Blocks> can be linked together by <prev_> and + // <next_> pointers to form a queue of messages (this is how + // <ACE_Message_Queue> works). This structure enables efficient // manipulation of arbitrarily-large messages *without* - // incurring memory copying overhead. + // incurring memory copying overhead since (1) + // <ACE_Message_Blocks> can be chained together via pointers and + // (2) <ACE_Message_Blocks> keep a reference count. { public: enum ACE_Message_Type @@ -90,13 +94,16 @@ public: ACE_Message_Type type = MB_DATA, ACE_Message_Block *cont = 0, const char *data = 0, - ACE_Allocator *allocator = 0); + ACE_Allocator *allocator_strategy_ = 0, + ACE_Lock *locking_strategy = 0); // Create an initialized message of type <type> containing <size> // bytes. The <cont> argument initializes the continuation field in // the <Message_Block>. If <data> == 0 then we create and own the // <data>, using <allocator> to get the data if it's non-0. If // <data> != 0 we assume ownership of the <data> (and don't delete - // it). + // it). If <locking_strategy> is non-0 then this is used to protect + // regions of code that access shared state (e.g., reference + // counting) from race conditions. int init (const char *data, size_t size = 0); @@ -107,13 +114,16 @@ public: ACE_Message_Type type = MB_DATA, ACE_Message_Block *cont = 0, const char *data = 0, - ACE_Allocator *allocator = 0); + ACE_Allocator *allocator = 0, + ACE_Lock *locking_strategy = 0); // Create an initialized message of type <type> containing <size> // bytes. The <cont> argument initializes the continuation field in // the <Message_Block>. If <data> == 0 then we create and own the // <data>, using <allocator> to get the data if it's non-0. If // <data> != 0 we assume ownership of the <data> (and don't delete - // it). + // it). If <locking_strategy> is non-0 then this is used to protect + // regions of code that access shared state (e.g., reference + // counting) from race conditions. ~ACE_Message_Block (void); // Delete all the resources held in the message. @@ -145,6 +155,16 @@ public: ACE_Message_Block *clone (Message_Flags mask = ACE_Message_Block::DONT_DELETE) const; // Return an exact "deep copy" of the message. + // = Reference counting methods. + ACE_Message_Block *duplicate (void); + // Increment our reference count by one. + + ACE_Message_Block *release (void); + // Decrement our reference count by one. If the reference count is + // > 0 then return this; else if reference count == 0 then delete + // <this> and return 0. Behavior is undefined if reference count < + // 0. + // = Operations on Message data int copy (const char *buf, size_t n); @@ -160,7 +180,9 @@ public: char *base (void) const; // Get message data. - void base (char *data, size_t size, Message_Flags = DONT_DELETE); + void base (char *data, + size_t size, + Message_Flags = DONT_DELETE); // Set message data. char *end (void) const; @@ -180,42 +202,43 @@ public: void wr_ptr (size_t n); // Set the write pointer ahead <n> bytes. - // = The length of a message is computed as the length between the - // wr_ptr() - rd_ptr ()). + // = Message length is wr_ptr() - rd_ptr (). size_t length (void) const; // Get the length of the message void length (size_t n); // Set the length of the message - // = The size of the allocated buffer is the total amount of space - // alloted. + // = Message size is the total amount of space alloted. size_t size (void) const; // Get the total amount of space in the message. int size (size_t length); // Set the total amount of space in the message. Returns 0 if // successful, else -1. - // = The coninuation field is used to chain together composite - // messages. + // = The continuation field chains together composite messages. ACE_Message_Block *cont (void) const; // Get the continuation field. void cont (ACE_Message_Block *); // Set the continuation field. - // = The <next_> pointer points to the <Message_Block> directly ahead - // in the Message_Queue. + // = The <next_> pointer is a link to the <Message_Block> directly ahead in the Message_Queue. ACE_Message_Block *next (void) const; // Get link to next message. void next (ACE_Message_Block *); // Set link to next message. - // = The <prev_> pointer points to the <Message_Block> directly - // ahead in the Message_Queue. + // = The <prev_> pointer is a link to the <Message_Block> directly ahead in the Message_Queue. ACE_Message_Block *prev (void) const; // Get link to prev message. void prev (ACE_Message_Block *); // Set link to prev message. + // = The locking strategy prevents race condition. + ACE_Lock *locking_strategy (void); + // Get the locking strategy. + ACE_Lock *locking_strategy (ACE_Lock *); + // Set a new locking strategy and return the hold one. + void dump (void) const; // Dump the state of an object. @@ -224,7 +247,7 @@ public: private: Message_Flags flags_; - // Misc flags. + // Misc flags (e.g., DONT_DELETE and USER_FLAGS). char *base_; // Pointer to beginning of message block. @@ -257,9 +280,17 @@ private: ACE_Message_Block *prev_; // Pointer to previous message in the list. - ACE_Allocator *allocator_; + ACE_Allocator *allocator_strategy_; // Pointer to the allocator defined for this message block. + ACE_Lock *locking_strategy_; + // Pointer to the locking defined for this message block. This is + // used to protect regions of code containing + + size_t reference_count_; + // Reference count for this <Message_Block> which is used to avoid + // deep copies (i.e., <clone>). + // = Disallow these operations for now (use <clone> instead). ACE_Message_Block &operator= (const ACE_Message_Block &); ACE_Message_Block (const ACE_Message_Block &); diff --git a/ace/Message_Block.i b/ace/Message_Block.i index eebe94937d6..72b4f42f67b 100644 --- a/ace/Message_Block.i +++ b/ace/Message_Block.i @@ -230,3 +230,19 @@ ACE_Message_Block::prev (void) const return this->prev_; } +ACE_INLINE ACE_Lock * +ACE_Message_Block::locking_strategy (void) +{ + ACE_TRACE ("ACE_Message_Block::locking_strategy"); + return this->locking_strategy_; +} + +ACE_INLINE ACE_Lock * +ACE_Message_Block::locking_strategy (ACE_Lock *nls) +{ + ACE_TRACE ("ACE_Message_Block::locking_strategy"); + ACE_Lock *ols = this->locking_strategy_; + this->locking_strategy_ = nls; + return ols; +} + diff --git a/ace/Reactor.cpp b/ace/Reactor.cpp index 1a0a9d8377a..c7a629d028b 100644 --- a/ace/Reactor.cpp +++ b/ace/Reactor.cpp @@ -92,7 +92,7 @@ ACE_Handler_Repository::close (ACE_Reactor *reactor) i < this->cur_size_; i++) reactor->detach (this->event_handlers_[i].handle_, - ACE_Event_Handler::RWE_MASK); + ACE_Event_Handler::ALL_EVENTS_MASK); delete [] this->event_handlers_; this->event_handlers_ = 0; @@ -100,7 +100,7 @@ ACE_Handler_Repository::close (ACE_Reactor *reactor) for (ACE_HANDLE h = 0; h < this->max_handlep1_; h++) - reactor->detach (h, ACE_Event_Handler::RWE_MASK); + reactor->detach (h, ACE_Event_Handler::ALL_EVENTS_MASK); delete [] this->event_handlers_; this->event_handlers_ = 0; @@ -1591,7 +1591,7 @@ ACE_Reactor::check_handles (void) if (ACE_OS::poll (&p_handle, 1, 0) == -1) { result = 1; - this->detach (handle, ACE_Event_Handler::RWE_MASK); + this->detach (handle, ACE_Event_Handler::ALL_EVENTS_MASK); } #else rmask.set_bit (handle); @@ -1600,7 +1600,7 @@ ACE_Reactor::check_handles (void) &time_poll) < 0) { result = 1; - this->detach (handle, ACE_Event_Handler::RWE_MASK); + this->detach (handle, ACE_Event_Handler::ALL_EVENTS_MASK); } rmask.clr_bit (handle); #endif /* ACE_USE_POLL */ diff --git a/ace/SOCK_Acceptor.h b/ace/SOCK_Acceptor.h index fd3c2534a92..fed78c498ab 100644 --- a/ace/SOCK_Acceptor.h +++ b/ace/SOCK_Acceptor.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY diff --git a/ace/Svc_Handler.h b/ace/Svc_Handler.h index 1228ad0964a..89bf2d73d99 100644 --- a/ace/Svc_Handler.h +++ b/ace/Svc_Handler.h @@ -73,7 +73,7 @@ public: // = Demultiplexing hooks. virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Perform termination activities on the SVC_HANDLER. The default // behavior is to close down the <peer_> (to avoid descriptor leaks) // and to delete this (to avoid memory leaks)! If you don't want diff --git a/ace/Synch.h b/ace/Synch.h index 6d3e864d582..3a88bee5a32 100644 --- a/ace/Synch.h +++ b/ace/Synch.h @@ -29,6 +29,54 @@ class ACE_Time_Value; // template <class ACE_COND_MUTEX> class ACE_Condition; +class ACE_Lock + // = TITLE + // This is the abstract base class that contains the uniform + // locking API that is supported by all the ACE synchronization + // mechanisms. + // + // = DESCRIPTION + // This class is typically used in conjunction with the + // <ACE_Lock_Adapter> in order to provide a polymorphic + // interface to the ACE synchronization mechanisms (e.g., + // <ACE_Mutex>, <ACE_Semaphore>, <ACE_RW_Lock>, etc). Note that + // the reason that all of ACE doesn't use polymorphic locks is + // that (1) they add ~20% extra overhead for virtual function + // calls and (2) objects with virtual functions can't be placed + // into shared memory. +{ +public: + virtual int remove (void) = 0; + // Explicitly destroy the lock. + + virtual int acquire (void) = 0; + // Block the thread until the lock is acquired. + + virtual int tryacquire (void) = 0; + // Conditionally acquire the lock (i.e., won't block). + + virtual int release (void) = 0; + // Release the lock. + + virtual int acquire_read (void) = 0; + // Block until the thread acquires a read lock. If the locking + // mechanism doesn't support read locks then this just calls + // <acquire>. + + virtual int acquire_write (void) = 0; + // Block until the thread acquires a write lock. If the locking + // mechanism doesn't support read locks then this just calls + // <acquire>. + + virtual int tryacquire_read (void) = 0; + // Conditionally acquire a read lock. If the locking mechanism + // doesn't support read locks then this just calls <acquire>. + + virtual int tryacquire_write (void) = 0; + // Conditionally acquire a write lock. If the locking mechanism + // doesn't support read locks then this just calls <acquire>. +}; + class ACE_Export ACE_File_Lock // = TITLE // A wrapper around the UNIX file locking mechanism. @@ -111,12 +159,12 @@ class ACE_Export ACE_Semaphore { public: // = Initialization and termination. - ACE_Semaphore (u_int count, + ACE_Semaphore (u_int count = 1, // By default make this unlocked. int type = USYNC_THREAD, LPCTSTR name = 0, void * = 0, int max = 0x7fffffff); - // Initialize the semaphore, with default value of "count". + // Initialize the semaphore, with initial value of "count". ~ACE_Semaphore (void); // Implicitly destroy the semaphore. @@ -129,13 +177,33 @@ public: // greater than 0, then decrement it. int tryacquire (void); - // Conditionally decrement the semaphore if count is greater - // than 0 (i.e., won't block). + // Conditionally decrement the semaphore if count is greater than 0 + // (i.e., won't block). int release (void); // Increment the semaphore, potentially unblocking // a waiting thread. + int acquire_read (void); + // Acquire semaphore ownership. This calls <acquire> and is only + // here to make the <ACE_Semaphore> interface consistent with the + // other synchronization APIs. + + int acquire_write (void); + // Acquire semaphore ownership. This calls <acquire> and is only + // here to make the <ACE_Semaphore> interface consistent with the + // other synchronization APIs. + + int tryacquire_read (void); + // Conditionally acquire semaphore (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Semaphore> + // interface consistent with the other synchronization APIs. + + int tryacquire_write (void); + // Conditionally acquire semaphore (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Semaphore> + // interface consistent with the other synchronization APIs. + void dump (void) const; // Dump the state of an object. @@ -159,8 +227,10 @@ class ACE_Export ACE_Process_Semaphore // across processes. { public: - ACE_Process_Semaphore (u_int count, LPCTSTR name = 0, - void * = 0, int max = 0x7FFFFFFF); + ACE_Process_Semaphore (u_int count = 1, // By default make this unlocked. + LPCTSTR name = 0, + void * = 0, + int max = 0x7FFFFFFF); // Initialize the semaphore, with an initial value of <count> and a // maximum value of <max>. @@ -171,16 +241,35 @@ public: // Explicitly destroy the semaphore. int acquire (void); - // Block the thread until the semaphore count becomes - // greater than 0, then decrement it. + // Block the thread until the semaphore count becomes greater than + // 0, then decrement it. int tryacquire (void); - // Conditionally decrement the semaphore if count is greater - // than 0 (i.e., won't block). + // Conditionally decrement the semaphore if count is greater than 0 + // (i.e., won't block). int release (void); - // Increment the semaphore, potentially unblocking - // a waiting thread. + // Increment the semaphore, potentially unblocking a waiting thread. + + int acquire_read (void); + // Acquire semaphore ownership. This calls <acquire> and is only + // here to make the <ACE_Process_Semaphore> interface consistent + // with the other synchronization APIs. + + int acquire_write (void); + // Acquire semaphore ownership. This calls <acquire> and is only + // here to make the <ACE_Process_Semaphore> interface consistent + // with the other synchronization APIs. + + int tryacquire_read (void); + // Conditionally acquire semaphore (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Process_Semaphore> + // interface consistent with the other synchronization APIs. + + int tryacquire_write (void); + // Conditionally acquire semaphore (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Process_Semaphore> + // interface consistent with the other synchronization APIs. void dump (void) const; // Dump the state of an object. @@ -233,12 +322,12 @@ public: int acquire (void); // Note, for interface uniformity with other synchronization // wrappers we include the <acquire> method. This is implemented as - // a write-lock to be on the safe-side... + // a write-lock to safe... int tryacquire (void); // Note, for interface uniformity with other synchronization // wrappers we include the <tryacquire> method. This is implemented - // as a write-lock to be on the safe-side... + // as a write-lock to be safe... int release (void); // Unlock a readers/writer lock. @@ -263,8 +352,8 @@ private: class ACE_Export ACE_Mutex // = TITLE - // ACE_Mutex wrapper (valid in same process or across processes - // (depending on TYPE flag)) + // <ACE_Mutex> wrapper (valid in same process or across + // processes (depending on TYPE flag)). { public: ACE_Mutex (int type = USYNC_THREAD, @@ -288,16 +377,24 @@ public: // Release lock and unblock a thread at head of priority queue. int acquire_read (void); - // Acquire lock ownership (wait on priority queue if necessary). + // Acquire mutex ownership. This calls <acquire> and is only + // here to make the <ACE_Mutex> interface consistent with the + // other synchronization APIs. int acquire_write (void); - // Acquire lock ownership (wait on priority queue if necessary). + // Acquire mutex ownership. This calls <acquire> and is only + // here to make the <ACE_Mutex> interface consistent with the + // other synchronization APIs. int tryacquire_read (void); - // Conditionally acquire a lock (i.e., won't block). + // Conditionally acquire mutex (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Mutex> + // interface consistent with the other synchronization APIs. int tryacquire_write (void); - // Conditionally acquire a lock (i.e., won't block). + // Conditionally acquire mutex (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Mutex> + // interface consistent with the other synchronization APIs. const ACE_mutex_t &lock (void) const; // Return the underlying mutex. @@ -324,7 +421,8 @@ class ACE_Export ACE_Process_Mutex // processes). { public: - ACE_Process_Mutex (LPCTSTR name = ACE_DEFAULT_MUTEX, void *arg = 0); + ACE_Process_Mutex (LPCTSTR name = ACE_DEFAULT_MUTEX, + void *arg = 0); // Create a Process_Mutex, passing in the optional <name>. ~ACE_Process_Mutex (void); @@ -669,16 +767,24 @@ public: // Release lock and unblock a thread at head of priority queue. int acquire_read (void); - // Acquire lock ownership (wait on priority queue if necessary). + // Acquire mutex ownership. This calls <acquire> and is only here + // to make the <ACE_Thread_Mutex> interface consistent with the + // other synchronization APIs. int acquire_write (void); - // Acquire lock ownership (wait on priority queue if necessary). + // Acquire mutex ownership. This calls <acquire> and is only here + // to make the <ACE_Thread_Mutex> interface consistent with the + // other synchronization APIs. int tryacquire_read (void); - // Conditionally acquire a lock (i.e., won't block). + // Conditionally acquire mutex (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Thread_Mutex> + // interface consistent with the other synchronization APIs. int tryacquire_write (void); - // Conditionally acquire a lock (i.e., won't block). + // Conditionally acquire mutex (i.e., won't block). This calls + // <tryacquire> and is only here to make the <ACE_Thread_Mutex> + // interface consistent with the other synchronization APIs. const ACE_thread_mutex_t &lock (void) const; // Return the underlying mutex. @@ -912,8 +1018,10 @@ class ACE_Export ACE_Thread_Semaphore : public ACE_Semaphore // only within on process. { public: - ACE_Thread_Semaphore (u_int count, LPCTSTR name = 0, - void * = 0, int max = 0x7FFFFFFF); + ACE_Thread_Semaphore (u_int count = 1, // By default make this unlocked. + LPCTSTR name = 0, + void * = 0, + int max = 0x7FFFFFFF); // Initialize the semaphore, with an initial value of <count> and a // maximum value of <max>. diff --git a/ace/Synch.i b/ace/Synch.i index 3bc15d21e39..12b4260c4ef 100644 --- a/ace/Synch.i +++ b/ace/Synch.i @@ -235,6 +235,86 @@ ACE_Semaphore::release (void) return ACE_OS::sema_post (&this->semaphore_); } +// Acquire semaphore ownership. This calls <acquire> and is only +// here to make the <ACE_Semaphore> interface consistent with the +// other synchronization APIs. + +ACE_INLINE int +ACE_Semaphore::acquire_read (void) +{ + return this->acquire (); +} + +// Acquire semaphore ownership. This calls <acquire> and is only +// here to make the <ACE_Semaphore> interface consistent with the +// other synchronization APIs. + +ACE_INLINE int +ACE_Semaphore::acquire_write (void) +{ + return this->acquire (); +} + +// Conditionally acquire semaphore (i.e., won't block). This calls +// <tryacquire> and is only here to make the <ACE_Semaphore> +// interface consistent with the other synchronization APIs. + +ACE_INLINE int +ACE_Semaphore::tryacquire_read (void) +{ + return this->tryacquire (); +} + +// Conditionally acquire semaphore (i.e., won't block). This calls +// <tryacquire> and is only here to make the <ACE_Semaphore> +// interface consistent with the other synchronization APIs. + +ACE_INLINE int +ACE_Semaphore::tryacquire_write (void) +{ + return this->tryacquire (); +} + +// Acquire semaphore ownership. This calls <acquire> and is only here +// to make the <ACE_Process_Semaphore> interface consistent with the +// other synchronization APIs. + +ACE_INLINE int +ACE_Process_Semaphore::acquire_read (void) +{ + return this->acquire (); +} + +// Acquire semaphore ownership. This calls <acquire> and is only here +// to make the <ACE_Process_Semaphore> interface consistent with the +// other synchronization APIs. + +ACE_INLINE int +ACE_Process_Semaphore::acquire_write (void) +{ + return this->acquire (); +} + +// Conditionally acquire semaphore (i.e., won't block). This calls +// <tryacquire> and is only here to make the <ACE_Process_Semaphore> +// interface consistent with the other synchronization APIs. + +ACE_INLINE int +ACE_Process_Semaphore::tryacquire_read (void) +{ + return this->tryacquire (); +} + +// Conditionally acquire semaphore (i.e., won't block). This calls +// <tryacquire> and is only here to make the <ACE_Process_Semaphore> +// interface consistent with the other synchronization APIs. + +ACE_INLINE int +ACE_Process_Semaphore::tryacquire_write (void) +{ + return this->tryacquire (); +} + #if defined (ACE_HAS_THREADS) ACE_INLINE const ACE_thread_mutex_t & diff --git a/ace/Synch_T.h b/ace/Synch_T.h index 167bf36950a..ee3ab76422e 100644 --- a/ace/Synch_T.h +++ b/ace/Synch_T.h @@ -23,6 +23,57 @@ // Forward decl class ACE_Time_Value; +template <class LOCKING_MECHANISM> +class ACE_Lock_Adapter : public ACE_Lock + // = TITLE + + // This is an adapter that allows applications to transparently + // combine the <ACE_Lock> abstract base class (which contains + // pure virtual methods) with any of the other concrete ACE + // synchronization classes (e.g., <ACE_Mutex>, <ACE_Semaphore>, + // <ACE_RW_Lock>, etc.). + // + // = DESCRIPTION + // This class uses a form of the Adapter pattern. +{ +public: + typedef LOCKING_MECHANISM LOCK; + + virtual int remove (void); + // Explicitly destroy the lock. + + virtual int acquire (void); + // Block the thread until the lock is acquired. + + virtual int tryacquire (void); + // Conditionally acquire the lock (i.e., won't block). + + virtual int release (void); + // Release the lock. + + virtual int acquire_read (void); + // Block until the thread acquires a read lock. If the locking + // mechanism doesn't support read locks then this just calls + // <acquire>. + + virtual int acquire_write (void); + // Block until the thread acquires a write lock. If the locking + // mechanism doesn't support read locks then this just calls + // <acquire>. + + virtual int tryacquire_read (void); + // Conditionally acquire a read lock. If the locking mechanism + // doesn't support read locks then this just calls <acquire>. + + virtual int tryacquire_write (void); + // Conditionally acquire a write lock. If the locking mechanism + // doesn't support read locks then this just calls <acquire>. + +private: + LOCKING_MECHANISM lock_; + // The concrete locking mechanism that all the methods delegate to. +}; + template <class LOCK, class TYPE> class ACE_Test_and_Set : public ACE_Event_Handler { diff --git a/ace/Synch_T.i b/ace/Synch_T.i index 198975ee38a..252b90633f1 100644 --- a/ace/Synch_T.i +++ b/ace/Synch_T.i @@ -199,4 +199,72 @@ ACE_TSS<TYPE>::ts_get (void) const return (TYPE *) &this->type_; } +// Explicitly destroy the lock. +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::remove (void) +{ + return this->lock_.remove (); +} + +// Block the thread until the lock is acquired. +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire (void) +{ + return this->lock_.acquire (); +} + +// Conditionally acquire the lock (i.e., won't block). + +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire (void) +{ + return this->lock_.tryacquire (); +} + +// Release the lock. + +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::release (void) +{ + return this->lock_.release (); +} + +// Block until the thread acquires a read lock. If the locking +// mechanism doesn't support read locks then this just calls +// <acquire>. + +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_read (void) +{ + return this->lock_.acquire_read (); +} + +// Block until the thread acquires a write lock. If the locking +// mechanism doesn't support read locks then this just calls +// <acquire>. + +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void) +{ + return this->lock_.acquire_write (); +} + +// Conditionally acquire a read lock. If the locking mechanism +// doesn't support read locks then this just calls <acquire>. + +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::tryacquire_read (void) +{ + return this->lock_.tryacquire_read (); +} + +// Conditionally acquire a write lock. If the locking mechanism +// doesn't support read locks then this just calls <acquire>. + +template <class LOCKING_MECHANISM> int +ACE_Lock_Adapter<LOCKING_MECHANISM>::acquire_write (void) +{ + return this->lock_.acquire_write (); +} + #endif /* defined (ACE_HAS_THREADS) && defined (ACE_HAS_THREAD_SPECIFIC_STORAGE) */ diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h index 8d1b2979a49..28e59a4b2e6 100644 --- a/apps/Gateway/Gateway/Concurrency_Strategies.h +++ b/apps/Gateway/Gateway/Concurrency_Strategies.h @@ -55,20 +55,20 @@ class Supplier_Proxy; #if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) #if defined (USE_OUTPUT_MT) -typedef Thr_Consumer_Proxy CONSUMER_HANDLER; +typedef Thr_Consumer_Proxy CONSUMER_PROXY; #else -typedef Consumer_Proxy CONSUMER_HANDLER; +typedef Consumer_Proxy CONSUMER_PROXY; #endif /* USE_OUTPUT_MT */ #if defined (USE_INPUT_MT) -typedef Thr_Supplier_Proxy SUPPLIER_HANDLER; +typedef Thr_Supplier_Proxy SUPPLIER_PROXY; #else -typedef Supplier_Proxy SUPPLIER_HANDLER; +typedef Supplier_Proxy SUPPLIER_PROXY; #endif /* USE_INPUT_MT */ #else // Instantiate a non-multi-threaded Gateway. -typedef Supplier_Proxy SUPPLIER_HANDLER; -typedef Consumer_Proxy CONSUMER_HANDLER; +typedef Supplier_Proxy SUPPLIER_PROXY; +typedef Consumer_Proxy CONSUMER_PROXY; #endif /* ACE_HAS_THREADS */ #endif /* _CONCURRENCY_STRATEGIES */ diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index 7e99902b0db..5b95dc4fbf0 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -27,7 +27,7 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, line_number++; } - // Get the logic id. + // Get the logical id. if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS) return read_result; @@ -35,12 +35,12 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, if ((read_result = this->getint (entry.type_)) != FP::SUCCESS) return read_result; - // get all the destinations. - entry.total_destinations_ = 0; + // get all the consumers. + entry.total_consumers_ = 0; - while ((read_result = this->getint (entry.destinations_[entry.total_destinations_])) + while ((read_result = this->getint (entry.consumers_[entry.total_consumers_])) == FP::SUCCESS) - ++entry.total_destinations_; // do nothing + ++entry.total_consumers_; // do nothing (should check against max...) if (read_result == FP::EOLINE || read_result == FP::EOFILE) return FP::SUCCESS; @@ -63,8 +63,8 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, { if (read_result == FP::EOFILE) return FP::EOFILE; - else if (read_result == FP::EOLINE || - read_result == FP::COMMENT) + else if (read_result == FP::EOLINE + || read_result == FP::COMMENT) line_number++; } @@ -72,19 +72,19 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, if ((read_result = this->getword (entry.host_)) != FP::SUCCESS) return read_result; - int port; + ACE_INT32 port; // Get the port number. if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.remote_poconsumer_ = (u_short) port; + entry.remote_port_ = (u_short) port; - // Get the direction. + // Get the proxy role. if ((read_result = this->getword (buf)) != FP::SUCCESS) return read_result; else - entry.direction_ = buf[0]; + entry.proxy_role_ = buf[0]; // Get the max retry delay. if ((read_result = this->getint (entry.max_retry_delay_)) != FP::SUCCESS) @@ -94,7 +94,7 @@ Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.local_poconsumer_ = (u_short) port; + entry.local_port_ = (u_short) port; return FP::SUCCESS; } @@ -108,7 +108,7 @@ int main (int argc, char *argv[]) exit (1); } FP_RETURN_TYPE result; - Connection_Config_File_Entry CCentry; + Connection_Config_File_Entry entry; Connection_Config_File_Parser CCfile; CCfile.open (argv[1]); @@ -118,15 +118,15 @@ int main (int argc, char *argv[]) printf ("ConnID\tHost\t\tRPort\tDir\tRetry\tLPort\n"); // Read config file line at a time. - while ((result = CCfile.read_entry (CCentry, line_number)) != EOF) + while ((result = CCfile.read_entry (entry, line_number)) != EOF) { if (result != FP::SUCCESS) // ACE_DEBUG ((LM_DEBUG, "Error line %d.\n", line_number)); cerr << "Error at line " << line_number << endl; else printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n", - CCentry.conn_id_, CCentry.host_, CCentry.remote_poconsumer_, CCentry.direction_, - CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_poconsumer_); + entry.conn_id_, entry.host_, entry.remote_port_, entry.proxy_role_, + entry.max_retry_delay_, entry.transform_, entry.local_port_); } CCfile.close(); @@ -148,8 +148,8 @@ int main (int argc, char *argv[]) { printf ("%d\t%d\t%d\t%d\t", entry.conn_id_, entry.supplier_id_, entry.type_); - while (--entry.total_destinations_ >= 0) - printf ("%d,", entry.destinations_[entry.total_destinations_]); + while (--entry.total_consumers_ >= 0) + printf ("%d,", entry.consumers_[entry.total_consumers_]); printf ("\n"); } } diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 2620301e25b..eae0248eb8c 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -22,25 +22,25 @@ class Connection_Config_File_Entry // = TITLE - // Stores the Proxy_Handler entry for connection configuration. + // Stores connection configuration information. { public: - int conn_id_; + ACE_INT32 conn_id_; // Connection id for this Proxy_Handler. char host_[BUFSIZ]; // Host to connect with. - u_short remote_poconsumer_; + u_short remote_port_; // Port to connect with. - char direction_; + char proxy_role_; // 'S' (supplier) or 'C' (consumer). - int max_retry_delay_; + ACE_INT32 max_retry_delay_; // Maximum amount of time to wait for reconnecting. - u_short local_poconsumer_; + u_short local_port_; // Our local port number. }; @@ -59,23 +59,23 @@ class Consumer_Config_File_Entry { public: enum { - MAX_DESTINATIONS = 1000 // Total number of multicast destinations. + MAX_CONSUMERS = 1000 // Total number of multicast consumers. }; - int conn_id_; - // Connection id for this channel. + ACE_INT32 conn_id_; + // Connection id for this proxy. - int supplier_id_; - // Logical routing id for this channel. + ACE_INT32 supplier_id_; + // Logical supplier id for this proxy. - int type_; - // Type of payload in the message. + ACE_INT32 type_; + // Message type. - int destinations_[MAX_DESTINATIONS]; - // Connection ids for destinations that we're routing to. + ACE_INT32 consumers_[MAX_CONSUMERS]; + // Connection ids for consumers that we're routing to. - int total_destinations_; - // Total number of these destinations. + int total_consumers_; + // Total number of these consumers. }; class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry> diff --git a/apps/Gateway/Gateway/Consumer_Dispatch_Set.h b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h new file mode 100644 index 00000000000..71e2046b56e --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Dispatch_Set.h @@ -0,0 +1,28 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Consumer_Dispatch_Set.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_DISPATCH_SET) +#define _DISPATCH_SET + +#include "ace/Set.h" + +// Forward reference. +class Proxy_Handler; + +typedef ACE_Unbounded_Set<Proxy_Handler *> Consumer_Dispatch_Set; +typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Consumer_Dispatch_Set_Iterator; + +#endif /* _DISPATCH_SET */ diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h index 24881c3e85b..5e288edf910 100644 --- a/apps/Gateway/Gateway/Event.h +++ b/apps/Gateway/Gateway/Event.h @@ -23,7 +23,7 @@ // Proxy_Handler in the Gateway. typedef ACE_INT32 ACE_INT32; -class Event_Addr +class Event_Key // = TITLE // Address used to identify the source/destination of an event. // @@ -33,14 +33,14 @@ class Event_Addr // Channel from the format of the data. { public: - Event_Addr (ACE_INT32 cid = -1, + Event_Key (ACE_INT32 cid = -1, u_char sid = 0, u_char type = 0) : conn_id_ (cid), supplier_id_ (sid), type_ (type) {} - int operator== (const Event_Addr &event_addr) const + int operator== (const Event_Key &event_addr) const { return this->conn_id_ == event_addr.conn_id_ && this->supplier_id_ == event_addr.supplier_id_ @@ -58,10 +58,13 @@ public: // Event type. }; - class Event_Header // = TITLE - // Fixed sized header. + // Fixed sized header. + // + // = DESCRIPTION + // This is designed to have a sizeof (16) to avoid alignment + // problems on most platforms. { public: typedef ACE_INT32 SUPPLIER_ID; @@ -72,14 +75,35 @@ public: INVALID_ID = -1 // No peer can validly use this number. }; + void decode (void) + { + this->len_ = ntohl (this->len_); + this->supplier_id_ = ntohl (this->supplier_id_); + this->type_ = ntohl (this->type_); + this->priority_ = ntohl (this->priority_); + } + // Decode from network byte order to host byte order. + + void encode (void) + { + this->len_ = htonl (this->len_); + this->supplier_id_ = htonl (this->supplier_id_); + this->type_ = htonl (this->type_); + this->priority_ = htonl (this->priority_); + } + // Encode from host byte order to network byte order. + + size_t len_; + // Length of the data_ payload, in bytes. + SUPPLIER_ID supplier_id_; // Source ID. ACE_INT32 type_; // Event type. - size_t len_; - // Length of the entire event (including data payload) in bytes. + ACE_INT32 priority_; + // Event priority. }; class Event diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index d146ddfb362..02f2cd465f8 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -2,38 +2,35 @@ // $Id$ #define ACE_BUILD_SVC_DLL -#include "ace/Get_Opt.h" -#include "Config_Files.h" #include "Proxy_Handler_Connector.h" #include "Event_Channel.h" -#if !defined (ACE_EVENT_CHANNEL_C) -#define ACE_EVENT_CHANNEL_C +ACE_Event_Channel_Options::ACE_Event_Channel_Options (void) + : performance_window_ (0), + blocking_semantics_ (ACE_NONBLOCK), + socket_queue_size_ (0) +{ +} -template <class SH, class CH> -ACE_Event_Channel<SH, CH>::~ACE_Event_Channel (void) +ACE_Event_Channel::~ACE_Event_Channel (void) { } -template <class SH, class CH> -ACE_Event_Channel<SH, CH>::ACE_Event_Channel (void) - : connection_config_file_ ("connection_config"), - consumer_config_file_ ("consumer_config"), - active_connector_role_ (1), - performance_window_ (0), - blocking_semantics_ (ACE_NONBLOCK), - debug_ (0), - connector_ (0), - socket_queue_size_ (0) +ACE_Event_Channel::ACE_Event_Channel (void) +{ +} + +ACE_Event_Channel_Options & +ACE_Event_Channel::options (void) { + return this->options_; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, - const void *) +ACE_Event_Channel::handle_timeout (const ACE_Time_Value &, + const void *) { ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // If we've got a ACE_Thread Manager then use it to suspend all the // threads. This will enable us to get an accurate count. @@ -47,17 +44,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, size_t total_bytes_in = 0; size_t total_bytes_out = 0; - // Iterate through the consumer map connecting all the Proxy_Handlers. + // Iterate through the connection map summing up the number of bytes + // sent/received. for (CONNECTION_MAP_ENTRY *me = 0; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { Proxy_Handler *proxy_handler = me->int_id_; - if (proxy_handler->direction () == 'C') + if (proxy_handler->proxy_role () == 'C') total_bytes_out += proxy_handler->total_bytes (); - else // proxy_handler->direction () == 'S' + else // proxy_handler->proxy_role () == 'S' total_bytes_in += proxy_handler->total_bytes (); } @@ -74,13 +72,13 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))); #else ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - this->performance_window_, - total_bytes_in, + this->options ().performance_window_, + total_bytes_in, total_bytes_out)); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)))); + (float) (total_bytes_in * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)))); + (float) (total_bytes_out * 8 / (float) (1024 * 1024 * this->options ().performance_window_)))); #endif /* ACE_NLOGGING */ #if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT) @@ -95,31 +93,177 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, return 0; } +ACE_Event_Channel::put (ACE_Message_Block *forward_addr, + ACE_Time_Value *) +{ + // We got a valid event, so determine its virtual forwarding + // address, which is stored in the first of the two event blocks, + // which are chained together by this->recv(). + + Event_Key *forwarding_addr = (Event_Key *) forward_addr->rd_ptr (); + + // Skip over the address portion and get the data. + ACE_Message_Block *data = forward_addr->cont (); + + // <dispatch_set> points to the set of Consumers associated with + // this forwarding address. + Consumer_Dispatch_Set *dispatch_set = 0; + + if (this->efd_.find (*forwarding_addr, dispatch_set) == -1) + // Failure. + ACE_ERROR ((LM_DEBUG, + "(%t) find failed on conn id = %d, logical id = %d, type = %d\n", + forwarding_addr->conn_id_, + forwarding_addr->supplier_id_, + forwarding_addr->type_)); + else + { + // Check to see if there are any consumers. + if (dispatch_set->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active consumers for this event currently\n")); + + else // There are consumers, so forward the event. + { + Consumer_Dispatch_Set_Iterator dsi (*dispatch_set); + + // At this point, we should assign a thread-safe locking + // strategy to the Message_Block is we're running in a + // multi-threaded configuration. + // data->locking_strategy (MB_Locking_Strategy::instance ()); + + for (Proxy_Handler **proxy_handler = 0; + dsi.next (proxy_handler) != 0; + dsi.advance ()) + { + // Only process active proxy_handlers. + if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) + { + // Duplicate the event portion via reference + // counting. + ACE_Message_Block *dup_msg = data->duplicate (); + + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", + (*proxy_handler)->id ())); + + if ((*proxy_handler)->put (dup_msg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", + "put", (*proxy_handler)->id ())); + + // We are responsible for releasing an + // ACE_Message_Block if failures occur. + dup_msg->release (); + } + } + } + } + } + + // Release the memory in the message block. + forward_addr->release (); + return 0; +} + +ACE_Event_Channel::svc (void) +{ + return 0; +} + +int +ACE_Event_Channel::initiate_proxy_connection (Proxy_Handler *proxy_handler, + ACE_Synch_Options &synch_options) +{ + return this->connector_.initiate_connection (proxy_handler, + synch_options); +} + +int +ACE_Event_Channel::complete_proxy_connection (Proxy_Handler *proxy_handler) +{ + int option = proxy_handler->proxy_role () == 'S' ? SO_RCVBUF : SO_SNDBUF; + int socket_queue_size = this->options ().socket_queue_size_; + + if (proxy_handler->peer ().set_option (SOL_SOCKET, + option, + &socket_queue_size, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + + proxy_handler->thr_mgr (ACE_Service_Config::thr_mgr ()); + + // Our state is now "established." + proxy_handler->state (Proxy_Handler::ESTABLISHED); + + // Restart the timeout to 1. + proxy_handler->timeout (1); + + ACE_INT32 id = htonl (proxy_handler->id ()); + + // Send the connection id to the peerd. + + ssize_t n = proxy_handler->peer ().send ((const void *) &id, sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + n == 0 ? "peer has closed down unexpectedly" : "send"), + -1); +} + +// Restart connection (blocking_semantics dicates whether we restart +// synchronously or asynchronously). + +int +ACE_Event_Channel::reinitiate_proxy_connection (Proxy_Handler *proxy_handler) +{ + // Skip over deactivated descriptors. + if (proxy_handler->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + proxy_handler->peer ().close (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of Proxy_Handler %d\n", + proxy_handler->id ())); + + // Reschedule ourselves to try and connect again. + if (ACE_Service_Config::reactor ()->schedule_timer + (proxy_handler, 0, proxy_handler->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + return 0; +} + // Initiate connections with the Consumer and Supplier Peers. -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::initiate_connections (void) +ACE_Event_Channel::initiate_connections (void) { - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); ACE_Synch_Options synch_options; - if (this->blocking_semantics_ == ACE_NONBLOCK) + if (this->options ().blocking_semantics_ == ACE_NONBLOCK) synch_options = ACE_Synch_Options::asynch; else synch_options = ACE_Synch_Options::synch; - // Iterate through the Consumer Map connecting all the Proxy_Handlers. + // Iterate through the Consumer Map connecting all the + // Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { Proxy_Handler *proxy_handler = me->int_id_; - if (this->connector_->initiate_connection + if (this->initiate_proxy_connection (proxy_handler, synch_options) == -1) - continue; + continue; // Failures are handled elsewhere... } return 0; @@ -128,8 +272,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) // This method gracefully shuts down all the Handlers in the // Proxy_Handler Connection Map. -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::close (void) +ACE_Event_Channel::close (u_long) { #if defined (USE_INPUT_MT) || defined (USE_OUTPUT_MT) ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); @@ -147,7 +290,7 @@ ACE_Event_Channel<SH, CH>::close (void) { Proxy_Handler *proxy_handler = me->int_id_; - ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", + ACE_DEBUG ((LM_DEBUG, "(%t) closing down connection %d\n", proxy_handler->id ())); if (proxy_handler->state () != Proxy_Handler::IDLE) @@ -159,247 +302,76 @@ ACE_Event_Channel<SH, CH>::close (void) proxy_handler->destroy (); // Will trigger a delete. } - // Free up the resources allocated dynamically by the ACE_Connector. - delete this->connector_; return 0; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::open (int argc, char *argv[]) +int +ACE_Event_Channel::find_proxy (ACE_INT32 conn_id, + Proxy_Handler *&proxy_handler) { - this->parse_args (argc, argv); - - ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1); - - // Ignore SIPPIPE so each Consumer_Proxy can handle it. - ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - - if (this->active_connector_role_) - { - // Parse the connection configuration file. - this->parse_connection_config_file (); - - // Parse the consumer map config file and build the consumer map. - this->parse_consumer_config_file (); - - // Initiate connections with the peers. - this->initiate_connections (); - } - - // If this->performance_window_ > 0 start a timer. - - if (this->performance_window_ > 0) - { - if (ACE_Service_Config::reactor ()->schedule_timer - (this, 0, this->performance_window_) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); - else - ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", - this->performance_window_)); - } - - return 0; + return this->connection_map_.find (conn_id, proxy_handler); } -// Parse and build the connection table. - -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) +int +ACE_Event_Channel::bind_proxy (Proxy_Handler *proxy_handler) { - // File that contains the consumer map configuration information. - Connection_Config_File_Parser connection_file; - Connection_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (connection_file.open (this->connection_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1); - - // Read config file one line at a time. - while (connection_file.read_entry (entry, line_number) != FP::EOFILE) + switch (this->connection_map_.bind (proxy_handler->id (), proxy_handler)) { - file_empty = 0; - - if (this->debug_) - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, " - "direction = %c, max retry timeout = %d, local port = %d\n", - entry.conn_id_, - entry.host_, - entry.remote_poconsumer_, - entry.direction_, - entry.max_retry_delay_, - entry.local_poconsumer_)); - - Proxy_Handler *proxy_handler = 0; - - // The next few lines of code are dependent on whether we are - // making an Supplier_Proxy or an Consumer_Proxy. - - if (entry.direction_ == 'C') // Configure a Consumer_Proxy. - ACE_NEW_RETURN (proxy_handler, - CONSUMER_HANDLER (&this->efd_, - this->connector_, - ACE_Service_Config::thr_mgr (), - this->socket_queue_size_), - -1); - else /* direction == 'S' */ // Configure a Supplier_Proxy. - ACE_NEW_RETURN (proxy_handler, - SUPPLIER_HANDLER (&this->efd_, - this->connector_, - ACE_Service_Config::thr_mgr (), - this->socket_queue_size_), - -1); - - // The following code is common to both Supplier_Proxys_ and - // Consumer_Proxys. - - // Initialize the routing entry's peer addressing info. - proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), - ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_); - - // Initialize max timeout. - proxy_handler->max_timeout (entry.max_retry_delay_); - - // Try to bind the new Proxy_Handler to the connection ID. - switch (this->connection_map_.bind (entry.conn_id_, proxy_handler)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, - "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, - "(%t) duplicate connection %d, already bound\n", - entry.conn_id_)); - break; - case 0: - // Success. - break; - } + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) bind failed for connection %d\n", + proxy_handler->id ()), -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) duplicate connection %d, already bound\n", + proxy_handler->id ()), -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: connection proxy_handler configuration file was empty\n")); - return 0; } -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void) +int +ACE_Event_Channel::subscribe (const Event_Key &event_addr, + Consumer_Dispatch_Set *cds) { - // File that contains the consumer map configuration information. - Consumer_Config_File_Parser consumer_file; - Consumer_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (consumer_file.open (this->consumer_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1); - - // Read config file line at a time. - while (consumer_file.read_entry (entry, line_number) != FP::EOFILE) + // Bind with consumer map, keyed by peer address. + switch (this->efd_.bind (event_addr, cds)) { - file_empty = 0; - - if (this->debug_) - { - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " - "number of destinations = %d\n", - entry.conn_id_, - entry.supplier_id_, - entry.type_, - entry.total_destinations_)); - for (int i = 0; i < entry.total_destinations_; i++) - ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - Dispatch_Set *dispatch_set; - ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1); - - Event_Addr event_addr (entry.conn_id_, - entry.supplier_id_, - entry.type_); - - // Add the destinations to the Routing Entry. - for (int i = 0; i < entry.total_destinations_; i++) - { - Proxy_Handler *proxy_handler = 0; - - // Lookup destination and add to Dispatch_Set set if found. - if (this->connection_map_.find (entry.destinations_[i], - proxy_handler) != -1) - dispatch_set->insert (proxy_handler); - else - ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - // Bind with consumer map, keyed by peer address. - switch (this->efd_.bind (event_addr, dispatch_set)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " - "already bound\n", entry.conn_id_)); - break; - case 0: - // Success. - break; - } + case -1: + ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", + event_addr.conn_id_), -1); + /* NOTREACHED */ + case 1: // Oops, found a duplicate! + ACE_ERROR_RETURN ((LM_DEBUG, "(%t) duplicate consumer map entry %d, " + "already bound\n", event_addr.conn_id_), -1); + /* NOTREACHED */ + case 0: + // Success. + return 0; } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: consumer map configuration file was empty\n")); - return 0; } -// Parse the "command-line" arguments and set the corresponding flags. - -template <class SH, class CH> int -ACE_Event_Channel<SH, CH>::parse_args (int argc, char *argv[]) +ACE_Event_Channel::open (void *) { - ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0); + // Ignore SIPPIPE so each Consumer_Proxy can handle it. + ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - for (int c; (c = get_opt ()) != -1; ) +#if 0 + // If this->performance_window_ > 0 start a timer. + + if (this->options ().performance_window_ > 0) { - switch (c) - { - case 'b': // Use blocking connection establishment. - this->blocking_semantics_ = 0; - break; - case 'c': - this->connection_config_file_ = get_opt.optarg; - break; - case 'd': - this->debug_ = 1; - break; - case 'p': - // We are not playing the active Connector role. - this->active_connector_role_ = 0; - break; - case 'q': - this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg); - break; - case 'r': - this->consumer_config_file_ = get_opt.optarg; - break; - case 'w': // Time performance for a designated amount of time. - this->performance_window_ = ACE_OS::atoi (get_opt.optarg); - // Use blocking connection semantics so that we get accurate - // timings (since all connections start at once). - this->blocking_semantics_ = 0; - break; - default: - break; - } + if (ACE_Service_Config::reactor ()->schedule_timer + (this, 0, this->options ().performance_window_) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); + else + ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", + this->options ().performance_window_))); } +#endif + return 0; } - -#endif /* ACE_EVENT_CHANNEL_C */ diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index 4e7afc5d328..1ecf468addf 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -19,64 +19,95 @@ #include "Proxy_Handler_Connector.h" -template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER> -class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler +class ACE_Svc_Export ACE_Event_Channel_Options + // = TITLE + // Maintains the options for an <ACE_Event_Channel>. +{ +public: + ACE_Event_Channel_Options (void); + // Initialization. + + int performance_window_; + // Number of seconds after connection establishment to report + // throughput. + + int blocking_semantics_; + // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). +}; + +class ACE_Svc_Export ACE_Event_Channel : public ACE_Task<SYNCH_STRATEGY> // = TITLE // Define a generic Event_Channel. + // + // = DESCRIPTION { public: // = Initialization and termination methods. ACE_Event_Channel (void); ~ACE_Event_Channel (void); - int open (int argc, char *argv[]); - // Initialize the Channel. + virtual int open (void * = 0); + // Open the channel. - int close (void); + virtual int close (u_long = 0); // Close down the Channel. -private: - int parse_args (int argc, char *argv[]); - // Parse the command-line arguments. + // = Proxy management methods. + int initiate_proxy_connection (Proxy_Handler *, + ACE_Synch_Options & = ACE_Synch_Options::synch); + // Initiate the connection of the <Proxy_Handler> to its peer. - int parse_connection_config_file (void); - // Parse the connection configuration file. + int complete_proxy_connection (Proxy_Handler *); + // Complete the initialization of the <Proxy_Handler> once it's + // connected to its Peer. - int parse_consumer_config_file (void); - // Parse the consumer map configuration file. + int reinitiate_proxy_connection (Proxy_Handler *); + // Reinitiate a connection asynchronously when the Peer fails. - int initiate_connections (void); - // Initiate connections to the peers. + int bind_proxy (Proxy_Handler *); + // Bind the <Proxy_Handler> to the <connection_map_>. - virtual int handle_timeout (const ACE_Time_Value &, const void *arg); - // Perform timer-based performance profiling. + int find_proxy (ACE_INT32 conn_id, Proxy_Handler *&); + // Locate the <Proxy_Handler> with <conn_id>. - const char *connection_config_file_; - // Name of the connection configuration file. + int subscribe (const Event_Key &event_addr, + Consumer_Dispatch_Set *cds); + // Subscribe the <Consumer_Dispatch_Set> to receive events that + // match <Event_Key>. - const char *consumer_config_file_; - // Name of the consumer map configuration file. + // = Event forwarding method. + virtual int put (ACE_Message_Block *mb, ACE_Time_Value * = 0); + // Pass <mb> to the Event Channel so it can forward it to Consumers. - int active_connector_role_; - // Enabled if we are playing the role of the active Connector. + ACE_Event_Channel_Options &options (void); + // Points to the Event_Channel options. - int performance_window_; - // Number of seconds after connection establishment to report - // throughput. - - int blocking_semantics_; - // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + int initiate_connections (void); + // Initiate connections to the peers. + +private: + virtual int svc (void); + // Run as an active object. - int debug_; - // Are we debugging? + int parse_args (int argc, char *argv[]); + // Parse the command-line arguments. - Proxy_Handler_Connector *connector_; - // This is used to establish the connections actively. + virtual int handle_timeout (const ACE_Time_Value &, + const void *arg); + // Perform timer-based performance profiling. - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). + Proxy_Handler_Connector connector_; + // Used to establish the connections actively. + + // Proxy_Handler_Acceptor acceptor_; + // Used to establish the connections passively. // = Make life easier by defining typedefs. + // Note that Proxy_Handler is assumed to the base class of + // SUPPLIER_PROXY and CONSUMER_PROXY. typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP; typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY; @@ -85,16 +116,10 @@ private: // Table that maps Connection IDs to Proxy_Handler *'s. Event_Forwarding_Discriminator efd_; - // Map that associates event addresses to a set of Consumer_Proxy - // *'s. -}; + // Map that associates an event to a set of Consumer_Proxy *'s. -#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) -#include "Event_Channel.cpp" -#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ - -#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) -#pragma implementation ("Event_Channel.cpp") -#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + ACE_Event_Channel_Options options_; + // The options for the channel. +}; #endif /* ACE_EVENT_CHANNEL */ diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp index 8261ea13eb2..4dfbb658c1f 100644 --- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp @@ -6,51 +6,49 @@ #include "Event_Forwarding_Discriminator.h" -// Bind the Event_Addr to the INT_ID. +// Bind the Event_Key to the INT_ID. int -Event_Forwarding_Discriminator::bind (Event_Addr event_addr, - Dispatch_Set *Dispatch_Set) +Event_Forwarding_Discriminator::bind (Event_Key event_addr, + Consumer_Dispatch_Set *cds) { - return this->map_.bind (event_addr, Dispatch_Set); + return this->map_.bind (event_addr, cds); } -// Find the Dispatch_Set corresponding to the Event_Addr. +// Find the Consumer_Dispatch_Set corresponding to the Event_Key. int -Event_Forwarding_Discriminator::find (Event_Addr event_addr, - Dispatch_Set *&Dispatch_Set) +Event_Forwarding_Discriminator::find (Event_Key event_addr, + Consumer_Dispatch_Set *&cds) { - return this->map_.find (event_addr, Dispatch_Set); + return this->map_.find (event_addr, cds); } -// Unbind (remove) the Event_Addr from the map. +// Unbind (remove) the Event_Key from the map. int -Event_Forwarding_Discriminator::unbind (Event_Addr event_addr) +Event_Forwarding_Discriminator::unbind (Event_Key event_addr) { return this->map_.unbind (event_addr); } -Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &rt) - : map_iter_ (rt.map_) +Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator + (Event_Forwarding_Discriminator &rt) + : map_iter_ (rt.map_) { } int -Event_Forwarding_Discriminator_Iterator::next (Dispatch_Set *&ss) +Event_Forwarding_Discriminator_Iterator::next (Consumer_Dispatch_Set *&cds) { - // Loop in order to skip over inactive entries if necessary. - - for (ACE_Map_Entry<Event_Addr, Dispatch_Set *> *temp = 0; - this->map_iter_.next (temp) != 0; - this->advance ()) + ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *> *temp; + if (this->map_iter_.next (temp) == 0) + return 0; + else { - // Otherwise, return the next item. - ss = temp->int_id_; + cds = temp->int_id_; return 1; } - return 0; } int diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h index 35a594b61b5..9b7531c1f46 100644 --- a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h @@ -20,29 +20,27 @@ #include "ace/Map_Manager.h" #include "Concurrency_Strategies.h" #include "Event.h" -#include "Dispatch_Set.h" +#include "Consumer_Dispatch_Set.h" class Event_Forwarding_Discriminator { // = TITLE - // Define a generic consumer map based on the ACE Map_Manager. - // - // = DESCRIPTION - // This class makes it easier to use the Map_Manager. + // Map events to the set of Consumer_Proxies that have subscribed + // to receive the event. public: - int bind (Event_Addr event, Dispatch_Set *Dispatch_Set); - // Associate Event with the Dispatch_Set. + int bind (Event_Key event, Consumer_Dispatch_Set *cds); + // Associate Event with the Consumer_Dispatch_Set. - int find (Event_Addr event, Dispatch_Set *&Dispatch_Set); - // Break any association of EXID. - - int unbind (Event_Addr event); + int unbind (Event_Key event); // Locate EXID and pass out parameter via INID. If found, // return 0, else -1. + int find (Event_Key event, Consumer_Dispatch_Set *&cds); + // Break any association of EXID. + public: - ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_; - // Map that associates Event Addrs (external ids) with Dispatch_Set *'s + ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_; + // Map that associates Event Addrs (external ids) with Consumer_Dispatch_Set *'s // <internal IDs>. }; @@ -52,11 +50,11 @@ class Event_Forwarding_Discriminator_Iterator // Define an iterator for the Consumer Map. public: Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm); - int next (Dispatch_Set *&); + int next (Consumer_Dispatch_Set *&); int advance (void); private: - ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_iter_; + ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX> map_iter_; // Map we are iterating over. }; #endif /* _CONSUMER_MAP_H */ diff --git a/apps/Gateway/Gateway/File_Parser.cpp b/apps/Gateway/Gateway/File_Parser.cpp index be33e9a96d2..07bda87180b 100644 --- a/apps/Gateway/Gateway/File_Parser.cpp +++ b/apps/Gateway/Gateway/File_Parser.cpp @@ -15,7 +15,11 @@ typedef FP::Return_Type FP_RETURN_TYPE; template <class ENTRY> int File_Parser<ENTRY>::open (const char filename[]) { - return (this->infile_ = ACE_OS::fopen (filename, "r")) == 0 ? -1 : 0; + this->infile_ = ACE_OS::fopen (filename, "r"); + if (this->infile_ == 0) + return -1; + else + return 0; } template <class ENTRY> int @@ -27,17 +31,13 @@ File_Parser<ENTRY>::close (void) template <class ENTRY> FP_RETURN_TYPE File_Parser<ENTRY>::getword (char buf[]) { - FP_RETURN_TYPE read_result = this->readword(buf); - if (read_result == FP::SUCCESS) - return FP::SUCCESS; - else - return read_result; + return this->readword (buf); } // Get the next string from the file via this->readword() // Check make sure the string forms a valid number. template <class ENTRY> FP_RETURN_TYPE -File_Parser<ENTRY>::getint (int &value) +File_Parser<ENTRY>::getint (ACE_INT32 &value) { char buf[BUFSIZ]; FP_RETURN_TYPE read_result = this->readword(buf); @@ -50,7 +50,7 @@ File_Parser<ENTRY>::getint (int &value) value = ACE_OS::strtol (buf, &ptr, 10); // check if the buf is a decimal or not - if ((value == 0) && (ptr == buf)) + if (value == 0 && ptr == buf) return FP::ERROR; else return FP::SUCCESS; @@ -81,8 +81,8 @@ File_Parser<ENTRY>::readword (char buf[]) buf[wordlength] = '\0'; if (c == EOF) { - // If the EOF is just a dilimeter, don't return EOF so that the - // word gets processed + // If EOF is just a delimiter, don't return EOF so that the word + // gets processed. if (wordlength > 0) { ungetc (c, this->infile_); @@ -94,7 +94,7 @@ File_Parser<ENTRY>::readword (char buf[]) } else if (c == '\n') { - // if the EOLINE is just a dilimeter, don't return EOLINE + // if the EOLINE is just a delimiter, don't return EOLINE // so that the word gets processed if (wordlength > 0) ungetc (c, this->infile_); diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index 80b768aff84..f1de7429db0 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -52,7 +52,7 @@ protected: FP::Return_Type getword (char buf[]); // Read the next ASCII word. - FP::Return_Type getint (int &value); + FP::Return_Type getint (ACE_INT32 &value); // Read the next integer. FP::Return_Type readword (char buf[]); diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index 2c963ff3d7f..4ff09aed1b7 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -1,6 +1,8 @@ /* -*- C++ -*- */ // $Id$ +#include "ace/Get_Opt.h" +#include "Config_Files.h" #include "ace/Service_Config.h" #include "Event_Channel.h" #include "Gateway.h" @@ -24,6 +26,12 @@ public: virtual int info (char **, size_t) const; // Return info about this service. + int parse_connection_config_file (void); + // Parse the connection configuration file. + + int parse_consumer_config_file (void); + // Parse the consumer configuration file. + protected: int handle_input (ACE_HANDLE); // Shut down the Gateway when input comes in from the controlling @@ -36,13 +44,21 @@ protected: // Parse gateway configuration arguments obtained from svc.conf // file. - ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_; + char connection_config_file_[MAXPATHLEN + 1]; + // Name of the connection configuration file. + + char consumer_config_file_[MAXPATHLEN + 1]; + // Name of the consumer map configuration file. + + ACE_Event_Channel event_channel_; // The Event Channel routes events from Supplier(s) to Consumer(s). -}; -// Convenient shorthands. -// #define IC SUPPLIER_HANDLER -// #define OC CONSUMER_HANDLER + int active_connector_role_; + // Enabled if we are playing the role of the active Connector. + + int debug_; + // Are we debugging? +}; int Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *) @@ -70,18 +86,76 @@ Gateway::handle_input (ACE_HANDLE h) return this->handle_signal (h); } +// Parse the "command-line" arguments and set the corresponding flags. + +int +Gateway::parse_args (int argc, char *argv[]) +{ + ACE_OS::strcpy (this->connection_config_file_, "connection_config"); + ACE_OS::strcpy (this->consumer_config_file_, "consumer_config"); + this->active_connector_role_ = 1; + this->debug_ = 0; + + ACE_Get_Opt get_opt (argc, argv, "bc:dpr:q:w:", 0); + + for (int c; (c = get_opt ()) != -1; ) + { + switch (c) + { + case 'b': // Use blocking connection establishment. + this->event_channel_.options ().blocking_semantics_ = 0; + break; + case 'c': + ACE_OS::strncpy (this->connection_config_file_, + get_opt.optarg, + sizeof this->connection_config_file_); + break; + case 'd': + this->debug_ = 1; + break; + case 'p': + // We are not playing the active Connector role. + this->active_connector_role_ = 0; + break; + case 'q': + this->event_channel_.options ().socket_queue_size_ = + ACE_OS::atoi (get_opt.optarg); + break; + case 'r': + ACE_OS::strncpy (this->consumer_config_file_, + get_opt.optarg, + sizeof this->consumer_config_file_); + break; + case 'w': // Time performance for a designated amount of time. + this->event_channel_.options ().performance_window_ = + ACE_OS::atoi (get_opt.optarg); + // Use blocking connection semantics so that we get accurate + // timings (since all connections start at once). + this->event_channel_.options ().blocking_semantics_ = 0; + break; + default: + break; + } + } + return 0; +} + int Gateway::init (int argc, char *argv[]) { - if (this->event_channel_.open (argc, argv) == -1) + // Initialize the Event_Channel. + if (this->event_channel_.open () == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1); + // Parse the "command-line" arguments. + this->parse_args (argc, argv); + ACE_Sig_Set sig_set; sig_set.sig_add (SIGINT); sig_set.sig_add (SIGQUIT); - // Register ourselves to receive SIGINT and SIGQUIT - // so we can shut down gracefully via signals. + // Register ourselves to receive SIGINT and SIGQUIT so we can shut + // down gracefully via signals. if (ACE_Service_Config::reactor ()->register_handler (sig_set, this) == -1) @@ -90,6 +164,20 @@ Gateway::init (int argc, char *argv[]) if (ACE_Service_Config::reactor ()->register_handler (0, this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + + if (this->active_connector_role_) + { + // Parse the connection configuration file. + this->parse_connection_config_file (); + + // Parse the consumer map config file and build the consumer + // map. + this->parse_consumer_config_file (); + + // Initiate connections with the peers. + this->event_channel_.initiate_connections (); + } + return 0; } @@ -120,6 +208,133 @@ Gateway::info (char **strp, size_t length) const return ACE_OS::strlen (buf); } +// Parse and build the connection table. + +int +Gateway::parse_connection_config_file (void) +{ + // File that contains the consumer map configuration information. + Connection_Config_File_Parser connection_file; + Connection_Config_File_Entry entry; + int file_empty = 1; + int line_number = 0; + + if (connection_file.open (this->connection_config_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->connection_config_file_), -1); + + // Read config file one line at a time. + while (connection_file.read_entry (entry, line_number) != FP::EOFILE) + { + file_empty = 0; + + if (this->debug_) + ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, " + "proxy role = %c, max retry timeout = %d, local port = %d\n", + entry.conn_id_, + entry.host_, + entry.remote_port_, + entry.proxy_role_, + entry.max_retry_delay_, + entry.local_port_)); + + Proxy_Handler *proxy_handler = 0; + + // Initialize the entry's peer addressing info. + + ACE_INET_Addr remote_addr (entry.remote_port_, entry.host_); + ACE_INET_Addr local_addr (entry.local_port_); + + // The next few lines of code are dependent on whether we are + // making an Supplier_Proxy or an Consumer_Proxy. + + if (entry.proxy_role_ == 'C') // Configure a Consumer_Proxy. + ACE_NEW_RETURN (proxy_handler, + CONSUMER_PROXY (this->event_channel_, remote_addr, + local_addr, entry.conn_id_), + -1); + else // proxy_role == 'S', so configure a Supplier_Proxy. + ACE_NEW_RETURN (proxy_handler, + SUPPLIER_PROXY (this->event_channel_, remote_addr, + local_addr, entry.conn_id_), + -1); + + // The following code is common to both Supplier_Proxys_ and + // Consumer_Proxys. + + // Initialize max timeout. + proxy_handler->max_timeout (entry.max_retry_delay_); + + // Bind the new Proxy_Handler to the connection ID. + this->event_channel_.bind_proxy (proxy_handler); + } + + if (file_empty) + ACE_ERROR ((LM_WARNING, + "warning: connection proxy_handler configuration file was empty\n")); + return 0; +} + +int +Gateway::parse_consumer_config_file (void) +{ + // File that contains the consumer event forwarding information. + Consumer_Config_File_Parser consumer_file; + Consumer_Config_File_Entry entry; + int file_empty = 1; + int line_number = 0; + + if (consumer_file.open (this->consumer_config_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->consumer_config_file_), -1); + + // Read config file line at a time. + while (consumer_file.read_entry (entry, line_number) != FP::EOFILE) + { + file_empty = 0; + + if (this->debug_) + { + ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " + "number of consumers = %d\n", + entry.conn_id_, + entry.supplier_id_, + entry.type_, + entry.total_consumers_)); + for (int i = 0; i < entry.total_consumers_; i++) + ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", + i, entry.consumers_[i])); + } + + Consumer_Dispatch_Set *dispatch_set; + ACE_NEW_RETURN (dispatch_set, Consumer_Dispatch_Set, -1); + + Event_Key event_addr (entry.conn_id_, + entry.supplier_id_, + entry.type_); + + // Add the Consumers to the Dispatch_Set. + for (int i = 0; i < entry.total_consumers_; i++) + { + Proxy_Handler *proxy_handler = 0; + + // Lookup destination and add to Consumer_Dispatch_Set set + // if found. + if (this->event_channel_.find_proxy (entry.consumers_[i], + proxy_handler) != -1) + dispatch_set->insert (proxy_handler); + else + ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", + i, entry.consumers_[i])); + } + + this->event_channel_.subscribe (event_addr, dispatch_set); + } + + if (file_empty) + ACE_ERROR ((LM_WARNING, + "warning: consumer map configuration file was empty\n")); + return 0; +} + // The following is a "Factory" used by the ACE_Service_Config and // svc.conf file to dynamically initialize the state of the Gateway. diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile index 0f5ddc07eb0..c3ae8dffe4d 100644 --- a/apps/Gateway/Gateway/Makefile +++ b/apps/Gateway/Gateway/Makefile @@ -153,7 +153,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ - Dispatch_Set.h Gateway.h + Consumer_Dispatch_Set.h Gateway.h .obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ @@ -222,7 +222,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ - Dispatch_Set.h Event_Channel.h + Consumer_Dispatch_Set.h Event_Channel.h .obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \ Event_Forwarding_Discriminator.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ @@ -245,9 +245,9 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ - Event.h Dispatch_Set.h \ + Event.h Consumer_Dispatch_Set.h \ $(WRAPPER_ROOT)/ace/Set.h -.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Dispatch_Set.h \ +.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Consumer_Dispatch_Set.h \ $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -381,7 +381,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ - Dispatch_Set.h + Consumer_Dispatch_Set.h .obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \ Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ @@ -446,7 +446,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Task_T.h \ Event_Forwarding_Discriminator.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Concurrency_Strategies.h Event.h Dispatch_Set.h \ + Concurrency_Strategies.h Event.h Consumer_Dispatch_Set.h \ Proxy_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Connector.i diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp index 86e0fff8e41..2f161c171f6 100644 --- a/apps/Gateway/Gateway/Proxy_Handler.cpp +++ b/apps/Gateway/Gateway/Proxy_Handler.cpp @@ -1,11 +1,18 @@ // $Id$ -#include "Dispatch_Set.h" -#include "Proxy_Handler_Connector.h" +#include "Event_Channel.h" -// Convenient short-hands. -#define CO CONDITION -#define MU MAP_MUTEX +void +Proxy_Handler::id (ACE_INT32 id) +{ + this->id_ = id; +} + +ACE_INT32 +Proxy_Handler::id (void) +{ + return this->id_; +} // The total number of bytes sent/received on this Proxy. @@ -21,36 +28,35 @@ Proxy_Handler::total_bytes (size_t bytes) this->total_bytes_ += bytes; } -Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), - efd_ (efd), - id_ (-1), +Proxy_Handler::Proxy_Handler (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : remote_addr_ (remote_addr), + local_addr_ (local_addr), + id_ (conn_id), total_bytes_ (0), state_ (Proxy_Handler::IDLE), - connector_ (ioc), timeout_ (1), - max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), - socket_queue_size_ (socket_queue_size) + max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), + event_channel_ (ec) { } -// Set the direction. +// Set the proxy_role. void -Proxy_Handler::direction (char d) +Proxy_Handler::proxy_role (char d) { - this->direction_ = d; + this->proxy_role_ = d; } -// Get the direction. +// Get the proxy_role. char -Proxy_Handler::direction (void) +Proxy_Handler::proxy_role (void) { - return this->direction_; + return this->proxy_role_; } // Sets the timeout delay. @@ -64,9 +70,9 @@ Proxy_Handler::timeout (int to) this->timeout_ = to; } -// Recalculate the current retry timeout delay using exponential +// Re-calculate the current retry timeout delay using exponential // backoff. Returns the original timeout (i.e., before the -// recalculation). +// re-calculation). int Proxy_Handler::timeout (void) @@ -99,37 +105,16 @@ Proxy_Handler::max_timeout (void) // Restart connection asynchronously when timeout occurs. int -Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *) +Proxy_Handler::handle_timeout (const ACE_Time_Value &, + const void *) { ACE_DEBUG ((LM_DEBUG, "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n", this->id (), this->timeout_)); - return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); -} - -// Restart connection (blocking_semantics dicates whether we -// restart synchronously or asynchronously). -int -Proxy_Handler::reinitiate_connection (void) -{ - // Skip over deactivated descriptors. - if (this->get_handle () != ACE_INVALID_HANDLE) - { - // Make sure to close down peer to reclaim descriptor. - this->peer ().close (); - - ACE_DEBUG ((LM_DEBUG, - "(%t) scheduling reinitiation of Proxy_Handler %d\n", - this->id ())); - - // Reschedule ourselves to try and connect again. - if (ACE_Service_Config::reactor ()->schedule_timer - (this, 0, this->timeout ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "schedule_timer"), -1); - } - return 0; + // Delegate the re-connection attempt to the Event Channel. + return this->event_channel_.initiate_proxy_connection + (this, ACE_Synch_Options::asynch); } // Handle shutdown of the Proxy_Handler object. @@ -141,7 +126,8 @@ Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) "(%t) shutting down Proxy_Handler %d on handle %d\n", this->id (), this->get_handle ())); - return this->reinitiate_connection (); + // Restart the connection, if possible. + return this->event_channel_.reinitiate_proxy_connection (this); } // Set the state of the Proxy. @@ -152,66 +138,29 @@ Proxy_Handler::state (Proxy_Handler::State s) this->state_ = s; } -// Perform the first-time initiation of a connection to the peer. - -int -Proxy_Handler::initialize_connection (void) -{ - this->state_ = Proxy_Handler::ESTABLISHED; - - // Restart the timeout to 1. - this->timeout (1); - - // Action that sends the connection id to the peerd. - - ACE_INT32 id = htonl (this->id ()); - - ssize_t n = this->peer ().send ((const void *) &id, sizeof id); - - if (n != sizeof id) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - n == 0 ? "peer has closed down unexpectedly" : "send"), - -1); - return 0; -} - -// Set the size of the socket queue. - -void -Proxy_Handler::socket_queue_size (void) -{ - if (this->socket_queue_size_ > 0) - { - int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; - - if (this->peer ().set_option (SOL_SOCKET, option, - &this->socket_queue_size_, - sizeof (int)) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); - } -} - -// Upcall from the ACE_Acceptor::handle_input() that -// delegates control to our application-specific Proxy_Handler. +// Upcall from the <ACE_Acceptor> or <ACE_Connector> that delegates +// control to our Proxy_Handler. int -Proxy_Handler::open (void *a) +Proxy_Handler::open (void *) { - ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's handle = %d\n", this->peer ().get_handle ())); - // Set the size of the socket queue. - this->socket_queue_size (); - // Turn on non-blocking I/O. if (this->peer ().enable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - // Call down to the base class to activate and register this handler. - if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_.complete_proxy_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); - return this->initialize_connection (); + // Register ourselves to receive input events. + else if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + else + return 0; } // Return the current state of the Proxy. @@ -222,30 +171,6 @@ Proxy_Handler::state (void) return this->state_; } -void -Proxy_Handler::id (ACE_INT32 id) -{ - this->id_ = id; -} - -ACE_INT32 -Proxy_Handler::id (void) -{ - return this->id_; -} - -// Set the peer's address information. -int -Proxy_Handler::bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32 id) -{ - this->remote_addr_ = remote_addr; - this->local_addr_ = local_addr; - this->id_ = id; - return 0; -} - ACE_INET_Addr & Proxy_Handler::remote_addr (void) { @@ -258,15 +183,13 @@ Proxy_Handler::local_addr (void) return this->local_addr_; } -// Constructor sets the consumer map pointer. - -Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) +Consumer_Proxy::Consumer_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : Proxy_Handler (ec, remote_addr, local_addr, conn_id) { - this->direction_ = 'C'; + this->proxy_role_ = 'C'; this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE); } @@ -311,7 +234,7 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) // Try to send the event. If we don't send it all (e.g., due to // flow control), then re-queue the remainder at the head of the // Event_List and ask the ACE_Reactor to inform us (via - // handle_output()) when it is possible to try again. + // handle_output()) when it is possible to try again. ssize_t n = this->send (event); @@ -325,7 +248,8 @@ Consumer_Proxy::nonblk_put (ACE_Message_Block *event) } else if (errno == EWOULDBLOCK) // Didn't manage to send everything. { - ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing activated on handle %d to routing id %d\n", this->get_handle (), this->id ())); // ACE_Queue in *front* of the list to preserve order. @@ -354,11 +278,11 @@ Consumer_Proxy::send (ACE_Message_Block *event) else if (n < len) // Re-adjust pointer to skip over the part we did send. event->rd_ptr (n); - else /* if (n == length) */ + else // if (n == length) { - // The whole event is sent, we can now safely deallocate the - // buffer. Note that this should decrement a reference count... - delete event; + // The whole event is sent, we now decrement the reference count + // (which deletes itself with it reaches 0. + event->release (); errno = 0; } this->total_bytes (n); @@ -389,9 +313,9 @@ Consumer_Proxy::handle_output (ACE_HANDLE) break; case -1: - // We are responsible for freeing an ACE_Message_Block if + // We are responsible for releasing an ACE_Message_Block if // failures occur. - delete event; + event->release (); ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); /* FALLTHROUGH */ @@ -436,17 +360,14 @@ Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *) (event, (ACE_Time_Value *) &ACE_Time_Value::zero); } -// Constructor sets the consumer map pointer and the connector -// pointer. - -Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) +Supplier_Proxy::Supplier_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) : msg_frag_ (0), - Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) + Proxy_Handler (ec, remote_addr, local_addr, conn_id) { - this->direction_ = 'S'; + this->proxy_role_ = 'S'; this->msg_queue ()->high_water_mark (0); } @@ -490,8 +411,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) ACE_DEBUG ((LM_DEBUG, "attempted to read %d\n", header_bytes_left_to_read)); - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); return header_received; } @@ -508,11 +428,34 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) errno = EWOULDBLOCK; return -1; } + + // Convert the header into host byte order so that we can access + // it directly without having to repeatedly muck with it... + event->header_.decode (); + + if (event->header_.len_ > sizeof event->data_) + { + // This data_ payload is too big! + errno = EINVAL; + ACE_DEBUG ((LM_DEBUG, + "Data payload is too big (%d bytes)\n", + event->header_.len_)); + return -1; + } + } - // At this point there is a complete, valid header in msg_frag_ + // At this point there is a complete, valid header in Event. Now we + // need to get the event payload. Due to incomplete reads this may + // not be the first time we've read in a fragment for this message. + // We account for this here. Note that the first time in here + // msg_frag_->wr_ptr() will point to event->data_. Every time we do + // a successful fragment read, we advance wr_ptr(). Therefore, by + // subtracting how much we've already read from the + // event->header_.len_ we complete the data_bytes_left_to_read... + ssize_t data_bytes_left_to_read = - sizeof (Event) - this->msg_frag_->length (); + ssize_t (event->header_.len_ - (msg_frag_->wr_ptr () - event->data_)); ssize_t data_received = this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); @@ -529,8 +472,7 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) /* FALLTHROUGH */; case 0: // Premature EOF. - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); return 0; default: @@ -550,23 +492,22 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) // Allocate an event forwarding header and chain the data // portion onto its continuation field. - forward_addr = new ACE_Message_Block (sizeof (Event_Addr), + forward_addr = new ACE_Message_Block (sizeof (Event_Key), ACE_Message_Block::MB_PROTO, this->msg_frag_); if (forward_addr == 0) { - delete this->msg_frag_; - this->msg_frag_ = 0; + this->msg_frag_ = this->msg_frag_->release (); errno = ENOMEM; return -1; } - Event_Addr event_addr (this->id (), + Event_Key event_addr (this->id (), event->header_.supplier_id_, event->header_.type_); - // Copy the forwarding address from the Event_Addr into + // Copy the forwarding address from the Event_Key into // forward_addr. - forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr)); + forward_addr->copy ((char *) &event_addr, sizeof (Event_Key)); // Reset the pointer to indicate we've got an entire event. this->msg_frag_ = 0; @@ -579,8 +520,12 @@ Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) event->header_.len_, event->data_)); #else ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", - event->header_.supplier_id_, event->header_.len_, this->total_bytes ())); -#endif + event->header_.supplier_id_, event->header_.len_, data_received + header_received)); +#endif /* VERBOSE */ + + // Encode before returning so that we can set things out in + // network byte order. + event->header_.encode (); return data_received + header_received; } } @@ -620,79 +565,17 @@ Supplier_Proxy::handle_input (ACE_HANDLE) } } -// Forward an event to its appropriate Consumer(s). +// Forward an event to its appropriate Consumer(s). This delegates to +// the <ACE_Event_Channel> to do the actual forwarding. int Supplier_Proxy::forward (ACE_Message_Block *forward_addr) { - // We got a valid event, so determine its virtual forwarding - // address, which is stored in the first of the two event blocks, - // which are chained together by this->recv(). - - Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr (); - - // Skip over the address portion and get the data. - const ACE_Message_Block *const data = forward_addr->cont (); - - // <dispatch_set> points to the set of Consumers associated with - // this forwarding address. - Dispatch_Set *dispatch_set = 0; - - if (this->efd_->find (*forwarding_addr, dispatch_set) != -1) - { - // Check to see if there are any destinations. - if (dispatch_set->size () == 0) - ACE_DEBUG ((LM_WARNING, - "there are no active destinations for this event currently\n")); - - else // There are destinations, so forward the event. - { - Dispatch_Set_Iterator dsi (*dispatch_set); - - for (Proxy_Handler **proxy_handler = 0; - dsi.next (proxy_handler) != 0; - dsi.advance ()) - { - // Only process active proxy_handlers. - if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) - { - // Clone the event portion (should be doing - // reference counting here...) - ACE_Message_Block *newmsg = data->clone (); - - ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", - (*proxy_handler)->id ())); - - if ((*proxy_handler)->put (newmsg) == -1) - { - if (errno == EWOULDBLOCK) // The queue has filled up! - ACE_ERROR ((LM_ERROR, "(%t) %p\n", - "gateway is flow controlled, so we're dropping events")); - else - ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", - "put", (*proxy_handler)->id ())); - - // We are responsible for freeing a - // ACE_Message_Block if failures occur. - delete newmsg; - } - } - } - // Will become superfluous once we have reference - // counting... - delete forward_addr; - return 0; - } - } - delete forward_addr; - // Failure return. - ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", - forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_)); - return 0; + return this->event_channel_.put (forward_addr); } #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>; -template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>; -template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>; +template class ACE_Map_Manager<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Iterator<Event_Key, Consumer_Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Entry<Event_Key, Consumer_Dispatch_Set *>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h index d91fa3108ff..ffce18d1c71 100644 --- a/apps/Gateway/Gateway/Proxy_Handler.h +++ b/apps/Gateway/Gateway/Proxy_Handler.h @@ -21,11 +21,12 @@ #include "ace/SOCK_Connector.h" #include "ace/Svc_Handler.h" #include "Event_Forwarding_Discriminator.h" -#include "Dispatch_Set.h" +#include "Consumer_Dispatch_Set.h" #include "Event.h" // Forward declaration. class Proxy_Handler_Connector; +class ACE_Event_Channel; class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> // = TITLE @@ -36,20 +37,15 @@ class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> // Channel from Suppliers and forward them to Consumers. { public: - Proxy_Handler (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); + Proxy_Handler (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); virtual int open (void * = 0); // Initialize and activate a single-threaded Proxy_Handler (called by // ACE_Connector::handle_output()). - int bind (const ACE_INET_Addr &remote_addr, - const ACE_INET_Addr &local_addr, - ACE_INT32); - // Set the peer's addressing and routing information. - ACE_INET_Addr &remote_addr (void); // Returns the peer's routing address. @@ -82,12 +78,12 @@ public: void max_timeout (int); int max_timeout (void); - // = Set/get direction (i.e., 'S' for Supplier and 'C' for Consumer + // = Set/get proxy role (i.e., 'S' for Supplier and 'C' for Consumer // (necessary for error checking). - void direction (char); - char direction (void); + void proxy_role (char); + char proxy_role (void); - // = The total number of bytes sent/received on this channel. + // = The total number of bytes sent/received on this proxy. size_t total_bytes (void); void total_bytes (size_t bytes); // Increment count by <bytes>. @@ -101,22 +97,10 @@ protected: MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. }; - int initialize_connection (void); - // Perform the first-time initiation of a connection to the peer. - - int reinitiate_connection (void); - // Reinitiate a connection asynchronously when peers fail. - - void socket_queue_size (void); - // Set the socket queue size. - virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Perform Proxy_Handler termination. - Event_Forwarding_Discriminator *efd_; - // Maps Events to a set of Consumers. - ACE_INET_Addr remote_addr_; // Address of peer. @@ -127,14 +111,10 @@ protected: // The assigned routing ID of this entry. size_t total_bytes_; - // The total number of bytes sent/received on this channel. + // The total number of bytes sent/received on this proxy. State state_; - // The current state of the channel. - - Proxy_Handler_Connector *connector_; - // Back pointer to Proxy_Handler_Connector to reestablish broken - // connections. + // The current state of the proxy. int timeout_; // Amount of time to wait between reconnection attempts. @@ -142,12 +122,13 @@ protected: int max_timeout_; // Maximum amount of time to wait between reconnection attempts. - char direction_; - // Indicates which direction data flows through the channel ('S' == - // Supplier and 'C' == Consumer). + char proxy_role_; + // Indicates which role the proxy plays ('S' == Supplier and 'C' == + // Consumer). - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). + ACE_Event_Channel &event_channel_; + // Reference to the <ACE_Event_Channel> that we use to forward all + // the events from Consumers and Suppliers. }; class Supplier_Proxy : public Proxy_Handler @@ -158,13 +139,15 @@ class Supplier_Proxy : public Proxy_Handler // Performs framing and error checking. { public: - Supplier_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - // Constructor sets the consumer map pointer. + // = Initialization method. + Supplier_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); protected: + // = All the following methods are upcalls, so they can be protected. + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); // Receive and process peer events. @@ -172,7 +155,8 @@ protected: // Receive an event from a Supplier. int forward (ACE_Message_Block *event); - // Forward the Event to a Consumer. + // Forward the <event> to its appropriate Consumer. This delegates + // to the <ACE_Event_Channel> to do the actual forwarding. ACE_Message_Block *msg_frag_; // Keep track of event fragment to handle non-blocking recv's from @@ -184,19 +168,22 @@ class Consumer_Proxy : public Proxy_Handler // Handles transmission of events to Consumers. // // = DESCRIPTION - // Uses a single-threaded approach. + // Performs queueing and error checking. Uses a single-threaded + // Reactive approach to handle flow control. { public: - Consumer_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager * = 0, - int socket_queue_size = 0); - - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // = Initialization method. + Consumer_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); + + virtual int put (ACE_Message_Block *event, + ACE_Time_Value * = 0); // Send an event to a Consumer (may be queued if necessary). protected: - // = We'll allow up to 16 megabytes to be queued per-output channel. + // = We'll allow up to 16 megabytes to be queued per-output proxy. enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16}; virtual int handle_output (ACE_HANDLE); diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp index 7ac0a77a2d4..dc18eca8500 100644 --- a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp +++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp @@ -18,15 +18,15 @@ Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) // Locate the ACE_Svc_Handler corresponding to the socket descriptor. if (this->handler_map_.find (sd, stp) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", + ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate proxy %d in connector map, %p\n", sd, "find"), -1); - Proxy_Handler *channel = stp->svc_handler (); + Proxy_Handler *proxy_handler = stp->svc_handler (); // Schedule a reconnection request at some point in the future - // (note that channel uses an exponential backoff scheme). - if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, - channel->timeout ()) == -1) + // (note that proxy_handler uses an exponential backoff scheme). + if (ACE_Service_Config::reactor ()->schedule_timer + (proxy_handler, 0, proxy_handler->timeout ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); return 0; @@ -35,36 +35,37 @@ Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) // Initiate (or reinitiate) a connection to the Proxy_Handler. int -Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel, - ACE_Synch_Options &synch_options) +Proxy_Handler_Connector::initiate_connection (Proxy_Handler *proxy_handler, + ACE_Synch_Options &synch_options) { - char buf[MAXHOSTNAMELEN]; + char addr_buf[MAXHOSTNAMELEN]; // Mark ourselves as idle so that the various iterators // will ignore us until we are reconnected. - channel->state (Proxy_Handler::IDLE); + proxy_handler->state (Proxy_Handler::IDLE); - if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 - || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) + // We check the remote addr second so that it remains in the addr_buf. + if (proxy_handler->local_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1 + || proxy_handler->remote_addr ().addr_to_string (addr_buf, sizeof addr_buf) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "can't obtain peer's address"), -1); // Try to connect to the Peer. - if (this->connect (channel, channel->remote_addr (), - synch_options, channel->local_addr ()) == -1) + if (this->connect (proxy_handler, proxy_handler->remote_addr (), + synch_options, proxy_handler->local_addr ()) == -1) { if (errno != EWOULDBLOCK) { - channel->state (Proxy_Handler::FAILED); + proxy_handler->state (Proxy_Handler::FAILED); ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", - "connect", buf)); + "connect", addr_buf)); // Reschedule ourselves to try and connect again. if (synch_options[ACE_Synch_Options::USE_REACTOR]) { if (ACE_Service_Config::reactor ()->schedule_timer - (channel, 0, channel->timeout ()) == 0) + (proxy_handler, 0, proxy_handler->timeout ()) == 0) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_timer"), -1); } @@ -75,18 +76,18 @@ Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel, } else { - channel->state (Proxy_Handler::CONNECTING); + proxy_handler->state (Proxy_Handler::CONNECTING); ACE_DEBUG ((LM_DEBUG, "(%t) in the process of connecting %s to %s\n", synch_options[ACE_Synch_Options::USE_REACTOR] - ? "asynchronously" : "synchronously", buf)); + ? "asynchronously" : "synchronously", addr_buf)); } } else { - channel->state (Proxy_Handler::ESTABLISHED); + proxy_handler->state (Proxy_Handler::ESTABLISHED); ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", - buf, channel->get_handle ())); + addr_buf, proxy_handler->get_handle ())); } return 0; } diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp index 98722a96295..f316e4e82bf 100644 --- a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp @@ -1,30 +1,33 @@ -#include "Thr_Proxy_Handler.h" // $Id$ -#include "Proxy_Handler_Connector.h" +#include "Event_Channel.h" +#include "Thr_Proxy_Handler.h" #if defined (ACE_HAS_THREADS) -Thr_Consumer_Proxy::Thr_Consumer_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Consumer_Proxy (efd, ioc, thr_mgr, socket_queue_size) +Thr_Consumer_Proxy::Thr_Consumer_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : Consumer_Proxy (ec, remote_addr, local_addr, conn_id) { } -// This method should be called only when the peer shuts down -// unexpectedly. This method marks the Proxy_Handler as having failed and -// deactivates the ACE_Message_Queue (to wake up the thread blocked on -// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will -// eventually try to reconnect... +// This method should be called only when the Consumer shuts down +// unexpectedly. This method marks the Proxy_Handler as having failed +// and deactivates the ACE_Message_Queue (to wake up the thread +// blocked on <dequeue_head> in svc()). +// Thr_Output_Handler::handle_close () will eventually try to +// reconnect... int Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) { + // Call down to the <Consumer_Proxy> to handle this first. this->Consumer_Proxy::handle_input (h); - ACE_Service_Config::reactor ()->remove_handler (h, - ACE_Event_Handler::RWE_MASK - | ACE_Event_Handler::DONT_CALL); + + ACE_Service_Config::reactor ()->remove_handler + (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); + // Deactivate the queue while we try to get reconnected. this->msg_queue ()->deactivate (); return 0; @@ -36,31 +39,28 @@ Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) int Thr_Consumer_Proxy::open (void *) { - // Set the size of the socket queue. - this->socket_queue_size (); - // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_.complete_proxy_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); + // Register ourselves to receive input events (which indicate that - // the Peer has shut down unexpectedly). - if (ACE_Service_Config::reactor ()->register_handler (this, - ACE_Event_Handler::READ_MASK) == -1) + // the Consumer has shut down unexpectedly). + else if (ACE_Service_Config::reactor ()->register_handler + (this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); - // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) { ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit - // messages to peers. + // events to Consumers. return this->activate (THR_NEW_LWP | THR_DETACHED); } else @@ -70,87 +70,93 @@ Thr_Consumer_Proxy::open (void *) } } -// ACE_Queue up a message for transmission (must not block since all -// Supplier_Proxys are single-threaded). +// Queue up an event for transmission (must not block since +// Supplier_Proxys may be single-threaded). int Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *) { // Perform non-blocking enqueue. - return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); } -// Transmit messages to the peer (note simplification resulting from +// Transmit events to the peer (note simplification resulting from // threads...) int Thr_Consumer_Proxy::svc (void) { + for (;;) { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Proxy's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) connected! Thr_Consumer_Proxy's handle = %d\n", this->peer ().get_handle ())); // Since this method runs in its own thread it is OK to block on // output. for (ACE_Message_Block *mb = 0; - this->msg_queue ()->dequeue_head (mb) != -1; ) - if (this->send (mb) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); - - ACE_ASSERT (errno == ESHUTDOWN); - - ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", - this->id (), this->get_handle ())); - - this->peer ().close (); - - for (this->timeout (1); - // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) - { - ACE_Time_Value tv (this->timeout ()); - ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", - tv.sec ())); - ACE_OS::sleep (tv); - } + this->msg_queue ()->dequeue_head (mb) != -1; + ) + { + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); + } + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", + this->id (), this->get_handle ())); + + this->peer ().close (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->event_channel_.initiate_proxy_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + + ACE_OS::sleep (tv); + } } return 0; } -Thr_Supplier_Proxy::Thr_Supplier_Proxy (Event_Forwarding_Discriminator *efd, - Proxy_Handler_Connector *ioc, - ACE_Thread_Manager *thr_mgr, - int socket_queue_size) - : Supplier_Proxy (efd, ioc, thr_mgr, socket_queue_size) +Thr_Supplier_Proxy::Thr_Supplier_Proxy (ACE_Event_Channel &ec, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id) + : Supplier_Proxy (ec, remote_addr, local_addr, conn_id) { } int Thr_Supplier_Proxy::open (void *) { - // Set the size of the socket queue. - this->socket_queue_size (); - // Turn off non-blocking I/O. if (this->peer ().disable (ACE_NONBLOCK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); - if (this->initialize_connection ()) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", - "initialize_connection"), -1); + // Call back to the <Event_Channel> to complete our initialization. + else if (this->event_channel_.complete_proxy_connection (this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1); // Reactivate message queue. If it was active then this is the // first time in and we need to spawn a thread, otherwise the queue // was inactive due to some problem and we've already got a thread. - if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) { ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); // Become an active object by spawning a new thread to transmit - // messages to peers. + // events to peers. return this->activate (THR_NEW_LWP | THR_DETACHED); } else @@ -160,7 +166,7 @@ Thr_Supplier_Proxy::open (void *) } } -// Receive messages from a Peer in a separate thread (note reuse of +// Receive events from a Peer in a separate thread (note reuse of // existing code!). int @@ -168,20 +174,20 @@ Thr_Supplier_Proxy::svc (void) { for (;;) { - ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Proxy's fd = %d\n", + ACE_DEBUG ((LM_DEBUG, + "(%t) connected! Thr_Supplier_Proxy's handle = %d\n", this->peer ().get_handle ())); - // Since this method runs in its own thread and processes - // messages for one connection it is OK to block on input and - // output. + // Since this method runs in its own thread and processes events + // for one connection it is OK to call down to the + // <Supplier_Proxy::handle_input> method, which blocks on input. while (this->handle_input () != -1) continue; ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n", - this->id (), - this->get_handle ())); + this->id (), this->get_handle ())); this->peer ().close (); @@ -190,11 +196,12 @@ Thr_Supplier_Proxy::svc (void) for (this->timeout (1); // Default is to reconnect synchronously. - this->connector_->initiate_connection (this) == -1; ) + this->event_channel_.initiate_proxy_connection (this) == -1; ) { ACE_Time_Value tv (this->timeout ()); ACE_ERROR ((LM_ERROR, - "(%t) reattempting connection, sec = %d\n", tv.sec ())); + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); ACE_OS::sleep (tv); } } diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h index 8ecced3805c..275bc87b320 100644 --- a/apps/Gateway/Gateway/Thr_Proxy_Handler.h +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.h @@ -25,21 +25,22 @@ class Thr_Consumer_Proxy : public Consumer_Proxy // Runs each Output Proxy_Handler in a separate thread. { public: - Thr_Consumer_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); + Thr_Consumer_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); virtual int open (void *); // Initialize the threaded Consumer_Proxy object and spawn a new // thread. - virtual int handle_input (ACE_HANDLE); - // Called when Peer shutdown unexpectedly. - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); // Send a message to a peer. +protected: + virtual int handle_input (ACE_HANDLE); + // Called when Peer shutdown unexpectedly. + virtual int svc (void); // Transmit peer messages. }; @@ -49,14 +50,15 @@ class Thr_Supplier_Proxy : public Supplier_Proxy // Runs each Input Proxy_Handler in a separate thread. { public: - Thr_Supplier_Proxy (Event_Forwarding_Discriminator *, - Proxy_Handler_Connector *, - ACE_Thread_Manager *, - int socket_queue_size); + Thr_Supplier_Proxy (ACE_Event_Channel &, + const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 conn_id); virtual int open (void *); // Initialize the object and spawn a new thread. +protected: virtual int svc (void); // Transmit peer messages. }; diff --git a/apps/Gateway/Gateway/gatewayd.cpp b/apps/Gateway/Gateway/gatewayd.cpp index 48b6e9a173d..b0af5f7cace 100644 --- a/apps/Gateway/Gateway/gatewayd.cpp +++ b/apps/Gateway/Gateway/gatewayd.cpp @@ -17,18 +17,17 @@ main (int argc, char *argv[]) ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1)); else // Use static binding. { - static char *l_argv[3] = { "-d" }; ACE_Service_Object *so = ACE_SVC_INVOKE (Gateway); - if (so->init (1, l_argv) == -1) + if (so->init (argc - 1, argv + 1) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1)); } } // Run forever, performing the configured services until we are shut - // down by a signal. + // down by a SIGINT/SIGQUIT signal. - ACE_Service_Config::run_reactor_event_loop (); + daemon.run_reactor_event_loop (); return 0; } diff --git a/apps/Gateway/Peer/Event.h b/apps/Gateway/Peer/Event.h index 24881c3e85b..5e288edf910 100644 --- a/apps/Gateway/Peer/Event.h +++ b/apps/Gateway/Peer/Event.h @@ -23,7 +23,7 @@ // Proxy_Handler in the Gateway. typedef ACE_INT32 ACE_INT32; -class Event_Addr +class Event_Key // = TITLE // Address used to identify the source/destination of an event. // @@ -33,14 +33,14 @@ class Event_Addr // Channel from the format of the data. { public: - Event_Addr (ACE_INT32 cid = -1, + Event_Key (ACE_INT32 cid = -1, u_char sid = 0, u_char type = 0) : conn_id_ (cid), supplier_id_ (sid), type_ (type) {} - int operator== (const Event_Addr &event_addr) const + int operator== (const Event_Key &event_addr) const { return this->conn_id_ == event_addr.conn_id_ && this->supplier_id_ == event_addr.supplier_id_ @@ -58,10 +58,13 @@ public: // Event type. }; - class Event_Header // = TITLE - // Fixed sized header. + // Fixed sized header. + // + // = DESCRIPTION + // This is designed to have a sizeof (16) to avoid alignment + // problems on most platforms. { public: typedef ACE_INT32 SUPPLIER_ID; @@ -72,14 +75,35 @@ public: INVALID_ID = -1 // No peer can validly use this number. }; + void decode (void) + { + this->len_ = ntohl (this->len_); + this->supplier_id_ = ntohl (this->supplier_id_); + this->type_ = ntohl (this->type_); + this->priority_ = ntohl (this->priority_); + } + // Decode from network byte order to host byte order. + + void encode (void) + { + this->len_ = htonl (this->len_); + this->supplier_id_ = htonl (this->supplier_id_); + this->type_ = htonl (this->type_); + this->priority_ = htonl (this->priority_); + } + // Encode from host byte order to network byte order. + + size_t len_; + // Length of the data_ payload, in bytes. + SUPPLIER_ID supplier_id_; // Source ID. ACE_INT32 type_; // Event type. - size_t len_; - // Length of the entire event (including data payload) in bytes. + ACE_INT32 priority_; + // Event priority. }; class Event diff --git a/apps/Gateway/Peer/peerd.cpp b/apps/Gateway/Peer/peerd.cpp index 3b7bdb0cb2d..ab59567fc08 100644 --- a/apps/Gateway/Peer/peerd.cpp +++ b/apps/Gateway/Peer/peerd.cpp @@ -17,12 +17,9 @@ main (int argc, char *argv[]) ACE_ERROR ((LM_ERROR, "%p\n%a", "open", 1)); else // Use static binding. { - static char *l_argv[3] = { "-d", "-p", "10002" }; - - ACE_Service_Object *so = _make_Peer_Acceptor (); - + ACE_Service_Object *so = ACE_SVC_INVOKE (Peer_Acceptor); - if (so->init (3, l_argv) == -1) + if (so->init (argc - 1, argv + 1) == -1) ACE_ERROR ((LM_ERROR, "%p\n%a", "init", 1)); } } diff --git a/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp b/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp index 5eca7a7e853..31164061eea 100644 --- a/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp +++ b/apps/Orbix-Examples/Event_Comm/Consumer/Notification_Receiver_Handler.cpp @@ -1,6 +1,6 @@ -#include "Notification_Receiver_Handler.h" // $Id$ +#include "Notification_Receiver_Handler.h" #if defined (ACE_HAS_ORBIX) @@ -108,7 +108,7 @@ Notification_Receiver_Handler::notifier (void) Notification_Receiver_Handler::~Notification_Receiver_Handler (void) { - this->handle_close (-1, ACE_Event_Handler::RWE_MASK); + this->handle_close (-1, ACE_Event_Handler::ALL_EVENTS_MASK); } #endif /* ACE_HAS_ORBIX */ diff --git a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp index ebe56b2ff9c..38e2977140d 100644 --- a/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp +++ b/examples/ASX/Event_Server/Event_Server/Peer_Router.cpp @@ -66,7 +66,7 @@ Peer_Handler<ROUTER, KEY>::svc (void) { #if 0 ACE_Thread_Control thread_control (tm); - // Just a try !! we're just reading from our Message_Queue + ACE_Message_Block *db, *hb; int n; diff --git a/examples/ASX/Message_Queue/bounded_buffer.cpp b/examples/ASX/Message_Queue/bounded_buffer.cpp index 08df53c8b93..194fbf07280 100644 --- a/examples/ASX/Message_Queue/bounded_buffer.cpp +++ b/examples/ASX/Message_Queue/bounded_buffer.cpp @@ -88,7 +88,7 @@ static void *consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) if (length > 0) ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); - delete mb; + mb->release (); if (length == 0) break; diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp index cc2c475bef1..7d0fcef4160 100644 --- a/examples/ASX/Message_Queue/buffer_stream.cpp +++ b/examples/ASX/Message_Queue/buffer_stream.cpp @@ -163,7 +163,7 @@ Consumer::svc (void) if (length > 0) ACE_OS::write (ACE_STDOUT, mb->rd_ptr (), length); - delete mb; + mb->release (); if (length == 0) break; diff --git a/examples/ASX/Message_Queue/priority_buffer.cpp b/examples/ASX/Message_Queue/priority_buffer.cpp index b80c23234c3..2d057fd69c2 100644 --- a/examples/ASX/Message_Queue/priority_buffer.cpp +++ b/examples/ASX/Message_Queue/priority_buffer.cpp @@ -44,7 +44,7 @@ consumer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) // Free up the buffer memory and the Message_Block. ACE_Service_Config::alloc ()->free (mb->rd_ptr ()); - delete mb; + mb->release (); if (length == 0) break; @@ -95,8 +95,8 @@ producer (ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue) mb->msg_priority (rb.size ()); mb->wr_ptr (rb.size ()); - ACE_DEBUG ((LM_DEBUG, "enqueueing message of size %d\n", - mb->msg_priority ())); + ACE_DEBUG ((LM_DEBUG, "enqueueing message of size %d\n", + mb->msg_priority ())); // Enqueue in priority order. if (msg_queue->enqueue_prio (mb) == -1) ACE_ERROR ((LM_ERROR, "(%t) %p\n", "put_next")); diff --git a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp index 23d9f6c7a35..f17560ad0e6 100644 --- a/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp +++ b/examples/ASX/UPIPE_Event_Server/Peer_Router.cpp @@ -140,7 +140,7 @@ Peer_Handler<ROUTER, KEY>::handle_input (ACE_HANDLE h) ACE_DEBUG ((LM_DEBUG, "(%t) input arrived on sd %d\n", h)); // ACE_Service_Config::reactor ()->remove_handler(h, -// ACE_Event_Handler::RWE_MASK +// ACE_Event_Handler::ALL_EVENTS_MASK // |ACE_Event_Handler::DONT_CALL); // this method should be called only if the peer shuts down // so we deactivate our ACE_Message_Queue to awake our svc thread diff --git a/examples/Connection/blocking/SPIPE-connector.h b/examples/Connection/blocking/SPIPE-connector.h index 6a6fc97976f..8dd26a32e20 100644 --- a/examples/Connection/blocking/SPIPE-connector.h +++ b/examples/Connection/blocking/SPIPE-connector.h @@ -26,7 +26,7 @@ public: // = Demultiplexing hooks. virtual int handle_input (ACE_HANDLE); virtual int handle_close (ACE_HANDLE handle = ACE_INVALID_HANDLE, - ACE_Reactor_Mask mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask mask = ACE_Event_Handler::ALL_EVENTS_MASK); virtual ACE_HANDLE get_handle (void) const; diff --git a/examples/Logger/simple-server/Logging_Handler.cpp b/examples/Logger/simple-server/Logging_Handler.cpp index ceb88ac14da..8f6b435089b 100644 --- a/examples/Logger/simple-server/Logging_Handler.cpp +++ b/examples/Logger/simple-server/Logging_Handler.cpp @@ -136,5 +136,5 @@ int Logging_Handler::close (void) { return this->handle_close (ACE_INVALID_HANDLE, - ACE_Event_Handler::RWE_MASK); + ACE_Event_Handler::ALL_EVENTS_MASK); } diff --git a/netsvcs/lib/TS_Clerk_Handler.cpp b/netsvcs/lib/TS_Clerk_Handler.cpp index e0142256ab0..2e84117e592 100644 --- a/netsvcs/lib/TS_Clerk_Handler.cpp +++ b/netsvcs/lib/TS_Clerk_Handler.cpp @@ -74,7 +74,7 @@ public: // Return the handle of the message_fifo_; virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, - ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + ACE_Reactor_Mask = ACE_Event_Handler::ALL_EVENTS_MASK); // Called when object is removed from the ACE_Reactor virtual int handle_input (ACE_HANDLE); |