diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-15 16:38:54 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1996-12-15 16:38:54 +0000 |
commit | b134be83f52912e4e7e3707973a1e24b29d48552 (patch) | |
tree | 5c7b58240f4d2ab28cec729bb1b7d46a5d01f47f | |
parent | 0d7dedcc81b518738ba0a19347394816bd262322 (diff) | |
download | ATCD-b134be83f52912e4e7e3707973a1e24b29d48552.tar.gz |
*** empty log message ***
69 files changed, 2657 insertions, 925 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b index d9f57161a00..dfaa77649f9 100644 --- a/ChangeLog-96b +++ b/ChangeLog-96b @@ -1,3 +1,59 @@ +Sun Dec 15 10:29:20 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> + + * netsvcs/servers/svc.conf: Removed the "lib" prefix for the + netsvcs DLL. This is now added automatically by the + ACE::ldfind() operation. + + * ace/SString.cpp (ACE_CString): Removed the #pragmas for Win32. + They aren't necessary since we should replace the ACE_USHORT16 + cast with a char cast. Thanks to Amos Shapira <amos@dsi.co.il> + for reporting this. + +Sat Dec 14 14:25:38 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * build/SunOS5.5/tests/UPIPE_SAP_Test.cpp (main): Fixed several + minor bugs with UPIPE_SAP_Test.cpp. + + * ace/OS.i (thr_join): Added implementations for Solaris threads + and most versions of POSIX pthreads where ACE_hthread_t and + ACE_thread_t are the same type! + + * ace/OS: Began adding hooks so that we can eventually move away + from the current split between ACE_thread_t and ACE_hthread_t + and unify them via ACE_Thread_ID. + + * ace/{OS,Thread}.h: Changed the interface of thr_getprio() so + that it takes an int & rather than an int *. + + * ace/OS.i (thr_getprio): Fixed a minor bug for Win32 where we + weren't depositing the thread priority into the return value! + + * Makefile: Changed the order in which things are built so that + netsvcs are built right after libACE, followed by the tests. + +Sat Dec 14 11:54:22 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> + + * apps/Gateway/Gateway/Consumer_Map: Change the Consumer_Map class + so that it was no longer templatized. There isn't any point in + doing this since we aren't going to be changing these types for + this application. + + * apps/Gateway/Gateway: Factored out the code for selecting the + concurrency strategy into a separate *.h file called + Concurrency_Strategy.h. + + * apps/Gateway/Gateway: Began revising the Gateway application to + use the new ACE Event Channel. + + * ace/Svc_Handler: Now that we've got put() and svc() with no-op + defaults in class ACE_Task_Base, we don't need them in + ACE_Svc_Handler anymore, so I removed them! + + * ace/Task: Finally got sick of having to provide no-op + open()/put()/close() routines in all ACE_Task subclasses, so I + changed these methods from pure virtual to virtual with default + no-op behavior. Updated all the tests, as well. + Sat Dec 14 11:39:15 1996 David L. Levine <levine@cs.wustl.edu> * ace/{Module,Stream,Svc_Handler,Synch_T,Task_T}.cpp and Synch_T.i: @@ -21,8 +77,6 @@ Fri Dec 13 22:07:11 1996 David L. Levine <levine@cs.wustl.edu> SOLINK step in build of shared objects for SunOS5 with SunC++ with symlink from .so to .o file. -Fri Dec 13 13:44:12 1996 David L. Levine <levine@cs.wustl.edu> - * ace/config-vxworks*.h: added ACE_NEEDS_SYSTIME_H to VxWorks configs because it's needed with inlining @@ -58,6 +112,16 @@ Thu Dec 12 18:51:04 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> * ace/Thread: Added getprio() and setprio() methods to ACE_Thread. +Fri Dec 13 13:44:12 1996 David L. Levine <levine@cs.wustl.edu> + + * ace/config-vxworks*.h: added ACE_NEEDS_SYSTIME_H to VxWorks + configs because it's needed with inlining + + * include/makeinclude/platform_vxworks*.GNU: cleaned up VxWorks + config files + +Thu Dec 12 18:51:04 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + * ace: Added a new macro called ACE_UNUSED_ARG() to keep the compiler from outputting warnings about unused arguments. So far, this is mostly done for Win32, but it @@ -12,11 +12,11 @@ INFO = README \ VERSION DIRS = ace \ + netsvcs \ + tests \ apps \ examples \ - netsvcs \ - performance-tests \ - tests + performance-tests CLONE = Makefile \ ace \ @@ -12,32 +12,32 @@ An Object-Oriented Network Programming Toolkit Overview of ACE The ADAPTIVE Communication Environment (ACE) is an object-oriented -toolkit that implements strategic and tactical design patterns to -simplify the development of concurrent, event-driven communication -software. ACE provides a rich set of reusable C++ wrappers, class -categories, and frameworks that perform common communication software -tasks across a range of operating system platforms. The communication -software tasks provided by ACE include event demultiplexing and event -handler dispatching, connection establishment, interprocess -communication, shared memory management, message routing, dynamic -(re)configuration of distributed services, multi-threading, and -concurrency control. - -ACE is targeted for developers of high-performance concurrent network -applications and services. The primary goal of ACE is to simplify the -development of concurrent OO communication software that utilizes -interprocess communication, event demultiplexing, explicit dynamic -linking, and concurrency. In addition, ACE automates communication -software configuration and reconfiguration by dynamically linking -services into applications at run-time and executing these services on -one or more processes or threads. +(OO) toolkit that implements fundamental design patterns for +communication software. ACE provides a rich set of reusable C++ +wrappers, class categories, and frameworks that perform common +communication software tasks across a range of operating system +platforms. The communication software tasks provided by ACE include +event demultiplexing and event handler dispatching, service +initialization, interprocess communication, shared memory management, +message routing, dynamic (re)configuration of distributed services, +multi-threading, and concurrency control. + +ACE is targeted for developers of high-performance communication +services and applications on UNIX, POSIX, and Win32 platforms. ACE +simplifies the development of OO network applications and services +that utilize interprocess communication, event demultiplexing, +explicit dynamic linking, and concurrency. ACE automates system +configuration and reconfiguration by dynamically linking services into +applications at run-time and executing these services in one or more +processes or threads. ACE has been ported to a wide range of uni-processor and multi-process OS platforms including Win32 (i.e., WinNT and Win95), most versions of UNIX (e.g., SunOS 4.x and 5.x, SGI IRIX, HP-UX, OSF/1, AIX, Linux, and SCO), VxWorks, and MVS OpenEdition. It is currently used in commercial products by dozens of companies including Ericsson, -Bellcore, Siemens, Motorola, and Kodak. +Bellcore, Siemens, Motorola, and Kodak. There are both C++ and Java +versions of ACE available. The remainder of this document outlines the structure and participants of the layers in this diagram. @@ -130,12 +130,12 @@ medical engineering. OBTAINING ACE -The current ACE release is provided as a tar file that is slightly -larger than 1.5 Meg compressed using GNU gzip. ACE may be obtained -electronically from http://www.cs.wustl.edu/~schmidt/ACE-obtain.html. -This release contains contains the source code, test drivers, and -example applications for C++ wrapper libraries and the higher-level -ACE network programming framework developed as part of the ADAPTIVE +The current ACE release is provided as a tar file that is around 1.8 +Meg compressed using GNU gzip. ACE may be obtained electronically +from http://www.cs.wustl.edu/~schmidt/ACE-obtain.html. This release +contains contains the source code, test drivers, and example +applications for C++ wrapper libraries and the higher-level ACE +network programming framework developed as part of the ADAPTIVE project at the University of California, Irvine and at Washington University. diff --git a/ace/OS.cpp b/ace/OS.cpp index 6559f6209aa..e0b395e66a1 100644 --- a/ace/OS.cpp +++ b/ace/OS.cpp @@ -1455,3 +1455,47 @@ ace_mutex_lock_cleanup_adapter (void *args) { ACE_OS::mutex_lock_cleanup (args); } + +ACE_Thread_ID::ACE_Thread_ID (ACE_thread_t thread_id, + ACE_hthread_t thread_handle) + : thread_id_ (thread_id), + thread_handle_ (thread_handle) +{ +} + +ACE_thread_t +ACE_Thread_ID::id (void) +{ + return this->thread_id_; +} + +void +ACE_Thread_ID::id (ACE_thread_t thread_id) +{ + this->thread_id_ = thread_id; +} + +ACE_hthread_t +ACE_Thread_ID::handle (void) +{ + return this->thread_handle_; +} + +void +ACE_Thread_ID::handle (ACE_hthread_t thread_handle) +{ + this->thread_handle_ = thread_handle; +} + +int +ACE_Thread_ID::operator == (const ACE_Thread_ID &rhs) +{ + return this->thread_handle_ == rhs.thread_handle_ + && this->thread_id_ == rhs.thread_id_; +} + +int +ACE_Thread_ID::operator != (const ACE_Thread_ID &rhs) +{ + return !((*this) == rhs); +} @@ -1852,6 +1852,33 @@ typedef int ucontext_t; #undef t_errno #endif /* ACE_HAS_BROKEN_T_ERRNO */ +class ACE_Export ACE_Thread_ID + // = TITLE + // Defines a platform-independent thread ID. +{ +public: + ACE_Thread_ID (ACE_thread_t, ACE_hthread_t); + + // = Set/Get the Thread ID. + ACE_thread_t id (void); + void id (ACE_thread_t); + + // = Set/Get the Thread handle. + ACE_hthread_t handle (void); + void handle (ACE_hthread_t); + + // != Comparison operator. + int operator == (const ACE_Thread_ID &); + int operator != (const ACE_Thread_ID &); + +private: + ACE_thread_t thread_id_; + // Identify the thread. + + ACE_hthread_t thread_handle_; + // Handle to the thread (typically used to "wait" on Win32). +}; + // Type of the extended signal handler. typedef void (*ACE_Sig_Handler_Ex) (int, siginfo_t *siginfo, ucontext_t *ucontext); @@ -2320,7 +2347,26 @@ public: static int t_sync (ACE_HANDLE fildes); static int t_unbind (ACE_HANDLE fildes); - // = A set of wrappers for threads. +#if 0 + // = A set of wrappers for threads (these are portable since they use the ACE_Thread_ID). + static int thr_continue (const ACE_Thread_ID &thread); + static int thr_create (ACE_THR_FUNC, + void *args, + long flags, + ACE_Thread_ID *, + u_int priority = 0, + void *stack = 0, + size_t stacksize = 0); + static int thr_getprio (ACE_Thread_ID thr_id, int &prio, int *policy = 0); + static int thr_join (ACE_Thread_ID waiter_id, void **status); + static int thr_kill (ACE_Thread_ID thr_id, int signum); + static ACE_Thread_ID thr_self (void); + static int thr_setprio (ACE_Thread_ID thr_id, int prio); + static int thr_suspend (ACE_Thread_ID target_thread); + static int thr_cancel (ACE_Thread_ID t_id); +#endif /* 0 */ + + // = A set of wrappers for threads (these are non-portable since they use ACE_thread_t and ACE_hthread_t and will go away in a future release). static int thr_continue (ACE_hthread_t target_thread); static int thr_create (ACE_THR_FUNC, void *args, @@ -2330,30 +2376,31 @@ public: u_int priority = 0, void *stack = 0, size_t stacksize = 0); + static int thr_getprio (ACE_hthread_t thr_id, int &prio); + static int thr_join (ACE_hthread_t waiter_id, void **status); + static int thr_join (ACE_thread_t waiter_id, ACE_thread_t *thr_id, void **status); + static int thr_kill (ACE_thread_t thr_id, int signum); + static ACE_thread_t thr_self (void); + static void thr_self (ACE_hthread_t &); + static int thr_setprio (ACE_hthread_t thr_id, int prio); + static int thr_suspend (ACE_hthread_t target_thread); + static int thr_cancel (ACE_thread_t t_id); + static int thr_cmp (ACE_hthread_t t1, ACE_hthread_t t2); static int thr_equal (ACE_thread_t t1, ACE_thread_t t2); static void thr_exit (void *status = 0); static int thr_getconcurrency (void); - static int thr_getprio (ACE_hthread_t thr_id, int *prio); static int thr_getspecific (ACE_thread_key_t key, void **data); - static int thr_join (ACE_hthread_t waiter_id, void **status); - static int thr_join (ACE_thread_t waiter_id, ACE_thread_t *thr_id, void **status); static int thr_keyfree (ACE_thread_key_t key); static int thr_key_detach (void *inst); static int thr_keycreate (ACE_thread_key_t *key, ACE_THR_DEST, void *inst = 0); static int thr_key_used (ACE_thread_key_t key); - static int thr_kill (ACE_thread_t thr_id, int signum); static size_t thr_min_stack (void); - static ACE_thread_t thr_self (void); - static void thr_self (ACE_hthread_t &); static int thr_setconcurrency (int hint); - static int thr_setprio (ACE_hthread_t thr_id, int prio); static int thr_setspecific (ACE_thread_key_t key, void *data); static int thr_sigsetmask (int how, const sigset_t *nsm, sigset_t *osm); - static int thr_suspend (ACE_hthread_t target_thread); static int thr_setcancelstate (int new_state, int *old_state); static int thr_setcanceltype (int new_type, int *old_type); - static int thr_cancel (ACE_thread_t t_id); static int sigwait (sigset_t *set, int *sig = 0); static void thr_testcancel (void); static void thr_yield (void); @@ -3069,20 +3069,26 @@ ACE_OS::thr_getconcurrency (void) } ACE_INLINE int -ACE_OS::thr_getprio (ACE_hthread_t thr_id, int *prio) +ACE_OS::thr_getprio (ACE_hthread_t thr_id, int &prio) { // ACE_TRACE ("ACE_OS::thr_getprio"); #if defined (ACE_HAS_THREADS) #if defined (ACE_HAS_STHREADS) - ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_getprio (thr_id, prio), ace_result_), int, -1); -#elif defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) - ACE_NOTSUP_RETURN (-1); + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_getprio (thr_id, &prio), ace_result_), int, -1); +#elif (defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)) && !defined (ACE_LACKS_SETSCHED) + struct sched_param param; + int result; + int policy = 0; + + ACE_OSCALL (ACE_ADAPT_RETVAL (::pthread_getschedparam (thr_id, &policy, ¶m), result), int, -1, result); + prio = param.sched_priority; + return result; #elif defined (ACE_HAS_WTHREADS) ACE_UNUSED_ARG(prio); // why is the thread prio not dropped into prio ? - int result = ::GetThreadPriority (thr_id); - return result == THREAD_PRIORITY_ERROR_RETURN ? -1 : result; + prio = ::GetThreadPriority (thr_id); + return prio == THREAD_PRIORITY_ERROR_RETURN ? -1 : 0; #elif defined (VXWORKS) ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::taskPriorityGet (thr_id, prio), ace_result_), int, -1); #endif /* ACE_HAS_STHREADS */ @@ -3159,9 +3165,11 @@ ACE_OS::thr_join (ACE_hthread_t thr_handle, void **status) status = status; #if defined (ACE_HAS_THREADS) #if defined (ACE_HAS_STHREADS) - ACE_NOTSUP_RETURN (-1); -#elif defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) - ACE_NOTSUP_RETURN (-1); + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_join (thr_handle, 0, status), ace_result_), + int, -1); +#elif (defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)) && !defined (ACE_HAS_THREAD_SELF) + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::pthread_join (thr_handle, status), ace_result_), + int, -1); #elif defined (ACE_HAS_WTHREADS) void *local_status = 0; @@ -3259,12 +3267,12 @@ ACE_OS::thr_setcanceltype (int new_type, int *old_type) } ACE_INLINE int -ACE_OS::thr_cancel (ACE_thread_t t_id) +ACE_OS::thr_cancel (ACE_thread_t thr_id) { // ACE_TRACE ("ACE_OS::thr_cancel"); #if defined (ACE_HAS_THREADS) #if defined (ACE_HAS_DCETHREADS) || (defined (ACE_HAS_PTHREADS) && defined (ACE_HAS_STHREADS)) - ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::pthread_cancel(t_id), + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::pthread_cancel (thr_id), ace_result_), int, -1); #elif defined (ACE_HAS_PTHREADS) @@ -3275,7 +3283,7 @@ ACE_OS::thr_cancel (ACE_thread_t t_id) #elif defined (ACE_HAS_STHREADS) ACE_NOTSUP_RETURN (-1); #elif defined (ACE_HAS_WTHREADS) - ACE_UNUSED_ARG(t_id); + ACE_UNUSED_ARG(thr_id); ACE_NOTSUP_RETURN (-1); #elif defined (VXWORKS) @@ -3554,7 +3562,13 @@ ACE_OS::thr_setprio (ACE_hthread_t thr_id, int prio) ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_setprio (thr_id, prio), ace_result_), int, -1); -#elif defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) +#elif (defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)) && !defined (ACE_LACKS_SETSCHED) + struct sched_param param; + int result; + + ACE_OSCALL (ACE_ADAPT_RETVAL (::pthread_setschedparam (thr_id, policy, ¶m), result), int, -1, result); + prio = param.sched_priority; + return result; ACE_NOTSUP_RETURN (-1); #elif defined (ACE_HAS_WTHREADS) ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::SetThreadPriority (thr_id, prio), @@ -5842,3 +5856,91 @@ ACE_OS::mkdir (const wchar_t *path, mode_t mode) #endif /* ACE_WIN32 */ #endif /* ACE_HAS_UNICODE */ + +#if 0 +ACE_INLINE int +ACE_OS::thr_continue (const ACE_Thread_ID &thr_id) +{ + // ACE_TRACE ("ACE_OS::thr_continue"); + return ACE_OS::thr_continue (thr_id.id ()); +} + +ACE_INLINE int +ACE_OS::thr_create (ACE_THR_FUNC func, + void *args, + long flags, + ACE_Thread_ID *thr_id = 0, + u_int priority = 0, + void *stack = 0, + size_t stacksize = 0); +{ + // ACE_TRACE ("ACE_OS::thr_create"); + ACE_thread_t thread_id; + ACE_hthread_t thread_handle; + + int result = ACE_OS::thr_create (func, args, flags, + &thread_id, &thread_handle, + priority, stack, stacksize); + if (result == -1) + return -1; + else if (thr_id != 0) + { + thr_id->id (thread_id); + thr_id->handle (thread_handle); + return result; + } +} + +ACE_INLINE int +ACE_OS::thr_getprio (const ACE_Thread_ID &thr_id, int &prio) +{ + // ACE_TRACE ("ACE_OS::thr_getprio"); + return ACE_OS::thr_getprio (thr_id.handle (), prio); +} + +ACE_INLINE int +ACE_OS::thr_join (const ACE_Thread_ID &thr_id, void **status) +{ +#if defined (ACE_WIN32) + return ACE_OS::join (thr_id.id (), status); +#else + return ACE_OS::join (thr_id.handle (), status); +#endif /* ACE_WIN32 */ +} + +ACE_INLINE int +ACE_OS::thr_cancel (const ACE_Thread_ID &thr_id) +{ + return ACE_OS::thr_cancel (thr_id.id ()); +} + +ACE_INLINE int +ACE_OS::thr_kill (const ACE_Thread_ID &thr_id, int signum) +{ + return ACE_OS::thr_kill (thr_id.id (), signum); +} + +ACE_INLINE ACE_Thread_ID +ACE_OS::thr_self (void) +{ + ACE_hthread_t thr_handle; + ACE_OS::thr_self (thr_handle); + ACE_thread_t thr_id = ACE_OS::thr_self (); + + return ACE_Thread_ID (thr_id, thr_handle); +} + +ACE_INLINE int +ACE_OS::thr_setprio (const ACE_Thread_ID &thr_id, int prio) +{ + // ACE_TRACE ("ACE_OS::thr_getprio"); + return ACE_OS::thr_setprio (thr_id.handle (), prio); +} + +ACE_INLINE int +ACE_OS::thr_suspend (const ACE_Thread_ID &thr_id) +{ + return ACE_OS::thr_suspend (thr_id.handle ()); +} + +#endif /* 0 */ diff --git a/ace/SString.cpp b/ace/SString.cpp index a8269f6362e..47c52efcf68 100644 --- a/ace/SString.cpp +++ b/ace/SString.cpp @@ -108,10 +108,7 @@ ACE_CString::ACE_CString (const ACE_USHORT16 *s, ACE_Allocator *alloc) // Copy the ACE_USHORT16 * string byte-by-byte into the char * // string. for (size_t i = 0; i < this->len_; i++) -#pragma warning(disable: 4244) /* Possible loss of data */ - this->rep_[i] = (ACE_USHORT16) s[i]; -#pragma warning(default: 4244) /* Possible loss of data */ - + this->rep_[i] = char (s[i]); this->rep_[this->len_] = '\0'; } } diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp index 4285901216f..922b6b89eaa 100644 --- a/ace/Svc_Handler.cpp +++ b/ace/Svc_Handler.cpp @@ -217,21 +217,6 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::close (unsigned long) } template <PR_ST_1, ACE_SYNCH_1> int -ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::svc (void) -{ - ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::svc"); - return -1; -} - -template <PR_ST_1, ACE_SYNCH_1> int -ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::put (ACE_Message_Block *, - ACE_Time_Value *) -{ - ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::put"); - return -1; -} - -template <PR_ST_1, ACE_SYNCH_1> int ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::init (int, char *[]) { ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::init"); diff --git a/ace/Svc_Handler.h b/ace/Svc_Handler.h index b66aa783ebd..1228ad0964a 100644 --- a/ace/Svc_Handler.h +++ b/ace/Svc_Handler.h @@ -97,9 +97,6 @@ public: // Returns the underlying PEER_STREAM (used by // ACE_Acceptor::accept() and ACE_Connector::connect() factories). - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); - // Provide a default implementation to simplify ancestors... - virtual void destroy (void); // Call this instead of <delete> to free up dynamically allocated // <Svc_Handler>. This method knows whether or not the object was @@ -119,9 +116,6 @@ public: // This really should be private so that users are forced to call // destroy(). - virtual int svc (void); - // Provide a default implementation to simplify ancestors... - private: void shutdown (void); // Close down the descriptor diff --git a/ace/Task.h b/ace/Task.h index 0b11f6ead24..8e6f4752f27 100644 --- a/ace/Task.h +++ b/ace/Task.h @@ -57,11 +57,11 @@ public: ACE_Task_Base (ACE_Thread_Manager *); // = Initialization and termination hooks (note that these *must* be defined by subclasses). - virtual int open (void *args = 0) = 0; + virtual int open (void *args = 0); // Hook called to open a Task. <args> can be used to pass arbitrary // information into <open>. - virtual int close (u_long flags = 0) = 0; + virtual int close (u_long flags = 0); // Hook called from ACE_Task_Exit when during thread exit and from // the default implemenation of module_closed(). @@ -75,7 +75,7 @@ public: // ACE_Task instance exits. // = Immediate and deferred processing methods, respectively. - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) = 0; + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); // Transfer msg into the queue to handle immediate processing. virtual int svc (void); diff --git a/ace/Task.i b/ace/Task.i index bf6d310d2fc..7e682513761 100644 --- a/ace/Task.i +++ b/ace/Task.i @@ -60,3 +60,30 @@ ACE_Task_Base::svc (void) ACE_TRACE ("ACE_Task_Base::svc"); return 0; } + +// Default ACE_Task open routine + +ACE_INLINE int +ACE_Task_Base::open (void *) +{ + ACE_TRACE ("ACE_Task_Base::open"); + return 0; +} + +// Default ACE_Task close routine + +ACE_INLINE int +ACE_Task_Base::close (u_long) +{ + ACE_TRACE ("ACE_Task_Base::close"); + return 0; +} + +// Default ACE_Task put routine. + +ACE_INLINE int +ACE_Task_Base::put (ACE_Message_Block *, ACE_Time_Value *) +{ + ACE_TRACE ("ACE_Task_Base::put"); + return 0; +} diff --git a/ace/Thread.i b/ace/Thread.i index 6696a08e3f4..1a5e2746f65 100644 --- a/ace/Thread.i +++ b/ace/Thread.i @@ -244,7 +244,7 @@ ACE_Thread::self (ACE_hthread_t &t_id) } ACE_INLINE int -ACE_Thread::getprio (ACE_hthread_t t_id, int *prio) +ACE_Thread::getprio (ACE_hthread_t t_id, int &prio) { ACE_TRACE ("ACE_Thread::getprio"); return ACE_OS::thr_getprio (t_id, prio); diff --git a/ace/config-aix-3.2.5.h b/ace/config-aix-3.2.5.h index cb5bf928a89..eb60aa501ea 100644 --- a/ace/config-aix-3.2.5.h +++ b/ace/config-aix-3.2.5.h @@ -11,23 +11,14 @@ #define ACE_HAS_ONLY_TWO_PARAMS_FOR_ASCTIME_R_AND_CTIME_R #define ACE_HAS_RECURSIVE_THR_EXIT_SEMANTICS #define ACE_HAS_CONSISTENT_SIGNAL_PROTOTYPES - #define ACE_HAS_TEMPLATE_TYPEDEFS - #define ACE_HAS_STRERROR - #define ACE_HAS_SIG_ATOMIC_T - #define ACE_HAS_SSIZE_T - #define ACE_HAS_CPLUSPLUS_HEADERS - #define ACE_HAS_POLL - #define ACE_HAS_POSIX_NONBLOCK - #define ACE_HAS_AIX_GETTIMEOFDAY - #define ACE_HAS_NO_SYSCALL_H // Compiler/platform has the getrusage() system call. diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h new file mode 100644 index 00000000000..e2fbc934c93 --- /dev/null +++ b/apps/Gateway/Gateway/Concurrency_Strategies.h @@ -0,0 +1,74 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Concurrency_strategies.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_CONCURRENCY_STRATEGIES) +#define _CONCURRENCY_STRATEGIES + +#include "ace/Synch.h" + +// The following typedefs are used in order to parameterize the +// synchronization policies without changing the source code! + +// If we don't have threads then use the single-threaded synchronization. +#if !defined (ACE_HAS_THREADS) +#define SYNCH_STRATEGY ACE_NULL_SYNCH +typedef ACE_Null_Mutex MAP_MUTEX; +#else /* ACE_HAS_THREADS */ + +// Note that we only need to make the ACE_Task thread-safe if we are +// using the multi-threaded Thr_Consumer_Handler... +#if defined (USE_OUTPUT_MT) +#define SYNCH_STRATEGY ACE_MT_SYNCH +#else +#define SYNCH_STRATEGY ACE_NULL_SYNCH +#endif /* USE_OUTPUT_MT || USE_INPUT_MT */ + +// Note that we only need to make the ACE_Map_Manager thread-safe if +// we are using the multi-threaded Thr_Supplier_Handler. In this +// case, we use an RW_Mutex since we'll lookup Consumers far more +// often than we'll update them. +#if defined (USE_INPUT_MT) +typedef ACE_RW_Mutex MAP_MUTEX; +#else +typedef ACE_Null_Mutex MAP_MUTEX; +#endif /* USE_INPUT_MT */ +#endif /* ACE_HAS_THREADS */ + +// = Forward decls +class Thr_Consumer_Handler; +class Thr_Supplier_Handler; +class Consumer_Handler; +class Supplier_Handler; + +#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) +#if defined (USE_OUTPUT_MT) +typedef Thr_Consumer_Handler CONSUMER_HANDLER; +#else +typedef Consumer_Handler CONSUMER_HANDLER; +#endif /* USE_OUTPUT_MT */ + +#if defined (USE_INPUT_MT) +typedef Thr_Supplier_Handler SUPPLIER_HANDLER; +#else +typedef Supplier_Handler SUPPLIER_HANDLER; +#endif /* USE_INPUT_MT */ +#else +// Instantiate a non-multi-threaded Gateway. +typedef Supplier_Handler SUPPLIER_HANDLER; +typedef Consumer_Handler CONSUMER_HANDLER; +#endif /* ACE_HAS_THREADS */ + +#endif /* _CONCURRENCY_STRATEGIES */ diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index f2466544e5b..4c2648addf0 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -8,25 +8,23 @@ typedef FP::Return_Type FP_RETURN_TYPE; FP_RETURN_TYPE -RT_Config_File_Parser::read_entry (RT_Config_File_Entry &entry, - int &line_number) +Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, + int &line_number) { FP_RETURN_TYPE read_result; - // increment the line count + + // Increment the line count. line_number++; - // Ignore comments, check for EOF and EOLINE - // if this succeeds, we have our connection id + // Ignore comments, check for EOF and EOLINE if this succeeds, we + // have our connection id. while ((read_result = this->getint (entry.conn_id_)) != FP::SUCCESS) { if (read_result == FP::EOFILE) return FP::EOFILE; else if (read_result == FP::EOLINE || read_result == FP::COMMENT) - { - // increment the line count - line_number++; - } + line_number++; } // Get the logic id. @@ -51,8 +49,8 @@ RT_Config_File_Parser::read_entry (RT_Config_File_Entry &entry, } FP_RETURN_TYPE -CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, - int &line_number) +Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry, + int &line_number) { char buf[BUFSIZ]; FP_RETURN_TYPE read_result; @@ -67,10 +65,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, return FP::EOFILE; else if (read_result == FP::EOLINE || read_result == FP::COMMENT) - { - // increment the line count - line_number++; - } + line_number++; } // get the hostname @@ -83,7 +78,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.remote_port_ = (u_short) port; + entry.remote_poconsumer_ = (u_short) port; // Get the direction. if ((read_result = this->getword (buf)) != FP::SUCCESS) @@ -99,7 +94,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry, if ((read_result = this->getint (port)) != FP::SUCCESS) return read_result; else - entry.local_port_ = (u_short) port; + entry.local_poconsumer_ = (u_short) port; return FP::SUCCESS; } @@ -113,8 +108,8 @@ int main (int argc, char *argv[]) exit (1); } FP_RETURN_TYPE result; - CC_Config_File_Entry CCentry; - CC_Config_File_Parser CCfile; + Connection_Config_File_Entry CCentry; + Connection_Config_File_Parser CCfile; CCfile.open (argv[1]); @@ -130,13 +125,13 @@ int main (int argc, char *argv[]) cerr << "Error at line " << line_number << endl; else printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n", - CCentry.conn_id_, CCentry.host_, CCentry.remote_port_, CCentry.direction_, - CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_port_); + CCentry.conn_id_, CCentry.host_, CCentry.remote_poconsumer_, CCentry.direction_, + CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_poconsumer_); } CCfile.close(); - RT_Config_File_Entry RTentry; - RT_Config_File_Parser RTfile; + Consumer_Config_File_Entry RTentry; + Consumer_Config_File_Parser RTfile; RTfile.open (argv[2]); @@ -165,6 +160,6 @@ int main (int argc, char *argv[]) #endif /* DEBUGGING */ #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) -template class File_Parser<CC_Config_File_Entry>; -template class File_Parser<RT_Config_File_Entry>; +template class File_Parser<Connection_Config_File_Entry>; +template class File_Parser<Consumer_Config_File_Entry>; #endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 9418815ff09..145c3233bae 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -21,42 +20,42 @@ #include "ace/OS.h" #include "File_Parser.h" -class CC_Config_File_Entry +class Connection_Config_File_Entry // = TITLE - // Stores the information in a Channel Connection entry. + // Stores the IO_Handler entry for connection configuration. { public: int conn_id_; - // Connection id for this Channel. + // Connection id for this IO_Handler. char host_[BUFSIZ]; // Host to connect with. - u_short remote_port_; + u_short remote_poconsumer_; // Port to connect with. char direction_; - // 'I' (input) or 'O' (output) + // 'S' (supplier) or 'C' (consumer). int max_retry_delay_; // Maximum amount of time to wait for reconnecting. - u_short local_port_; + u_short local_poconsumer_; // Our local port number. }; -class CC_Config_File_Parser : public File_Parser<CC_Config_File_Entry> +class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry> // = TITLE - // Parser for the Channel Connection file. + // Parser for the IO_Handler Connection file. { public: virtual FP::Return_Type - read_entry (CC_Config_File_Entry &entry, int &line_number); + read_entry (Connection_Config_File_Entry &entry, int &line_number); }; -class RT_Config_File_Entry +class Consumer_Config_File_Entry // = TITLE - // Stores the information in a Routing Table entry. + // Stores the information in a Consumer Map entry. { public: enum { @@ -79,13 +78,13 @@ public: // Total number of these destinations. }; -class RT_Config_File_Parser : public File_Parser<RT_Config_File_Entry> +class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry> // = TITLE - // Parser for the Routing Table file. + // Parser for the Consumer Map file. { public: virtual FP::Return_Type - read_entry (RT_Config_File_Entry &entry, int &line_number); + read_entry (Consumer_Config_File_Entry &entry, int &line_number); }; #endif /* _CONFIG_FILES */ diff --git a/apps/Gateway/Gateway/Consumer_Entry.cpp b/apps/Gateway/Gateway/Consumer_Entry.cpp new file mode 100644 index 00000000000..c3dcd96ebbf --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Entry.cpp @@ -0,0 +1,31 @@ +// Defines an entry in the Consumer Map. +// $Id$ + +#include "Consumer_Entry.h" + +Consumer_Entry::Consumer_Entry (void) +{ + ACE_NEW (this->destinations_, Consumer_Entry::ENTRY_SET); +} + +Consumer_Entry::~Consumer_Entry (void) +{ + delete this->destinations_; +} + +// Get the associated set of destinations. + +Consumer_Entry::ENTRY_SET * +Consumer_Entry::destinations (void) +{ + return this->destinations_; +} + +// Set the associated set of destinations. + +void +Consumer_Entry::destinations (Consumer_Entry::ENTRY_SET *s) +{ + this->destinations_ = s; +} + diff --git a/apps/Gateway/Gateway/Consumer_Entry.h b/apps/Gateway/Gateway/Consumer_Entry.h new file mode 100644 index 00000000000..fe502991514 --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Entry.h @@ -0,0 +1,45 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Consumer_Entry.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_ROUTING_ENTRY) +#define _ROUTING_ENTRY + +#include "ace/Set.h" + +// Forward reference. +class IO_Handler; + +class Consumer_Entry +{ + // = TITLE + // Defines an entry in the Consumer_Map. +public: + Consumer_Entry (void); + ~Consumer_Entry (void); + + typedef ACE_Unbounded_Set<IO_Handler *> ENTRY_SET; + typedef ACE_Unbounded_Set_Iterator<IO_Handler *> ENTRY_ITERATOR; + + // = Set/get the associated set of destinations. + ENTRY_SET *destinations (void); + void destinations (ENTRY_SET *); + +protected: + ENTRY_SET *destinations_; + // The set of destinations; +}; + +#endif /* _ROUTING_ENTRY */ diff --git a/apps/Gateway/Gateway/Consumer_Map.cpp b/apps/Gateway/Gateway/Consumer_Map.cpp new file mode 100644 index 00000000000..6d16601f949 --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Map.cpp @@ -0,0 +1,61 @@ +/* -*- C++ -*- */ +// $Id$ + +#if !defined (_CONSUMER_MAP_C) +#define _CONSUMER_MAP_C + +#include "Consumer_Map.h" + +// Bind the Event_Addr to the INT_ID. + +int +Consumer_Map::bind (Event_Addr event_addr, + Consumer_Entry *Consumer_Entry) +{ + return this->map_.bind (event_addr, Consumer_Entry); +} + +// Find the Consumer_Entry corresponding to the Event_Addr. + +int +Consumer_Map::find (Event_Addr event_addr, + Consumer_Entry *&Consumer_Entry) +{ + return this->map_.find (event_addr, Consumer_Entry); +} + +// Unbind (remove) the Event_Addr from the map. + +int +Consumer_Map::unbind (Event_Addr event_addr) +{ + return this->map_.unbind (event_addr); +} + +Consumer_Map_Iterator::Consumer_Map_Iterator (Consumer_Map &rt) + : map_iter_ (rt.map_) +{ +} + +int +Consumer_Map_Iterator::next (Consumer_Entry *&ss) +{ + // Loop in order to skip over inactive entries if necessary. + + for (ACE_Map_Entry<Event_Addr, Consumer_Entry *> *temp = 0; + this->map_iter_.next (temp) != 0; + this->advance ()) + { + // Otherwise, return the next item. + ss = temp->int_id_; + return 1; + } + return 0; +} + +int +Consumer_Map_Iterator::advance (void) +{ + return this->map_iter_.advance (); +} +#endif /* _CONSUMER_MAP_C */ diff --git a/apps/Gateway/Gateway/Consumer_Map.h b/apps/Gateway/Gateway/Consumer_Map.h new file mode 100644 index 00000000000..fd392afaf6e --- /dev/null +++ b/apps/Gateway/Gateway/Consumer_Map.h @@ -0,0 +1,62 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Consumer_Map.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_CONSUMER_MAP_H) +#define _CONSUMER_MAP_H + +#include "ace/Map_Manager.h" +#include "Concurrency_Strategies.h" +#include "Event.h" +#include "Consumer_Entry.h" + +class Consumer_Map +{ + // = TITLE + // Define a generic consumer map based on the ACE Map_Manager. + // + // = DESCRIPTION + // This class makes it easier to use the Map_Manager. +public: + int bind (Event_Addr event, Consumer_Entry *Consumer_Entry); + // Associate Event with the Consumer_Entry. + + int find (Event_Addr event, Consumer_Entry *&Consumer_Entry); + // Break any association of EXID. + + int unbind (Event_Addr event); + // Locate EXID and pass out parameter via INID. If found, + // return 0, else -1. + +public: + ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_; + // Map that associates Event Addrs (external ids) with Consumer_Entry *'s + // <internal IDs>. +}; + +class Consumer_Map_Iterator +{ + // = TITLE + // Define an iterator for the Consumer Map. +public: + Consumer_Map_Iterator (Consumer_Map &mm); + int next (Consumer_Entry *&); + int advance (void); + +private: + ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_iter_; + // Map we are iterating over. +}; +#endif /* _CONSUMER_MAP_H */ diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h new file mode 100644 index 00000000000..a8a9374be3c --- /dev/null +++ b/apps/Gateway/Gateway/Event.h @@ -0,0 +1,86 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Event.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (EVENT) +#define EVENT + +// This is the unique connection identifier that denotes a particular +// IO_Handler in the Gateway. +typedef short CONN_ID; + +class Event_Addr + // = TITLE + // Address used to identify the source/destination of an event. +{ +public: + Event_Addr (CONN_ID cid = -1, unsigned char lid = 0, unsigned char pay = 0) + : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {} + + int operator== (const Event_Addr &pa) const + { + return this->conn_id_ == pa.conn_id_ + && this->logical_id_ == pa.logical_id_ + && this->payload_ == pa.payload_; + } + + CONN_ID conn_id_; + // Unique connection identifier that denotes a particular IO_Handler. + + unsigned char logical_id_; + // Logical ID. + + unsigned char payload_; + // Payload type. +}; + + +class Event_Header + // = TITLE + // Fixed sized header. +{ +public: + typedef unsigned short SUPPLIER_ID; + // Type used to route messages from gatewayd. + + enum + { + INVALID_ID = -1 // No peer can validly use this number. + }; + + SUPPLIER_ID routing_id_; + // Source ID. + + size_t len_; + // Length of the message in bytes. +}; + +class Event + // = TITLE + // Variable-sized message (buf_ may be variable-sized between + // 0 and MAX_PAYLOAD_SIZE). +{ +public: + enum { MAX_PAYLOAD_SIZE = 1024 }; + // The maximum size of an Event. + + Event_Header header_; + // Message header. + + char buf_[MAX_PAYLOAD_SIZE]; + // Message payload. +}; + +#endif /* EVENT */ diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h new file mode 100644 index 00000000000..47dd8572012 --- /dev/null +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -0,0 +1,99 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Event_Channel.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (ACE_EVENT_CHANNEL) +#define ACE_EVENT_CHANNEL + +#include "IO_Handler_Connector.h" + +template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER> +class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler + // = TITLE + // Define a generic Event_Channel. +{ +public: + // = Initialization and termination methods. + ACE_Event_Channel (void); + + int open (int argc, char *argv[]); + // Initialize the Channel. + + int close (void); + // Close down the Channel. + +private: + int parse_args (int argc, char *argv[]); + // Parse the command-line arguments. + + int parse_connection_config_file (void); + // Parse the connection configuration file. + + int parse_consumer_config_file (void); + // Parse the consumer map configuration file. + + int initiate_connections (void); + // Initiate connections to the peers. + + virtual int handle_timeout (const ACE_Time_Value &, const void *arg); + // Perform timer-based performance profiling. + + const char *connection_config_file_; + // Name of the connection configuration file. + + const char *consumer_config_file_; + // Name of the consumer map configuration file. + + int active_connector_role_; + // Enabled if we are playing the role of the active Connector. + + int performance_window_; + // Number of seconds after connection establishment to report + // throughput. + + int blocking_semantics_; + // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. + + int debug_; + // Are we debugging? + + IO_Handler_Connector *connector_; + // This is used to establish the connections actively. + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). + + // = Make life easier by defining typedefs. + typedef ACE_Map_Manager<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP; + typedef ACE_Map_Iterator<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; + typedef ACE_Map_Entry<CONN_ID, IO_Handler *> CONNECTION_MAP_ENTRY; + + CONNECTION_MAP connection_map_; + // Table that maps Connection IDs to IO_Handler *'s. + + Consumer_Map consumer_map_; + // Map that associates event addresses to a set of Consumer_Handler + // *'s. +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Event_Channel.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Event_Channel.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_EVENT_CHANNEL */ diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index e36ae71ca94..776d1b2f338 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -38,8 +37,8 @@ public: template <class ENTRY> class File_Parser // = TITLE - // Class used to parse the configuration file for the routing - // table. + // Class used to parse the configuration file for the Consumer + // Map. { public: // = Open and Close the file specified diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index f249eb2f37d..82666406070 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -1,315 +1,87 @@ /* -*- C++ -*- */ // $Id$ - - -#include "ace/Get_Opt.h" #include "ace/Service_Config.h" -#include "Config_Files.h" +#include "Event_Channel.h" #include "Gateway.h" -#include "Channel_Connector.h" -template <class INPUT_CHANNEL, class OUTPUT_CHANNEL> class Gateway : public ACE_Service_Object { public: - Gateway (ACE_Thread_Manager * = 0); + // = Initialization method. + Gateway (void); + // = Service configurator hooks. virtual int init (int argc, char *argv[]); // Perform initialization. virtual int fini (void); // Perform termination. + virtual int info (char **, size_t) const; + // Return info about this service. + protected: int handle_input (ACE_HANDLE); + // Shut down the Gateway when input comes in from the controlling + // console. int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); - - typedef ACE_Map_Manager<CONN_ID, Channel *, MUTEX> CONFIG_TABLE; - typedef ACE_Map_Iterator<CONN_ID, Channel *, MUTEX> CONFIG_ITERATOR; - - CONFIG_TABLE config_table_; - // Table that maps Connection IDs to Channel *'s. - - ROUTING_TABLE routing_table_; - // Table that maps Peer addresses to a set of Channel *'s for output. - - virtual int info (char **, size_t) const; - // Return info about this service. + // Shut down the Gateway when a signal arrives. int parse_args (int argc, char *argv[]); - // Parse gateway configuration arguments obtained from svc.conf file. - - int parse_cc_config_file (void); - // Parse the channel connection configuration file. - - int parse_rt_config_file (void); - // Parse the routing table configuration file. - - int initiate_connections (void); - // Initiate connections to the peers. - - virtual int handle_timeout (const ACE_Time_Value &, const void *arg); - // Perform timer-based performance profiling. - - const char *cc_config_file_; - // Name of the channel connection configuration file. + // Parse gateway configuration arguments obtained from svc.conf + // file. - const char *rt_config_file_; - // Name of the routing table configuration file. - - int performance_window_; - // Number of seconds after connection establishment to report throughput. - - int blocking_semantics_; - // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects. - - int debug_; - // Are we debugging? - - Channel_Connector *connector_; - // This is used to establish the connections actively. - - int socket_queue_size_; - // Size of the socket queue (0 means "use default"). - - // = Manage output and input channel threads (if used.) - // if both input and output mt is used, they will share thr_mgr_, - // thr_mgr_ will always reference the thread manager being used - // regardless of whether input, output, or both channels are using mt. - ACE_Thread_Manager *thr_mgr_; - ACE_Thread_Manager *input_thr_mgr_; - ACE_Thread_Manager *output_thr_mgr_; + ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_; }; // Convenient shorthands. +// #define IC SUPPLIER_HANDLER +// #define OC CONSUMER_HANDLER -#define IC INPUT_CHANNEL -#define OC OUTPUT_CHANNEL - -template <class IC, class OC> int -Gateway<IC, OC>::handle_signal (int signum, siginfo_t *, ucontext_t *) +int +Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *) { if (signum > 0) ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum)); - if (this->thr_mgr_ != 0) - { -#if defined (ACE_HAS_THREADS) - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n")); - if (this->thr_mgr_->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); -#endif /* ACE_HAS_THREADS */ - } + this->event_channel_.close (); // Shut down the main event loop. ACE_Service_Config::end_reactor_event_loop (); return 0; } -template <class IC, class OC> int -Gateway<IC, OC>::handle_input (ACE_HANDLE h) +int +Gateway::handle_input (ACE_HANDLE h) { - if (ACE_Service_Config::reactor ()->remove_handler (0, - ACE_Event_Handler::READ_MASK - | ACE_Event_Handler::DONT_CALL) == -1) + if (ACE_Service_Config::reactor ()->remove_handler + (0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "remove_handler"), -1); + char buf[BUFSIZ]; // Consume the input... ACE_OS::read (h, buf, sizeof (buf)); - return this->handle_signal (h); -} -template <class IC, class OC> int -Gateway<IC, OC>::handle_timeout (const ACE_Time_Value &, const void *) -{ - ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n")); - CONFIG_ITERATOR cti (this->config_table_); - - // If we've got a ACE_Thread Manager then use it to suspend all - // the threads. This will enable us to get an accurate count. - - if (this->thr_mgr_ != 0) - { -#if defined (ACE_HAS_THREADS) - if (this->thr_mgr_->suspend_all () == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); - ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads...")); -#endif /* ACE_HAS_THREADS */ - } - - size_t total_bytes_in = 0; - size_t total_bytes_out = 0; - - // Iterate through the routing table connecting all the channels. - - for (ACE_Map_Entry <CONN_ID, Channel *> *me = 0; - cti.next (me) != 0; - cti.advance ()) - { - Channel *channel = me->int_id_; - if (channel->direction () == 'O') - total_bytes_out += channel->total_bytes (); - else - total_bytes_in += channel->total_bytes (); - } - -#if defined (ACE_NLOGGING) - ACE_OS::fprintf (stderr, "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - performance_window_, - total_bytes_in, - total_bytes_out); - - ACE_OS::fprintf (stderr, "%f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_))); - - ACE_OS::fprintf (stderr, "%f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))); -#else - ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n", - this->performance_window_, - total_bytes_in, - total_bytes_out)); - ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n", - (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)))); - ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n", - (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)))); -#endif /* ACE_NLOGGING */ - // Resume all the threads again. - if (this->thr_mgr_ != 0) - { -#if defined (ACE_HAS_THREADS) - this->thr_mgr_->resume_all (); - ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads...")); -#endif /* ACE_HAS_THREADS */ - } - return 0; + // Shut us down. + return this->handle_signal (h); } // Give default values to data members. -template <class IC, class OC> -Gateway<IC, OC>::Gateway (ACE_Thread_Manager *thr_mgr) - : cc_config_file_ ("cc_config"), - rt_config_file_ ("rt_config"), - performance_window_ (0), - blocking_semantics_ (ACE_NONBLOCK), - debug_ (0), - connector_ (0), - socket_queue_size_ (0), - thr_mgr_ (thr_mgr), - input_thr_mgr_ (thr_mgr), - output_thr_mgr_ (thr_mgr) -{ -} -// Parse the "command-line" arguments and set the corresponding flags. - -template <class IC, class OC> int -Gateway<IC, OC>::parse_args (int argc, char *argv[]) -{ - ACE_Get_Opt get_opt (argc, argv, "bc:dr:q:w:", 0); - - for (int c; (c = get_opt ()) != -1; ) - { - switch (c) - { - case 'b': // Use blocking connection establishment. - this->blocking_semantics_ = 0; - break; - case 'c': - this->cc_config_file_ = get_opt.optarg; - break; - case 'd': - this->debug_ = 1; - break; - case 'q': - this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg); - break; - case 'r': - this->rt_config_file_ = get_opt.optarg; - break; - case 'w': // Time performance for a designated amount of time. - this->performance_window_ = ACE_OS::atoi (get_opt.optarg); - // Use blocking connection semantics so that we get accurate - // timings (since all connections start at once). - this->blocking_semantics_ = 0; - break; - default: - break; - } - } - return 0; -} - -// Initiate connections with the peers. - -template <class IC, class OC> int -Gateway<IC, OC>::initiate_connections (void) -{ - CONFIG_ITERATOR cti (this->config_table_); - - // Iterate through the routing table connecting all the channels. - - for (ACE_Map_Entry <CONN_ID, Channel *> *me = 0; - cti.next (me) != 0; - cti.advance ()) - { - Channel *channel = me->int_id_; - if (this->connector_->initiate_connection - (channel, this->blocking_semantics_ == ACE_NONBLOCK - ? ACE_Synch_Options::asynch : ACE_Synch_Options::synch) == -1) - continue; - } - - return 0; -} - -// This method is automatically called when the gateway -// is shutdown. It gracefully shuts down all the Channels -// in the Channel connection Config_Table. - -template <class IC, class OC> int -Gateway<IC, OC>::fini (void) +Gateway::Gateway (void) { - // Question: do we need to do anything special about the Routing_Table? - - CONFIG_ITERATOR cti (this->config_table_); - - for (ACE_Map_Entry <CONN_ID, Channel *> *me; - cti.next (me) != 0; - cti.advance ()) - { - Channel *channel = me->int_id_; - ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", - channel->id ())); - if (channel->state () != Channel::IDLE) - // Mark channel as DISCONNECTING so we don't try to reconnect... - channel->state (Channel::DISCONNECTING); - - // Deallocate Channel resources. - channel->destroy (); // Will trigger a delete. - } - - // Free up the resources allocated dynamically by the ACE_Connector. - delete this->connector_; - delete this->thr_mgr_; - - return 0; } -template <class IC, class OC> int -Gateway<IC, OC>::init (int argc, char *argv[]) +int +Gateway::init (int argc, char *argv[]) { - this->parse_args (argc, argv); - - ACE_NEW_RETURN (this->connector_, Channel_Connector (), -1); - - if (this->connector_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "out of memory"), -1); + if (this->event_channel_.open (argc, argv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1); - // Ignore SIPPIPE so each Output_Channel can handle it. + // Ignore SIPPIPE so each Consumer_Handler can handle it. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); ACE_Sig_Set sig_set; @@ -327,46 +99,23 @@ Gateway<IC, OC>::init (int argc, char *argv[]) this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + return 0; +} - if (this->thr_mgr_ == 0) - // Create a thread manager if using some combination of multi-threaded channels. -#if defined (USE_OUTPUT_MT) && defined (USE_INPUT_MT) - this->thr_mgr_ = this->output_thr_mgr_ = - this->input_thr_mgr_ = ACE_Service_Config::thr_mgr (); -#elif defined (USE_OUTPUT_MT) - this->thr_mgr_ = this->output_thr_mgr_ = ACE_Service_Config::thr_mgr (); -#elif defined (USE_INPUT_MT) - this->thr_mgr_ = this->input_thr_mgr_ = ACE_Service_Config::thr_mgr (); -#endif - - // Parse the connection configuration file. - this->parse_cc_config_file (); - - // Parse the routing table config file and build the routing table. - this->parse_rt_config_file (); - - // Initiate connections with the peers. - this->initiate_connections (); - - // If this->performance_window_ > 0 start a timer. - - if (this->performance_window_ > 0) - { - if (ACE_Service_Config::reactor ()->schedule_timer (this, 0, - this->performance_window_) == -1) - ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer")); - else - ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n", - this->performance_window_)); - } +// This method is automatically called when the gateway is shutdown. +// It closes down the Event Channel. +int +Gateway::fini (void) +{ + this->event_channel_.close (); return 0; } // Returns information on the currently active service. -template <class IC, class OC> int -Gateway<IC, OC>::info (char **strp, size_t length) const +int +Gateway::info (char **strp, size_t length) const { char buf[BUFSIZ]; @@ -380,182 +129,7 @@ Gateway<IC, OC>::info (char **strp, size_t length) const return ACE_OS::strlen (buf); } -// Parse and build the connection table. - -template <class IC, class OC> int -Gateway<IC, OC>::parse_cc_config_file (void) -{ - // File that contains the routing table configuration information. - CC_Config_File_Parser cc_file; - CC_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (cc_file.open (this->cc_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->cc_config_file_), -1); - - // Read config file line at a time. - while (cc_file.read_entry (entry, line_number) != FP::EOFILE) - { - file_empty = 0; - - if (this->debug_) - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, " - "direction = %c, max retry timeout = %d, local port = %d\n", - entry.conn_id_, entry.host_, entry.remote_port_, entry.direction_, - entry.max_retry_delay_, entry.local_port_)); - - Channel *channel = 0; - - // The next few lines of code are dependent on whether we are making - // an Input_Channel or an Output_Channel. - - if (entry.direction_ == 'O') // Configure an output channel. - ACE_NEW_RETURN (channel, - OUTPUT_CHANNEL (&this->routing_table_, - this->connector_, - this->output_thr_mgr_, - this->socket_queue_size_), - -1); - else /* direction == 'I' */ // Configure an input channel. - ACE_NEW_RETURN (channel, - INPUT_CHANNEL (&this->routing_table_, - this->connector_, - this->input_thr_mgr_, - this->socket_queue_size_), - -1); - if (channel == 0) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) out of memory\n"), -1); - - // The following code is common to both Input_ and Output_Channels. - - // Initialize the routing entry's peer addressing info. - channel->bind (ACE_INET_Addr (entry.remote_port_, entry.host_), - ACE_INET_Addr (entry.local_port_), entry.conn_id_); - - // Initialize max timeout. - channel->max_timeout (entry.max_retry_delay_); - - // Try to bind the new Channel to the connection ID. - switch (this->config_table_.bind (entry.conn_id_, channel)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, "(%t) duplicate connection %d, already bound\n", - entry.conn_id_)); - break; - case 0: - // Success. - break; - } - } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: connection channel configuration file was empty\n")); - - return 0; -} - -template <class IC, class OC> int -Gateway<IC, OC>::parse_rt_config_file (void) -{ - // File that contains the routing table configuration information. - RT_Config_File_Parser rt_file; - RT_Config_File_Entry entry; - int file_empty = 1; - int line_number = 0; - - if (rt_file.open (this->rt_config_file_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->rt_config_file_), -1); - - // Read config file line at a time. - while (rt_file.read_entry (entry, line_number) != FP::EOFILE) - { - file_empty = 0; - - if (this->debug_) - { - ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " - "number of destinations = %d\n", - entry.conn_id_, entry.logical_id_, entry.payload_type_, - entry.total_destinations_)); - for (int i = 0; i < entry.total_destinations_; i++) - ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - Routing_Entry *re; - ACE_NEW_RETURN (re, Routing_Entry, -1); - Routing_Entry::ENTRY_SET *channel_set = new Routing_Entry::ENTRY_SET; - Peer_Addr peer_addr (entry.conn_id_, entry.logical_id_, - entry.payload_type_); - - // Add the destinations to the Routing Entry. - for (int i = 0; i < entry.total_destinations_; i++) - { - Channel *channel = 0; - - // Lookup destination and add to Routing_Entry set if found. - if (this->config_table_.find (entry.destinations_[i], - channel) != -1) - channel_set->insert (channel); - else - ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", - i, entry.destinations_[i])); - } - - // Attach set of destination channels to routing entry. - re->destinations (channel_set); - - // Bind with routing table, keyed by peer address. - switch (this->routing_table_.bind (peer_addr, re)) - { - case -1: - ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", - entry.conn_id_), -1); - /* NOTREACHED */ - case 1: // Oops, found a duplicate! - ACE_DEBUG ((LM_DEBUG, "(%t) duplicate routing table entry %d, " - "already bound\n", entry.conn_id_)); - break; - case 0: - // Success. - break; - } - } - - if (file_empty) - ACE_ERROR ((LM_WARNING, - "warning: routing table configuration file was empty\n")); - - return 0; -} - -#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) -#if defined (USE_OUTPUT_MT) -typedef Thr_Output_Channel OUTPUT_CHANNEL; -#else -typedef Output_Channel OUTPUT_CHANNEL; -#endif /* USE_OUTPUT_MT */ - -#if defined (USE_INPUT_MT) -typedef Thr_Input_Channel INPUT_CHANNEL; -#else -typedef Input_Channel INPUT_CHANNEL; -#endif /* USE_INPUT_MT */ -#else -// Instantiate a non-multi-threaded Gateway. -typedef Input_Channel INPUT_CHANNEL; -typedef Output_Channel OUTPUT_CHANNEL; -#endif /* ACE_HAS_THREADS */ - -typedef Gateway<INPUT_CHANNEL, OUTPUT_CHANNEL> ACE_Gateway; - // The following is a "Factory" used by the ACE_Service_Config and // svc.conf file to dynamically initialize the state of the Gateway. -ACE_SVC_FACTORY_DEFINE (ACE_Gateway) +ACE_SVC_FACTORY_DEFINE (Gateway) diff --git a/apps/Gateway/Gateway/Gateway.h b/apps/Gateway/Gateway/Gateway.h index b00d87628de..057ce981701 100644 --- a/apps/Gateway/Gateway/Gateway.h +++ b/apps/Gateway/Gateway/Gateway.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY @@ -20,11 +19,7 @@ #include "ace/OS.h" -ACE_SVC_FACTORY_DECLARE (ACE_Gateway) +ACE_SVC_FACTORY_DECLARE (Gateway) #endif /* ACE_GATEWAY */ - - - - diff --git a/apps/Gateway/Gateway/IO_Handler.cpp b/apps/Gateway/Gateway/IO_Handler.cpp new file mode 100644 index 00000000000..94997955979 --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler.cpp @@ -0,0 +1,710 @@ +// $Id$ + +#include "Consumer_Entry.h" +#include "IO_Handler_Connector.h" + +// Convenient short-hands. +#define CO CONDITION +#define MU MAP_MUTEX + +// The total number of bytes sent/received on this channel. + +size_t +IO_Handler::total_bytes (void) +{ + return this->total_bytes_; +} + +void +IO_Handler::total_bytes (size_t bytes) +{ + this->total_bytes_ += bytes; +} + +IO_Handler::IO_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), + consumer_map_ (consumer_map), + id_ (-1), + total_bytes_ (0), + state_ (IO_Handler::IDLE), + connector_ (ioc), + timeout_ (1), + max_timeout_ (IO_Handler::MAX_RETRY_TIMEOUT), + socket_queue_size_ (socket_queue_size) +{ +} + +// Set the associated channel. + +void +IO_Handler::active (int a) +{ + this->state (a == 0 ? IO_Handler::IDLE : IO_Handler::ESTABLISHED); +} + +// Get the associated channel. + +int +IO_Handler::active (void) +{ + return this->state () == IO_Handler::ESTABLISHED; +} + +// Set the direction. + +void +IO_Handler::direction (char d) +{ + this->direction_ = d; +} + +// Get the direction. + +char +IO_Handler::direction (void) +{ + return this->direction_; +} + +// Sets the timeout delay. + +void +IO_Handler::timeout (int to) +{ + if (to > this->max_timeout_) + to = this->max_timeout_; + + this->timeout_ = to; +} + +// Recalculate the current retry timeout delay using exponential +// backoff. Returns the original timeout (i.e., before the +// recalculation). + +int +IO_Handler::timeout (void) +{ + int old_timeout = this->timeout_; + this->timeout_ *= 2; + + if (this->timeout_ > this->max_timeout_) + this->timeout_ = this->max_timeout_; + + return old_timeout; +} + +// Sets the max timeout delay. + +void +IO_Handler::max_timeout (int mto) +{ + this->max_timeout_ = mto; +} + +// Gets the max timeout delay. + +int +IO_Handler::max_timeout (void) +{ + return this->max_timeout_; +} + +// Restart connection asynchronously when timeout occurs. + +int +IO_Handler::handle_timeout (const ACE_Time_Value &, const void *) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) attempting to reconnect IO_Handler %d with timeout = %d\n", + this->id (), this->timeout_)); + return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); +} + +// Restart connection (blocking_semantics dicates whether we +// restart synchronously or asynchronously). + +int +IO_Handler::reinitiate_connection (void) +{ + // Skip over deactivated descriptors. + if (this->get_handle () != -1) + { + // Make sure to close down peer to reclaim descriptor. + this->peer ().close (); + +#if 0 +// if (this->state () == FAILED) +// { + // Reinitiate timeout to improve reconnection time. +// this->timeout (1); +#endif + + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of IO_Handler %d\n", + this->id ())); + + // Reschedule ourselves to try and connect again. + if (ACE_Service_Config::reactor ()->schedule_timer + (this, 0, this->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + return 0; +} + +// Handle shutdown of the IO_Handler object. + +int +IO_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down IO_Handler %d on handle %d\n", + this->id (), this->get_handle ())); + + return this->reinitiate_connection (); +} + +// Set the state of the channel. + +void +IO_Handler::state (IO_Handler::State s) +{ + this->state_ = s; +} + +// Perform the first-time initiation of a connection to the peer. + +int +IO_Handler::initialize_connection (void) +{ + this->state_ = IO_Handler::ESTABLISHED; + + // Restart the timeout to 1. + this->timeout (1); + +#if defined (ASSIGN_SUPPLIER_ID) + // Action that sends the route id to the peerd. + + CONN_ID id = htons (this->id ()); + + ssize_t n = this->peer ().send ((const void *) &id, sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + n == 0 ? "gatewayd has closed down unexpectedly" : "send"), + -1); +#endif /* ASSIGN_SUPPLIER_ID */ + return 0; +} + +// Set the size of the socket queue. + +void +IO_Handler::socket_queue_size (void) +{ + if (this->socket_queue_size_ > 0) + { + int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; + + if (this->peer ().set_option (SOL_SOCKET, option, + &this->socket_queue_size_, sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + } +} + +// Upcall from the ACE_Acceptor::handle_input() that +// delegates control to our application-specific IO_Handler. + +int +IO_Handler::open (void *a) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) IO_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn on non-blocking I/O. + if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Call down to the base class to activate and register this handler. + if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); + + return this->initialize_connection (); +} + +// Return the current state of the channel. + +IO_Handler::State +IO_Handler::state (void) +{ + return this->state_; +} + +void +IO_Handler::id (CONN_ID id) +{ + this->id_ = id; +} + +CONN_ID +IO_Handler::id (void) +{ + return this->id_; +} + +// Set the peer's address information. +int +IO_Handler::bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + CONN_ID id) +{ + this->remote_addr_ = remote_addr; + this->local_addr_ = local_addr; + this->id_ = id; + return 0; +} + +ACE_INET_Addr & +IO_Handler::remote_addr (void) +{ + return this->remote_addr_; +} + +ACE_INET_Addr & +IO_Handler::local_addr (void) +{ + return this->local_addr_; +} + +// Constructor sets the consumer map pointer. + +Consumer_Handler::Consumer_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'C'; + this->msg_queue ()->high_water_mark (Consumer_Handler::QUEUE_SIZE); +} + +// This method should be called only when the peer shuts down +// unexpectedly. This method simply marks the IO_Handler as +// having failed so that handle_close () can reconnect. + +int +Consumer_Handler::handle_input (ACE_HANDLE) +{ + char buf[1]; + + this->state (IO_Handler::FAILED); + + switch (this->peer ().recv (buf, sizeof buf)) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has failed unexpectedly for Output IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + case 0: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has shutdown unexpectedly for Output IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer is sending input on Output IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + } +} + +// Perform a non-blocking put() of event MB. If we are unable to +// send the entire event the remainder is re-queued at the *front* of +// the Event_List. + +int +Consumer_Handler::nonblk_put (ACE_Message_Block *mb) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // Event_List and ask the ACE_Reactor to inform us (via + // handle_output()) when it is possible to try again. + + ssize_t n; + + if ((n = this->send (mb)) == -1) + { + // Things have gone wrong, let's try to close down and set up a new reconnection. + this->state (IO_Handler::FAILED); + this->handle_close (); + return -1; + } + else if (errno == EWOULDBLOCK) // Didn't manage to send everything. + { + ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + // ACE_Queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); + + // Tell ACE_Reactor to call us back when we can send again. + else if (ACE_Service_Config::reactor ()-> + schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); + return 0; + } + else + return n; +} + +int +Consumer_Handler::send (ACE_Message_Block *mb) +{ + ssize_t n; + size_t len = mb->length (); + + if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < len) + // Re-adjust pointer to skip over the part we did send. + mb->rd_ptr (n); + else /* if (n == length) */ + { + // The whole event is sent, we can now safely deallocate the + // buffer. Note that this should decrement a reference count... + delete mb; + errno = 0; + } + this->total_bytes (n); + return n; +} + +// Finish sending an event when flow control conditions abate. +// This method is automatically called by the ACE_Reactor. + +int +Consumer_Handler::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *mb = 0; + int status = 0; + + ACE_DEBUG ((LM_DEBUG, + "(%t) in handle_output on handle %d\n", + this->get_handle ())); + // The list had better not be empty, otherwise there's a bug! + + if (this->msg_queue ()->dequeue_head + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (mb)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + // Didn't write everything this time, come back later... + break; + + case -1: + // Caller is responsible for freeing a ACE_Message_Block if failures occur. + delete mb; + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); + + /* FALLTHROUGH */ + default: // Sent the whole thing. + + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the Event_List. If there aren't, tell the + // ACE_Reactor not to notify us anymore (at least until + // there are new events queued up). + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing deactivated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + + if (ACE_Service_Config::reactor ()-> + cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup")); + } + } + } + else + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head")); + return 0; +} + +// Send an event to a peer (may queue if necessary). + +int +Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (mb); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Constructor sets the consumer map pointer and the connector +// pointer. + +Supplier_Handler::Supplier_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : msg_frag_ (0), + IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'S'; + this->msg_queue ()->high_water_mark (0); +} + +// Receive a Peer event from peerd. Handles fragmentation. +// +// The routing event returned from recv consists of two parts: +// 1. The Address part, contains the virtual routing id. +// 2. The Data part, which contains the actual data to be routed. +// +// The reason for having two parts is to shield the higher layers +// of software from knowledge of the event structure. + +int +Supplier_Handler::recv (ACE_Message_Block *&forward_addr) +{ + Event *event; + size_t len; + ssize_t n = 0; + ssize_t m = 0; + size_t offset = 0; + + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event)), + -1); + + event = (Event *) this->msg_frag_->rd_ptr (); + + const ssize_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + n = this->peer ().recv (this->msg_frag_->wr_ptr (), + header_bytes_left_to_read); + + if (n == -1 /* error */ + || n == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, "%p\n", + "Recv error during header read ")); + ACE_DEBUG ((LM_DEBUG, + "attempted to read %d\n", + header_bytes_left_to_read)); + delete this->msg_frag_; + this->msg_frag_ = 0; + return n; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (n); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + "Partial header received: only %d bytes\n", + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + } + + // At this point there is a complete, valid header in msg_frag_ + len = sizeof event->buf_ + HEADER_SIZE - this->msg_frag_->length (); + + // Try to receive the remainder of the event + + switch (m = this->peer ().recv (event->buf_ + offset, len)) + { + case -1: + if (errno == EWOULDBLOCK) + { + // This shouldn't happen since the ACE_Reactor + // just triggered us to handle pending I/O! + ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n")); + errno = EWOULDBLOCK; + return -1; + } + else + /* FALLTHROUGH */; + + case 0: // Premature EOF. + delete this->msg_frag_; + this->msg_frag_ = 0; + return 0; + + default: + if (m != len) + // Re-adjust pointer to skip over the part we've read. + { + this->msg_frag_->wr_ptr (m); + errno = EWOULDBLOCK; + return -1; // Inform caller that we didn't get the whole event. + } + else + { + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (m); + + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + // Allocate an event forwarding header and chain the data + // portion onto its continuation field. + ACE_NEW_RETURN (forward_addr, + ACE_Message_Block (sizeof (Event_Addr), + ACE_Message_Block::MB_PROTO, + this->msg_frag_), + -1); + + Event_Addr event_addr (this->id (), event->header_.routing_id_, 0); + // Copy the forwarding address from the Event_Addr into + // forward_addr. + forward_addr->copy ((char *) &event_addr, sizeof (Event)); + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + this->total_bytes (m + n); +#if defined (VERBOSE) + ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s", + event_addr.conn_id_, event->header_.routing_id_, event->header_.len_, + event->header_.len_, event->buf_)); +#else + ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n", + event->header_.routing_id_, event->header_.len_, this->total_bytes ())); +#endif + return m + n; + } +} + +// Receive various types of input (e.g., Peer event from the +// gatewayd, as well as stdio). + +int +Supplier_Handler::handle_input (ACE_HANDLE) +{ + ACE_Message_Block *forward_addr = 0; + + switch (this->recv (forward_addr)) + { + case 0: + // Note that a peer should never initiate a shutdown. + this->state (IO_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has closed down unexpectedly for Input IO_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else // A weird problem occurred, shut down and start again. + { + this->state (IO_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input IO_Handler %d\n", + "Peer has failed unexpectedly", + this->id ()), -1); + } + /* NOTREACHED */ + default: + return this->forward (forward_addr); + } +} + +// Route an event to its appropriate destination. + +int +Supplier_Handler::forward (ACE_Message_Block *forward_addr) +{ + // We got a valid event, so determine its virtual routing id, + // which is stored in the first of the two event blocks chained + // together. + + Event_Addr *forwarding_key = (Event_Addr *) forward_addr->rd_ptr (); + + // Skip over the address portion. + const ACE_Message_Block *const data = forward_addr->cont (); + + // RE points to the routing entry located for this routing id. + Consumer_Entry *re = 0; + + if (this->consumer_map_->find (*forwarding_key, re) != -1) + { + // Check to see if there are any destinations. + if (re->destinations ()->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active destinations for this event currently\n")); + + else // There are destinations, so forward the event. + { + Consumer_Entry::ENTRY_SET *esp = re->destinations (); + Consumer_Entry::ENTRY_ITERATOR si (*esp); + + for (IO_Handler **channel = 0; si.next (channel) != 0; si.advance ()) + { + // Only process active channels. + if ((*channel)->active ()) + { + // Clone the event portion (should be doing reference counting here...) + ACE_Message_Block *newmsg = data->clone (); + + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ())); + + if ((*channel)->put (newmsg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n", + "put", (*channel)->id ())); + + // Caller is responsible for freeing a ACE_Message_Block if failures occur. + delete newmsg; + } + } + } + // Will become superfluous once we have reference counting... + delete forward_addr; + return 0; + } + } + delete forward_addr; + // Failure return. + ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", + forwarding_key->conn_id_, forwarding_key->logical_id_, forwarding_key->payload_)); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX>; +template class ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX>; +template class ACE_Map_Entry<Event_Addr, Consumer_Entry *>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/IO_Handler.h b/apps/Gateway/Gateway/IO_Handler.h new file mode 100644 index 00000000000..c22f1a3df26 --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler.h @@ -0,0 +1,224 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// IO_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_IO_HANDLER) +#define _IO_HANDLER + +#include "ace/Service_Config.h" +#include "ace/SOCK_Connector.h" +#include "ace/Svc_Handler.h" +#include "Consumer_Map.h" +#include "Consumer_Entry.h" +#include "Event.h" + +// Forward declaration. +class IO_Handler_Connector; + +class IO_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> + // = TITLE + // IO_Handler contains info about connection state and addressing. + // + // = DESCRIPTION + // The IO_Handler classes process events sent from the peers to the + // gateway. These classes works as follows: + // + // 1. IO_Handler_Connector creates a number of connections with the set of + // peers specified in a configuration file. + // + // 2. For each peer that connects successfully, IO_Handler_Connector + // creates an IO_Handler object. Each object assigns a unique routing + // id to its associated peer. The Handlers are used by gatewayd + // that to receive, route, and forward events from source peer(s) + // to destination peer(s). +{ +public: + IO_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int open (void * = 0); + // Initialize and activate a single-threaded IO_Handler (called by + // ACE_Connector::handle_output()). + + int bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + CONN_ID); + // Set the peer's addressing and routing information. + + ACE_INET_Addr &remote_addr (void); + // Returns the peer's routing address. + + ACE_INET_Addr &local_addr (void); + // Returns our local address. + + // = Set/get routing id. + CONN_ID id (void); + void id (CONN_ID); + + // = Set/get the current state of the IO_Handler. + enum State + { + IDLE = 1, // Prior to initialization. + CONNECTING, // During connection establishment. + ESTABLISHED, // IO_Handler is established and active. + DISCONNECTING, // IO_Handler is in the process of connecting. + FAILED // IO_Handler has failed. + }; + + // = Set/get the current state. + State state (void); + void state (State); + + // = Set/get the current retry timeout delay. + int timeout (void); + void timeout (int); + + // = Set/get the maximum retry timeout delay. + int max_timeout (void); + void max_timeout (int); + + // = Set/get IO_Handler activity status. + int active (void); + void active (int); + + // = Set/get direction (necessary for error checking). + char direction (void); + void direction (char); + + // = The total number of bytes sent/received on this channel. + size_t total_bytes (void); + void total_bytes (size_t bytes); + // Increment count by <bytes>. + + virtual int handle_timeout (const ACE_Time_Value &, const void *arg); + // Perform timer-based IO_Handler reconnection. + +protected: + enum + { + MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. + }; + + int initialize_connection (void); + // Perform the first-time initiation of a connection to the peer. + + int reinitiate_connection (void); + // Reinitiate a connection asynchronously when peers fail. + + void socket_queue_size (void); + // Set the socket queue size. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + // Perform IO_Handler termination. + + Consumer_Map *consumer_map_; + // Pointer to table that maps an event + // to a Set of IO_Handler *'s for output. + + ACE_INET_Addr remote_addr_; + // Address of peer. + + ACE_INET_Addr local_addr_; + // Address of us. + + CONN_ID id_; + // The assigned routing ID of this entry. + + size_t total_bytes_; + // The total number of bytes sent/received on this channel. + + State state_; + // The current state of the channel. + + IO_Handler_Connector *connector_; + // Back pointer to IO_Handler_Connector to reestablish broken + // connections. + + int timeout_; + // Amount of time to wait between reconnection attempts. + + int max_timeout_; + // Maximum amount of time to wait between reconnection attempts. + + char direction_; + // Indicates which direction data flows through the channel ('O' == + // output and 'I' == input). + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). +}; + +class Supplier_Handler : public IO_Handler + // = TITLE + // Handle reception of Peer events arriving as events. +{ +public: + Supplier_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + // Constructor sets the consumer map pointer. + + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); + // Receive and process peer events. + +protected: + virtual int recv (ACE_Message_Block *&); + // Receive an event from a Supplier. + + int forward (ACE_Message_Block *event); + // Forward the Event to a Consumer. + + ACE_Message_Block *msg_frag_; + // Keep track of event fragment to handle non-blocking recv's from + // Suppliers. +}; + +class Consumer_Handler : public IO_Handler + // = TITLE + // Handle transmission of events to other Peers using a + // single-threaded approach. +{ +public: + Consumer_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send an event to a Consumer (may be queued if necessary). + +protected: + // = We'll allow up to 16 megabytes to be queued per-output + // channel. + enum {QUEUE_SIZE = 1024 * 1024 * 16}; + + virtual int handle_input (ACE_HANDLE); + // Receive and process shutdowns from a Consumer. + + virtual int handle_output (ACE_HANDLE); + // Finish sending event when flow control conditions abate. + + int nonblk_put (ACE_Message_Block *mb); + // Perform a non-blocking put(). + + virtual int send (ACE_Message_Block *); + // Send an event to a Consumer. +}; + +#endif /* _IO_HANDLER */ diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.cpp b/apps/Gateway/Gateway/IO_Handler_Connector.cpp new file mode 100644 index 00000000000..712b348951d --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler_Connector.cpp @@ -0,0 +1,92 @@ +#include "IO_Handler_Connector.h" +// $Id$ + + +IO_Handler_Connector::IO_Handler_Connector (void) +{ +} + +// Override the connection-failure method to add timer support. +// Note that these timers perform "expoential backoff" to +// avoid rapidly trying to reestablish connections when a link +// goes down. + +int +IO_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) +{ + ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0; + + // Locate the ACE_Svc_Handler corresponding to the socket descriptor. + if (this->handler_map_.find (sd, stp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", + sd, "find"), -1); + + IO_Handler *channel = stp->svc_handler (); + + // Schedule a reconnection request at some point in the future + // (note that channel uses an exponential backoff scheme). + if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, + channel->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + return 0; +} + +// Initiate (or reinitiate) a connection to the IO_Handler. + +int +IO_Handler_Connector::initiate_connection (IO_Handler *channel, + ACE_Synch_Options &synch_options) +{ + char buf[MAXHOSTNAMELEN]; + + // Mark ourselves as idle so that the various iterators + // will ignore us until we are reconnected. + channel->state (IO_Handler::IDLE); + + if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 + || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "can't obtain peer's address"), -1); + + // Try to connect to the Peer. + + if (this->connect (channel, channel->remote_addr (), + synch_options, channel->local_addr ()) == -1) + { + if (errno != EWOULDBLOCK) + { + channel->state (IO_Handler::FAILED); + ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", + "connect", buf)); + + // Reschedule ourselves to try and connect again. + if (synch_options[ACE_Synch_Options::USE_REACTOR]) + { + if (ACE_Service_Config::reactor ()->schedule_timer + (channel, 0, channel->timeout ()) == 0) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + else + // Failures on synchronous connects are reported as errors + // so that the caller can decide how to proceed. + return -1; + } + else + { + channel->state (IO_Handler::CONNECTING); + ACE_DEBUG ((LM_DEBUG, + "(%t) in the process of connecting %s to %s\n", + synch_options[ACE_Synch_Options::USE_REACTOR] + ? "asynchronously" : "synchronously", buf)); + } + } + else + { + channel->state (IO_Handler::ESTABLISHED); + ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", + buf, channel->get_handle ())); + } + return 0; +} diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.h b/apps/Gateway/Gateway/IO_Handler_Connector.h new file mode 100644 index 00000000000..585428c88ee --- /dev/null +++ b/apps/Gateway/Gateway/IO_Handler_Connector.h @@ -0,0 +1,40 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// IO_Handler_Connector.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_IO_HANDLER_CONNECTOR) +#define _IO_HANDLER_CONNECTOR + +#include "ace/Connector.h" +#include "Thr_IO_Handler.h" + +class IO_Handler_Connector : public ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR> + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new IO_Handler object to do the dirty work... +{ +public: + IO_Handler_Connector (void); + + // Initiate (or reinitiate) a connection on the IO_Handler. + int initiate_connection (IO_Handler *, + ACE_Synch_Options & = ACE_Synch_Options::synch); + +protected: + // Override the connection-failure method to add timer support. + virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask); +}; + +#endif /* _IO_HANDLER_CONNECTOR */ diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile index b2115abbd59..3b54556c4f6 100644 --- a/apps/Gateway/Gateway/Makefile +++ b/apps/Gateway/Gateway/Makefile @@ -9,17 +9,18 @@ #---------------------------------------------------------------------------- BIN = gatewayd -LIB = libGateway.a +#LIB = libGateway.a SHLIB = libGateway.so -FILES = Channel \ - Channel_Connector \ +FILES = Event_Channel \ + IO_Handler \ + IO_Handler_Connector \ Config_Files \ File_Parser \ Gateway \ - Routing_Entry \ - Routing_Table \ - Thr_Channel + Consumer_Entry \ + Consumer_Map \ + Thr_IO_Handler LSRC = $(addsuffix .cpp,$(FILES)) LOBJ = $(addsuffix .o,$(FILES)) @@ -51,7 +52,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU # Default behavior is to use single-threading. See the README # file for information on how to configure this with multiple # strategies for threading the input and output channels. -DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT +DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT #---------------------------------------------------------------------------- # Dependencies @@ -60,8 +61,8 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Channel.o .shobj/Channel.so: Channel.cpp Routing_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h \ +.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -73,7 +74,7 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - Channel_Connector.h \ + Config_Files.h File_Parser.h IO_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ @@ -87,19 +88,80 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ + $(WRAPPER_ROOT)/ace/SOCK_IO.h \ + $(WRAPPER_ROOT)/ace/SOCK.h \ + $(WRAPPER_ROOT)/ace/Addr.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.h \ + $(WRAPPER_ROOT)/ace/IPC_SAP.i \ + $(WRAPPER_ROOT)/ace/SOCK.i \ + $(WRAPPER_ROOT)/ace/SOCK_IO.i \ + $(WRAPPER_ROOT)/ace/INET_Addr.h \ + $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Proactor.h \ $(WRAPPER_ROOT)/ace/Message_Block.h \ $(WRAPPER_ROOT)/ace/Malloc.h \ $(WRAPPER_ROOT)/ace/Malloc_T.h \ $(WRAPPER_ROOT)/ace/Memory_Pool.h \ - $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Task_T.h \ + $(WRAPPER_ROOT)/ace/Connector.i \ + Thr_IO_Handler.h IO_Handler.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ + Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h \ + Event_Channel.h +.obj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp Consumer_Entry.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/ACE.i \ + IO_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + $(WRAPPER_ROOT)/ace/Thread_Manager.h \ + $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -113,21 +175,30 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_Channel.h Channel.h \ + Thr_IO_Handler.h IO_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Routing_Table.h Peer_Message.h -.obj/Channel_Connector.o .shobj/Channel_Connector.so: Channel_Connector.cpp Channel_Connector.h \ + Consumer_Map.h Concurrency_Strategies.h Event.h +.obj/IO_Handler_Connector.o .shobj/IO_Handler_Connector.so: IO_Handler_Connector.cpp \ + IO_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ @@ -152,20 +223,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -179,20 +242,28 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_Channel.h Channel.h \ + Thr_IO_Handler.h IO_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Routing_Table.h Routing_Entry.h Peer_Message.h + Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h .obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -220,7 +291,9 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Record.i \ File_Parser.h .obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \ - $(WRAPPER_ROOT)/ace/Get_Opt.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -232,9 +305,6 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - $(WRAPPER_ROOT)/ace/Service_Config.h \ - $(WRAPPER_ROOT)/ace/Service_Object.h \ - $(WRAPPER_ROOT)/ace/Shared_Object.h \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ @@ -244,20 +314,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -271,23 +333,20 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ - $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ - Config_Files.h File_Parser.h Gateway.h Channel_Connector.h \ - $(WRAPPER_ROOT)/ace/Connector.h \ - $(WRAPPER_ROOT)/ace/Map_Manager.h \ - $(WRAPPER_ROOT)/ace/Svc_Handler.h \ - $(WRAPPER_ROOT)/ace/Synch_Options.h \ - $(WRAPPER_ROOT)/ace/Task.h \ - $(WRAPPER_ROOT)/ace/Task_T.h \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ $(WRAPPER_ROOT)/ace/Message_Queue.h \ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ $(WRAPPER_ROOT)/ace/Strategies.h \ - $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_Channel.h Channel.h \ - $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ - $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Routing_Table.h Routing_Entry.h Peer_Message.h -.obj/Routing_Entry.o .shobj/Routing_Entry.so: Routing_Entry.cpp Routing_Entry.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ + $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + Gateway.h +.obj/Consumer_Entry.o .shobj/Consumer_Entry.so: Consumer_Entry.cpp Consumer_Entry.h \ $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -300,7 +359,7 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i -.obj/Routing_Table.o .shobj/Routing_Table.so: Routing_Table.cpp Routing_Table.h \ +.obj/Consumer_Map.o .shobj/Consumer_Map.so: Consumer_Map.cpp Consumer_Map.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -312,8 +371,10 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Record.h \ $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ - $(WRAPPER_ROOT)/ace/ACE.i -.obj/Thr_Channel.o .shobj/Thr_Channel.so: Thr_Channel.cpp Thr_Channel.h Channel.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ + Concurrency_Strategies.h Event.h Consumer_Entry.h \ + $(WRAPPER_ROOT)/ace/Set.h +.obj/Thr_IO_Handler.o .shobj/Thr_IO_Handler.so: Thr_IO_Handler.cpp Thr_IO_Handler.h IO_Handler.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -337,20 +398,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -364,6 +417,17 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ @@ -371,13 +435,11 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - Routing_Table.h \ + Consumer_Map.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Routing_Entry.h Peer_Message.h Channel_Connector.h \ + Concurrency_Strategies.h Event.h Consumer_Entry.h \ + IO_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i # IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/apps/Gateway/Gateway/README b/apps/Gateway/Gateway/README index ceb17528d0d..4e986354aaa 100644 --- a/apps/Gateway/Gateway/README +++ b/apps/Gateway/Gateway/README @@ -1,22 +1,23 @@ -This application illustrates an application-level Gateway which -routes messages between a set of Peers in a distributed environment. +This application illustrates an application-level Gateway which routes +messages between Consumer and Suppliers in a distributed environment. -The default configuration is single-threaded, i.e., all Input_Channels -and Output_Channels are multiplexed via the Reactor on a single thread -of control. To obtain a version that multi-threads both input and -output simply set the following flag in the Makefile: +The default configuration is single-threaded, i.e., all +Supplier_Handlers and Consumer_Handlers are multiplexed via the ACE +Reactor within a single thread of control. To obtain a version that +multi-threads both Consumer_Handlers and Supplier_Handlers simply set +the following flag in the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT -To get a version that uses single-threading for all Input_Channels, -but a separate thread per-Output_Channel set the following flag in the -Makefile: +To get a version that uses single-threading for all Supplier_Handlers, +but a separate thread per-Consumer_Handler set the following flag in +the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT If you examine the source code, you'll see that very few changes are required in the source code to switch between single-threading and multi-threading. The ACE Task class is primarily responsible for -enabling the flexible modification of concurrency strategies with -little modification to the source code, design, and system +enabling the flexible modification of concurrency strategies with only +minor changes required to the source code, design, and system architecture. diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.cpp b/apps/Gateway/Gateway/Thr_IO_Handler.cpp new file mode 100644 index 00000000000..109cfad9c3f --- /dev/null +++ b/apps/Gateway/Gateway/Thr_IO_Handler.cpp @@ -0,0 +1,204 @@ +#include "Thr_IO_Handler.h" +// $Id$ + +#include "IO_Handler_Connector.h" + +#if defined (ACE_HAS_THREADS) +Thr_Consumer_Handler::Thr_Consumer_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Consumer_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ +} + +// This method should be called only when the peer shuts down +// unexpectedly. This method marks the IO_Handler as having failed and +// deactivates the ACE_Message_Queue (to wake up the thread blocked on +// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will +// eventually try to reconnect... + +int +Thr_Consumer_Handler::handle_input (ACE_HANDLE h) +{ + this->Consumer_Handler::handle_input (h); + ACE_Service_Config::reactor ()->remove_handler (h, + ACE_Event_Handler::RWE_MASK + | ACE_Event_Handler::DONT_CALL); + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + return 0; +} + +// Initialize the threaded Consumer_Handler object and spawn a new +// thread. + +int +Thr_Consumer_Handler::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Register ourselves to receive input events (which indicate that + // the Peer has shut down unexpectedly). + if (ACE_Service_Config::reactor ()->register_handler (this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// ACE_Queue up a message for transmission (must not block since all +// Supplier_Handlers are single-threaded). + +int +Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + // Perform non-blocking enqueue. + return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Transmit messages to the peer (note simplification resulting from +// threads...) + +int +Thr_Consumer_Handler::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread it is OK to block on + // output. + + for (ACE_Message_Block *mb = 0; + this->msg_queue ()->dequeue_head (mb) != -1; ) + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Handler %d on handle %d\n", + this->id (), this->get_handle ())); + + this->peer ().close (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + ACE_OS::sleep (tv); + } + } + + return 0; +} + +Thr_Supplier_Handler::Thr_Supplier_Handler (Consumer_Map *consumer_map, + IO_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Supplier_Handler (consumer_map, ioc, thr_mgr, socket_queue_size) +{ +} + +int +Thr_Supplier_Handler::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// Receive messages from a Peer in a separate thread (note reuse of +// existing code!). + +int +Thr_Supplier_Handler::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread and processes + // messages for one connection it is OK to block on input and + // output. + + while (this->handle_input () != -1) + continue; + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Supplier_Handler %d on handle %d\n", + this->id (), + this->get_handle ())); + + this->peer ().close (); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", tv.sec ())); + ACE_OS::sleep (tv); + } + } + return 0; +} + +#endif /* ACE_HAS_THREADS */ diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.h b/apps/Gateway/Gateway/Thr_IO_Handler.h new file mode 100644 index 00000000000..ee056b35361 --- /dev/null +++ b/apps/Gateway/Gateway/Thr_IO_Handler.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Thr_IO_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_THR_IO_HANDLER) +#define _THR_IO_HANDLER + +#include "IO_Handler.h" + +#if defined (ACE_HAS_THREADS) +class Thr_Consumer_Handler : public Consumer_Handler + // = TITLE + // Runs each Output IO_Handler in a separate thread. +{ +public: + Thr_Consumer_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the threaded Consumer_Handler object and spawn a new + // thread. + + virtual int handle_input (ACE_HANDLE); + // Called when Peer shutdown unexpectedly. + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send a message to a peer. + + virtual int svc (void); + // Transmit peer messages. +}; + +class Thr_Supplier_Handler : public Supplier_Handler + // = TITLE + // Runs each Input IO_Handler in a separate thread. +{ +public: + Thr_Supplier_Handler (Consumer_Map *, + IO_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the object and spawn a new thread. + + virtual int svc (void); + // Transmit peer messages. +}; +#endif /* ACE_HAS_THREADS */ +#endif /* _THR_IO_HANDLER */ diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config new file mode 100644 index 00000000000..d33469ee157 --- /dev/null +++ b/apps/Gateway/Gateway/consumer_config @@ -0,0 +1,8 @@ +# Consumer map configuration file +# Conn ID Logical ID Payload Destinations +# ------- ---------- ------- ------------ +# 1 1 0 3,4,5 + 1 1 0 3 + 3 1 0 3 +# 4 1 0 4 +# 5 1 0 5 diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf index 7ae8d4b0080..6c41415a9ce 100644 --- a/apps/Gateway/Gateway/svc.conf +++ b/apps/Gateway/Gateway/svc.conf @@ -1,3 +1,3 @@ #static Svc_Manager "-d -p 2913" -dynamic Gateway Service_Object * ./libGateway:_make_ACE_Gateway() active "-d -c cc_config -f rt_config" +dynamic Gateway Service_Object * ./libGateway:_make_ACE_Gateway() active "-d -c connection_config -f consumer_config" diff --git a/apps/Gateway/Peer/Gateway_Handler.cpp b/apps/Gateway/Peer/Gateway_Handler.cpp index 15ca0a58807..cfc9a7dad6f 100644 --- a/apps/Gateway/Peer/Gateway_Handler.cpp +++ b/apps/Gateway/Peer/Gateway_Handler.cpp @@ -84,10 +84,10 @@ Gateway_Handler::xmit_stdin (void) ACE_Message_Block *mb; ACE_NEW_RETURN (mb, - ACE_Message_Block (sizeof (Peer_Message)), + ACE_Message_Block (sizeof (Event)), -1); - Peer_Message *peer_msg = (Peer_Message *) mb->rd_ptr (); + Event *peer_msg = (Event *) mb->rd_ptr (); peer_msg->header_.routing_id_ = this->routing_id_; n = ACE_OS::read (ACE_STDIN, peer_msg->buf_, sizeof peer_msg->buf_); @@ -270,7 +270,7 @@ Gateway_Handler::send_peer (ACE_Message_Block *mb) int Gateway_Handler::recv_peer (ACE_Message_Block *&mb) { - Peer_Message *peer_msg; + Event *peer_msg; size_t len; ssize_t n; size_t offset = 0; @@ -278,14 +278,14 @@ Gateway_Handler::recv_peer (ACE_Message_Block *&mb) if (this->msg_frag_ == 0) { ACE_NEW_RETURN (this->msg_frag_, - ACE_Message_Block (sizeof (Peer_Message)), + ACE_Message_Block (sizeof (Event)), -1); // No existing fragment... if (this->msg_frag_ == 0) ACE_ERROR_RETURN ((LM_ERROR, "out of memory\n"), -1); - peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr (); + peer_msg = (Event *) this->msg_frag_->rd_ptr (); switch (n = this->peer ().recv (peer_msg, sizeof (Peer_Header))) { @@ -441,7 +441,7 @@ Gateway_Handler::await_messages (void) // We got a valid message, so let's process it now! At the // moment, we just print out the message contents... - Peer_Message *peer_msg = (Peer_Message *) mb->rd_ptr (); + Event *peer_msg = (Event *) mb->rd_ptr (); this->total_bytes_ += mb->length (); #if defined (VERBOSE) diff --git a/apps/Gateway/Peer/Gateway_Handler.h b/apps/Gateway/Peer/Gateway_Handler.h index 6dc4539e6b7..82477264c4f 100644 --- a/apps/Gateway/Peer/Gateway_Handler.h +++ b/apps/Gateway/Peer/Gateway_Handler.h @@ -32,7 +32,7 @@ #include "ace/SOCK_Acceptor.h" #include "ace/INET_Addr.h" #include "ace/Map_Manager.h" -#include "Peer_Message.h" +#include "Event.h" // Forward declaration. class Gateway_Handler; diff --git a/apps/Gateway/Peer/Makefile b/apps/Gateway/Peer/Makefile index f88aaf14926..9909eb2ef2a 100644 --- a/apps/Gateway/Peer/Makefile +++ b/apps/Gateway/Peer/Makefile @@ -14,9 +14,9 @@ FILES = Gateway_Handler LSRC = $(addsuffix .cpp,$(FILES)) LOBJ = $(addsuffix .o,$(FILES)) -VSHOBJS = $(LSRC:%.cpp=$(VSHDIR)%.so) +SHOBJ = $(addsuffix .so,$(FILES)) -LDLIBS = $(VSHOBJS) +LDLIBS = $(addprefix .shobj/,$(SHOBJ)) VLDLIBS = $(LDLIBS:%=%$(VAR)) @@ -111,6 +111,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Acceptor.i \ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Peer_Message.h + Event.h # IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/apps/Gateway/README b/apps/Gateway/README index 892fe43ba6d..ffd7e52bdf4 100644 --- a/apps/Gateway/README +++ b/apps/Gateway/README @@ -2,8 +2,12 @@ OVERVIEW This directory contains source code for a prototype application-level gateway implemented with ACE. This prototype was developed in my -cs422 grad OS class at Washington University. You can get a paper -that explains the patterns used in this implementation at URL +cs422 OS class at Washington University. It illustrates the use of +Event Channels to forward events from Suppliers to Consumers in a +distributed system. + +You can get a paper that explains the patterns used in this +implementation at the following WWW URL: http://www.cs.wustl.edu/~schmidt/TAPOS-95.ps.gz @@ -15,28 +19,33 @@ Gateway -- The application Gateway, which must be started *after* all the Peers described below). This process reads the - cc_config and rt_config files. The cc_config file tells - the Gateway what connections to establish with which hosts - on which ports, etc. The rt_config file tells the Gateway - how to route data coming from "sources" to the appropriate - "destinations." - + connection_config and consumer_config files: + + 1. The connection_config file is used to establish the "physical + configuration." It tells the Gateway what connections + to establish with particular hosts using particular + ports. + + 2. The consumer_config file is used to establish the "logical + configuration." It tells the Gateway how to forward + data coming from "sources" to the appropriate + "destinations." Peer -- The test driver programs that must be started *before* the - Gateway. To do anything interesting you'll need at - least two Peers: one for supplying events and one for consuming - them. In the configuration files, these two types of Peers - are designated as follows: + Gateway. To do anything interesting you'll need at least + two Peers: one to supply events and one to consume events. + In the configuration files, these two types of Peers are + designated as follows: - (1) Input Peers (designated by an "I" in the Gateway's - cc_config configuration file). These Peers are "sources" - of messages to the Gateway. + 1. Supplier Peers (designated by an 'S' in the Gateway's + connection_config configuration file). These Peers are + "suppliers" of events to the Gateway. - (2) Output Peers (designated by an "O" in the Gateway's - cc_config file). These Peers are "destinations" of - messages routed by the Gateway (routing is based on - the settings in the rt_config configuration file). + 2. Consumer Peers (designated by an 'C' in the Gateway's + connection_config file). These Peers are "consumers" of + events forwarded by the Gateway (forwarding is based on + the settings in the consumer_config configuration file). RUNNING THE TESTS @@ -45,39 +54,39 @@ To run the tests do the following: 1. Compile everything (i.e., first compile the ACE libraries, then compile the the Gateway directories). -2. Edit the rt_config and cc_config files as discussed above. +2. Edit the consumer_config and connection_config files as discussed + above to indicate the desired physical and logical mappings. 3. Start up the Peers (peerd). You can start up as many as you - like, as per the cc_config file, but you'll need at least - two (one for supplying input and one for consuming output). I - typically start up each peer in a different window on a different - machine. The peers should print out some diagnostic info and then - block awaiting connections from the Gateway. - -4. Start up the Gateway (gatewayd). This will print out - a bunch of messages as it reads the config files and connects - to all the Peers. Assuming everything works, then all the - Peers will be connected. If some of the Peers aren't set up - correctly then the Gateway will use an exponential backoff - algorithm to attempt to reestablish those connections. + like, as per the connection_config file, but you'll need at least + two (one to supply and one to consume). I typically start up each + Peer in a different window on a different machine. The Peers + should print out some diagnostic info and then block awaiting + connections from the Gateway. + +4. Start up the Gateway (gatewayd). This will print out a bunch of + events as it reads the config files and connects to all the Peers. + Assuming everything works, then all the Peers will be connected. + If some of the Peers aren't set up correctly then the Gateway will + use an exponential backoff algorithm to attempt to reestablish + those connections. 5. Once the Gateway has connected with all the Peers you can send - messages from Input Peers by typing commands in the Peer window. - This input will be sent to the Gateway, which will forward - the message to all Output Peers that have "subscribed" to receive - these messages. + events from Supplier Peers by typing commands in the Peer window. + This Supplier will be sent to the Gateway, which will forward the + event to all Consumer Peers that have "subscribed" to receive these + events. Note that if you type ^C in a Peer window the Peer will shutdown - its handlers and exit. The Gateway will detect this and will - start trying to reestablish the connection using the same - exponential backoff algorithm it used for the initial connection - establishment. + its handlers and exit. The Gateway will detect this and will start + trying to reestablish the connection using the same exponential + backoff algorithm it used for the initial connection establishment. -7. When you want to terminate a Gateway, just type ^C - and the process will shut down gracefully. +7. When you want to terminate a Gateway, just type ^C and the process + will shut down gracefully. Please let me know if there are any questions. Doug -schmidt@cs.wustl.edu + schmidt@cs.wustl.edu diff --git a/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h b/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h index 293d597a99f..43730693c93 100644 --- a/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h +++ b/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY diff --git a/examples/ASX/CCM_App/CCM_App.cpp b/examples/ASX/CCM_App/CCM_App.cpp index 93335efcc9c..e0330cf53de 100644 --- a/examples/ASX/CCM_App/CCM_App.cpp +++ b/examples/ASX/CCM_App/CCM_App.cpp @@ -14,9 +14,6 @@ class ACE_Svc_Export Test_Task : public MT_Task public: virtual int open (void *); virtual int close (u_long); - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); - virtual int svc (void); - virtual int info (char **, size_t) const; virtual int init (int, char *[]); virtual int fini (void); virtual int suspend (void); @@ -52,24 +49,6 @@ Test_Task::resume (void) } int -Test_Task::put (ACE_Message_Block *, ACE_Time_Value *) -{ - return 0; -} - -int -Test_Task::svc (void) -{ - return 0; -} - -int -Test_Task::info (char **, size_t) const -{ - return 0; -} - -int Test_Task::init (int, char *[]) { ACE_DEBUG ((LM_DEBUG, "initializing %s\n", this->name () ? this->name () : "task")); diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h index 7451d38b2b7..498e91d476e 100644 --- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h +++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h @@ -19,7 +19,6 @@ public: virtual int open (void *a = 0); virtual int close (u_long flags = 0); virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); - virtual int svc (void) { return 0; } /* Dynamic linking hooks */ virtual int init (int argc, char *argv[]); diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp index fbfed5d13ce..cc2c475bef1 100644 --- a/examples/ASX/Message_Queue/buffer_stream.cpp +++ b/examples/ASX/Message_Queue/buffer_stream.cpp @@ -30,12 +30,6 @@ public: // ACE_Task hooks virtual int open (void * = 0); virtual int close (u_long = 0); - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) { return 0; } - - // ACE_Service_Object hooks - virtual int init (int, char **) { return 0; } - virtual int fini (void) { return 0; } - virtual int info (char **, size_t) const { return 0; } }; // Define the Producer interface. diff --git a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h index 3dd706eaa6a..a815e7d642a 100644 --- a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h +++ b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h @@ -19,7 +19,6 @@ public: virtual int open (void *a = 0); virtual int close (u_long flags = 0); virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0); - virtual int svc (void) { return 0; } // Dynamic linking hooks. virtual int init (int argc, char *argv[]); diff --git a/examples/Connection/non_blocking/CPP-connector.h b/examples/Connection/non_blocking/CPP-connector.h index 3cf2a6eb2fb..81be8b6ba71 100644 --- a/examples/Connection/non_blocking/CPP-connector.h +++ b/examples/Connection/non_blocking/CPP-connector.h @@ -34,9 +34,6 @@ protected: // Keeps track of which state we are in. private: - // = Disallow these methods... - virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } - virtual int svc (void) { return 0; } }; template <class SVC_HANDLER, ACE_PEER_CONNECTOR_1> diff --git a/examples/Reactor/Misc/test_demuxing.cpp b/examples/Reactor/Misc/test_demuxing.cpp index cacac69f5e6..f8f7992ce37 100644 --- a/examples/Reactor/Misc/test_demuxing.cpp +++ b/examples/Reactor/Misc/test_demuxing.cpp @@ -212,11 +212,6 @@ public: // Run the "event-loop" periodically putting messages to our // internal <Message_Queue> that we inherit from <ACE_Task>. - // = Not used... - virtual int open (void *) { return 0; } - virtual int close (u_long) { return 0; } - virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } - private: ACE_Reactor_Notification_Strategy notification_strategy_; // This strategy will notify the <ACE_Reactor> Singleton when a new diff --git a/examples/Reactor/Misc/test_reactors.cpp b/examples/Reactor/Misc/test_reactors.cpp index dbb60941f24..5426af32d2a 100644 --- a/examples/Reactor/Misc/test_reactors.cpp +++ b/examples/Reactor/Misc/test_reactors.cpp @@ -1,6 +1,6 @@ -// Perform a torture test of multiple ACE_Reactors and ACE_Tasks in // $Id$ +// Perform a torture test of multiple ACE_Reactors and ACE_Tasks in // the same process... Thanks to Detlef Becker for contributing this. #include "ace/Reactor.h" @@ -21,7 +21,6 @@ public: virtual int open (void *args = 0); virtual int close (u_long flags = 0); - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); virtual int svc (void); virtual int handle_input (ACE_HANDLE handle); @@ -29,7 +28,6 @@ public: ACE_Reactor_Mask close_mask); private: - ACE_Reactor *r_; int handled_; static int task_count_; @@ -64,7 +62,7 @@ Test_Task::~Test_Task (void) int Test_Task::open (void *args) { - r_ = (ACE_Reactor *) args; + this->reactor ((ACE_Reactor *) args); return this->activate (THR_NEW_LWP); } @@ -81,13 +79,6 @@ Test_Task::close (u_long) } int -Test_Task::put (ACE_Message_Block *, - ACE_Time_Value *) -{ - return 0; -} - -int Test_Task::svc (void) { for (int i = 0; i < NUM_INVOCATIONS; i++) @@ -96,7 +87,7 @@ Test_Task::svc (void) // ACE_DEBUG ((LM_DEBUG, "(%t) calling notify %d\n", i)); - if (r_->notify (this, ACE_Event_Handler::READ_MASK) == -1) + if (this->reactor ()->notify (this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notify"), -1); // ACE_DEBUG ((LM_DEBUG, "(%t) leaving notify %d\n", i)); diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp index 334cfce80aa..488fa1e683a 100644 --- a/examples/Reactor/ReactorEx/test_reactorEx.cpp +++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp @@ -76,9 +76,6 @@ public: // Called when output events should start private: - int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } - // Make Task happy. - ACE_SOCK_Stream stream_; // Socket that we have connected to the server. diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp index 334cfce80aa..488fa1e683a 100644 --- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp +++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp @@ -76,9 +76,6 @@ public: // Called when output events should start private: - int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } - // Make Task happy. - ACE_SOCK_Stream stream_; // Socket that we have connected to the server. diff --git a/examples/System_V_IPC/SV_Semaphores/Semaphores_1.cpp b/examples/System_V_IPC/SV_Semaphores/Semaphores_1.cpp new file mode 100644 index 00000000000..8af66b90f34 --- /dev/null +++ b/examples/System_V_IPC/SV_Semaphores/Semaphores_1.cpp @@ -0,0 +1,89 @@ +// $Id$ + +#include "ace/SV_Shared_Memory.h" +#include "ace/SV_Semaphore_Simple.h" +#include "ace/Malloc.h" + +#if defined (ACE_HAS_SYSV_IPC) + +// Shared memory allocator (note that this chews up the +// ACE_DEFAULT_SEM_KEY). +static ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple> allocator; + +const int SEM_KEY = ACE_DEFAULT_SEM_KEY + 1; +const int SHMSZ = 27; + +static int +parent (char *shm) +{ + char *s = shm; + + ACE_SV_Semaphore_Complex sem (SEM_KEY, ACE_SV_Semaphore_Complex::ACE_CREATE, 0, 2); + + for (char c = 'a'; c <= 'z'; c++) + *s++ = c; + + *s = '\0'; + + if (sem.release (0) == -1) + ACE_ERROR ((LM_ERROR, "%p", "parent sem.release(0)")); + else if (sem.acquire (1) == -1) + ACE_ERROR ((LM_ERROR, "%p", "parent sem.acquire(1)")); + + if (allocator.remove () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "allocator.remove")); + if (sem.remove () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "sem.remove")); + return 0; +} + +static int +child (char *shm) +{ + ACE_SV_Semaphore_Complex sem (SEM_KEY, ACE_SV_Semaphore_Complex::ACE_CREATE, 0, 2); + + while (sem.tryacquire (0) == -1) + if (errno == EAGAIN) + ACE_DEBUG ((LM_DEBUG, "spinning in client!\n")); + else + ACE_ERROR_RETURN ((LM_ERROR, "client mutex.tryacquire(0)"), 1); + + for (char *s = (char *) shm; *s != '\0'; s++) + ACE_DEBUG ((LM_DEBUG, "%c", *s)); + + ACE_DEBUG ((LM_DEBUG, "\n")); + + if (sem.release (1) < 0) + ACE_ERROR ((LM_ERROR, "client sem.release(1)")); + return 0; +} + +int +main (void) +{ + char *shm = (char *) allocator.malloc (27); + + switch (ACE_OS::fork ()) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, "fork failed\n"), -1); + /* NOTREACHED */ + case 0: + // Child. + return child (shm); + default: + return parent (shm); + } +} +#else +int main (void) +{ + ACE_ERROR ((LM_ERROR, + "SYSV IPC is not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_SYSV_IPC */ + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/examples/System_V_IPC/SV_Semaphores/Semaphores_2.cpp b/examples/System_V_IPC/SV_Semaphores/Semaphores_2.cpp new file mode 100644 index 00000000000..0405440364c --- /dev/null +++ b/examples/System_V_IPC/SV_Semaphores/Semaphores_2.cpp @@ -0,0 +1,100 @@ +// $Id$ + +// Illustrates the use of the ACE_SV_Semaphore_Complex class and the +// ACE_Malloc class using the ACE_Shared_Memory_Pool (which uses +// System V shared memory). Note that it doesn't matter whether the +// parent or the child creates the semaphore since Semaphore_Complex +// will correctly serialize the intialization of the mutex and synch +// objects. + +#include "ace/Malloc.h" +#include "ace/SV_Semaphore_Complex.h" + +#if defined (ACE_HAS_SYSV_IPC) + +// Shared memory allocator (note that this chews up the +// ACE_DEFAULT_SEM_KEY). +static ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple> allocator; + +const int SEM_KEY_1 = ACE_DEFAULT_SEM_KEY + 1; +const int SEM_KEY_2 = ACE_DEFAULT_SEM_KEY + 2; +const int SHMSZ = 27; + +static int +parent (char *shm) +{ + char *s = shm; + + ACE_SV_Semaphore_Complex mutex (SEM_KEY_1, ACE_SV_Semaphore_Complex::ACE_CREATE, 0); + ACE_SV_Semaphore_Complex synch (SEM_KEY_2, ACE_SV_Semaphore_Complex::ACE_CREATE, 0); + + for (char c = 'a'; c <= 'z'; c++) + *s++ = c; + + *s = '\0'; + + if (mutex.release () == -1) + ACE_ERROR ((LM_ERROR, "%p", "parent mutex.release")); + else if (synch.acquire () == -1) + ACE_ERROR ((LM_ERROR, "%p", "parent synch.acquire")); + + if (allocator.remove () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "allocator.remove")); + if (mutex.remove () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "mutex.remove")); + if (synch.remove () == -1) + ACE_ERROR ((LM_ERROR, "%p\n", "synch.remove")); + return 0; +} + +static int +child (char *shm) +{ + ACE_SV_Semaphore_Complex mutex (SEM_KEY_1, ACE_SV_Semaphore_Complex::ACE_CREATE, 0); + ACE_SV_Semaphore_Complex synch (SEM_KEY_2, ACE_SV_Semaphore_Complex::ACE_CREATE, 0); + + while (mutex.tryacquire () == -1) + if (errno == EAGAIN) + ACE_DEBUG ((LM_DEBUG, "spinning in child!\n")); + else + ACE_ERROR_RETURN ((LM_ERROR, "child mutex.tryacquire"), 1); + + for (char *s = (char *) shm; *s != '\0'; s++) + ACE_DEBUG ((LM_DEBUG, "%c", *s)); + + ACE_DEBUG ((LM_DEBUG, "\n")); + + if (synch.release () == -1) + ACE_ERROR_RETURN ((LM_ERROR, "child synch.release"), 1); + return 0; +} + +int +main (void) +{ + char *shm = (char *) allocator.malloc (27); + + switch (ACE_OS::fork ()) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, "fork failed\n"), -1); + /* NOTREACHED */ + case 0: + // Child. + return child (shm); + default: + return parent (shm); + } +} +#else +int main (void) +{ + ACE_ERROR ((LM_ERROR, + "SYSV IPC is not supported on this platform\n")); + return 0; +} +#endif /* ACE_HAS_SYSV_IPC */ + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/examples/Threads/future1.cpp b/examples/Threads/future1.cpp index ea295e487e1..00814fee881 100644 --- a/examples/Threads/future1.cpp +++ b/examples/Threads/future1.cpp @@ -57,7 +57,6 @@ public: virtual int open (void *args = 0); virtual int close (u_long flags = 0); - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); virtual int svc (void); ACE_Future<double> work (double param, int count); @@ -196,13 +195,6 @@ Scheduler::close (u_long) return 0; } -// put... ?? -int -Scheduler::put (ACE_Message_Block *, ACE_Time_Value *) -{ - return 0; -} - // service.. int Scheduler::svc (void) diff --git a/examples/Threads/future2.cpp b/examples/Threads/future2.cpp index 55ce8c05a40..f034d765c1e 100644 --- a/examples/Threads/future2.cpp +++ b/examples/Threads/future2.cpp @@ -73,10 +73,6 @@ private: virtual int close (u_long flags = 0); // Should not be accessible from outside... (use end () instead). - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) - { return 0; }; - // Doesn't have any use for this example. - virtual int svc (void); // Here the actual servicing of all requests is happening.. diff --git a/examples/Threads/task_four.cpp b/examples/Threads/task_four.cpp index 64209cb3430..87a2f3abfa8 100644 --- a/examples/Threads/task_four.cpp +++ b/examples/Threads/task_four.cpp @@ -55,11 +55,6 @@ private: // Number of threads per task. int n_iterations_; // Number of iterations per thread. - - // = Not needed for this test. - virtual int open (void *) { return 0; } - virtual int close (u_long) { return 0; } - virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } }; class Worker_Task : public ACE_Task<ACE_MT_SYNCH> diff --git a/examples/Threads/task_one.cpp b/examples/Threads/task_one.cpp index d0a8a12e6c4..7acf8de3789 100644 --- a/examples/Threads/task_one.cpp +++ b/examples/Threads/task_one.cpp @@ -31,11 +31,6 @@ private: int n_iterations_; // Number of iterations to run. - - // = Not needed for this test. - virtual int open (void *) { return 0; } - virtual int close (u_long) { return 0; } - virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } }; Barrier_Task::Barrier_Task (ACE_Thread_Manager *thr_mgr, diff --git a/examples/Threads/task_three.cpp b/examples/Threads/task_three.cpp index 0214ac10ddf..4b365aba8a5 100644 --- a/examples/Threads/task_three.cpp +++ b/examples/Threads/task_three.cpp @@ -33,7 +33,6 @@ public: virtual int open (void *args = 0); virtual int close (u_long flags = 0); - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); virtual int svc (void); virtual int handle_input (ACE_HANDLE fd); @@ -86,12 +85,6 @@ Test_Task::close (u_long) return 0; } -int -Test_Task::put (ACE_Message_Block *, ACE_Time_Value *) -{ - return 0; -} - Test_Task::svc (void) { // Every thread must register the same stream to write to file. diff --git a/examples/Threads/task_two.cpp b/examples/Threads/task_two.cpp index 1c6366c4b12..c6e88d8eaa5 100644 --- a/examples/Threads/task_two.cpp +++ b/examples/Threads/task_two.cpp @@ -31,7 +31,6 @@ class Task_Test : public ACE_Task<ACE_MT_SYNCH> public: virtual int open (void *args = 0); virtual int close (u_long flags = 0); - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); virtual int svc (void); private: @@ -66,13 +65,6 @@ Task_Test::close (u_long) } int -Task_Test::put (ACE_Message_Block *, - ACE_Time_Value *) -{ - return 0; -} - -int Task_Test::svc (void) { wait_count++; diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp index 9478ed0883d..ddcad02e4dd 100644 --- a/examples/Threads/thread_pool.cpp +++ b/examples/Threads/thread_pool.cpp @@ -34,9 +34,6 @@ public: private: virtual int close (u_long); - - // = Not needed for this test. - virtual int open (void *) { return 0; } }; int diff --git a/examples/Threads/token.cpp b/examples/Threads/token.cpp index 5a51496d011..e8c72be3d1d 100644 --- a/examples/Threads/token.cpp +++ b/examples/Threads/token.cpp @@ -10,9 +10,6 @@ class My_Task : public ACE_Task<ACE_MT_SYNCH> { public: My_Task (int n); - virtual int open (void *) { return 0; } - virtual int close (u_long) { return 0; } - virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } virtual int svc (void); static void sleep_hook (void *); diff --git a/examples/Threads/tss1.cpp b/examples/Threads/tss1.cpp index 7efdc9dc3ef..dd21023fa19 100644 --- a/examples/Threads/tss1.cpp +++ b/examples/Threads/tss1.cpp @@ -96,9 +96,6 @@ public: virtual int open (void *theArgs = 0); virtual int close (u_long theArg = 0); - virtual int put (ACE_Message_Block *theMsgBlock, - ACE_Time_Value *theTimeVal = 0); - virtual int svc (void); }; template <ACE_SYNCH_1> int @@ -119,18 +116,6 @@ int Tester<ACE_SYNCH_2>::close (u_long) return 0; } -template <ACE_SYNCH_1> int -Tester<ACE_SYNCH_2>::put (ACE_Message_Block *, ACE_Time_Value *) -{ - return 0; -} - -template <ACE_SYNCH_1> int -Tester<ACE_SYNCH_2>::svc (void) -{ - return 0; -} - int main (int, char *[]) { diff --git a/netsvcs/servers/svc.conf b/netsvcs/servers/svc.conf index f8634065b13..b136e97bb33 100644 --- a/netsvcs/servers/svc.conf +++ b/netsvcs/servers/svc.conf @@ -1,5 +1,5 @@ # These are the services that can be linked into ACE. -# Note that you can replace the hardcoded "../lib/libnet_svcs" with +# Note that you can replace the hardcoded "../lib/net_svcs" with # a relative path if you set your LD search path correctly -- ACE will # locate this for you automatically by reading your LD search path. # Moreover, ACE will automatically insert the correct suffix (e.g., @@ -7,11 +7,11 @@ # "-p 20xxx" with "-p $PORTxxx" if you set your environment variables # correctly. -dynamic Logger Service_Object * ../lib/libnet_svcs:_make_ACE_Logger() "-s foobar -f STDERR|OSTREAM" -dynamic Time_Service Service_Object * ../lib/libnet_svcs:_make_ACE_TS_Server_Acceptor() "-p 10222" -dynamic Name_Server Service_Object * ../lib/libnet_svcs:_make_ACE_Name_Acceptor() "-p 10012" -dynamic Token_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Token_Acceptor() "-p 10202" -dynamic Server_Logging_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Server_Logging_Acceptor() active "-p 10009" -dynamic Thr_Server_Logging_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Thr_Server_Logging_Acceptor() active "-p 10020" -dynamic Client_Logging_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Client_Logging_Connector() active "-p 10009" +dynamic Logger Service_Object * ../lib/net_svcs:_make_ACE_Logger() "-s foobar -f STDERR|OSTREAM" +dynamic Time_Service Service_Object * ../lib/net_svcs:_make_ACE_TS_Server_Acceptor() "-p 10222" +dynamic Name_Server Service_Object * ../lib/net_svcs:_make_ACE_Name_Acceptor() "-p 10012" +dynamic Token_Service Service_Object * ../lib/net_svcs:_make_ACE_Token_Acceptor() "-p 10202" +dynamic Server_Logging_Service Service_Object * ../lib/net_svcs:_make_ACE_Server_Logging_Acceptor() active "-p 10009" +dynamic Thr_Server_Logging_Service Service_Object * ../lib/net_svcs:_make_ACE_Thr_Server_Logging_Acceptor() active "-p 10020" +dynamic Client_Logging_Service Service_Object * ../lib/net_svcs:_make_ACE_Client_Logging_Connector() active "-p 10009" diff --git a/tests/Buffer_Stream_Test.cpp b/tests/Buffer_Stream_Test.cpp index a228d94e783..aa5ec27414c 100644 --- a/tests/Buffer_Stream_Test.cpp +++ b/tests/Buffer_Stream_Test.cpp @@ -46,12 +46,6 @@ public: // ACE_Task hooks virtual int open (void * = 0); virtual int close (u_long = 0); - virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) { return 0; } - - // ACE_Service_Object hooks - virtual int init (int, char **) { return 0; } - virtual int fini (void) { return 0; } - virtual int info (char **, size_t) const { return 0; } }; // Define the Producer interface. diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp index 00ed154ebbf..8207ede913c 100644 --- a/tests/Future_Test.cpp +++ b/tests/Future_Test.cpp @@ -58,7 +58,6 @@ public: virtual int open (void *args = 0); virtual int close (u_long flags = 0); - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); virtual int svc (void); ACE_Future<double> work (double param, int count); @@ -196,13 +195,6 @@ Scheduler::close (u_long) return 0; } -// put... ?? -int -Scheduler::put (ACE_Message_Block *, ACE_Time_Value *) -{ - return 0; -} - // service.. int Scheduler::svc (void) diff --git a/tests/Priority_Task_Test.cpp b/tests/Priority_Task_Test.cpp index 5444e5b987c..bcc52856f43 100644 --- a/tests/Priority_Task_Test.cpp +++ b/tests/Priority_Task_Test.cpp @@ -29,7 +29,6 @@ class Priority_Task : public ACE_Task<ACE_MT_SYNCH> public: Priority_Task (void); - int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } int close (u_long = 0); int open (void *); int svc (void); @@ -65,7 +64,7 @@ Priority_Task::svc (void) ACE_hthread_t thr_handle; ACE_Thread::self (thr_handle); int prio; - ACE_Thread::getprio (thr_handle, &prio); + ACE_Thread::getprio (thr_handle, prio); ACE_ASSERT (this->priority_ == prio); return 0; } diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp index 8bad943910d..2c459cdbf15 100644 --- a/tests/Reactors_Test.cpp +++ b/tests/Reactors_Test.cpp @@ -36,7 +36,6 @@ public: virtual int open (void *args = 0); virtual int close (u_long flags = 0); - virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0); virtual int svc (void); virtual int handle_input (ACE_HANDLE handle); @@ -97,12 +96,6 @@ Test_Task::close (u_long) } int -Test_Task::put (ACE_Message_Block *, ACE_Time_Value *) -{ - return 0; -} - -int Test_Task::svc (void) { ACE_NEW_THREAD; diff --git a/tests/Task_Test.cpp b/tests/Task_Test.cpp index be1256c3023..27c4d1bd98e 100644 --- a/tests/Task_Test.cpp +++ b/tests/Task_Test.cpp @@ -43,11 +43,6 @@ private: int n_iterations_; // Number of iterations to run. - - // = Not needed for this test. - virtual int open (void *) { return 0; } - virtual int close (u_long) { return 0; } - virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; } }; Barrier_Task::Barrier_Task (ACE_Thread_Manager *thr_mgr, diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp index cb7d9069b9a..2363850e7f3 100644 --- a/tests/Thread_Pool_Test.cpp +++ b/tests/Thread_Pool_Test.cpp @@ -41,7 +41,7 @@ public: // Iterate <n_iterations> time printing off a message and "waiting" // for all other threads to complete this iteration. - virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0); + virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0); // This allows the producer to pass messages to the <Thread_Pool>. private: diff --git a/tests/UPIPE_SAP_Test.cpp b/tests/UPIPE_SAP_Test.cpp index f1036c563cf..64a851c28bf 100644 --- a/tests/UPIPE_SAP_Test.cpp +++ b/tests/UPIPE_SAP_Test.cpp @@ -100,17 +100,6 @@ acceptor (void *args) ACE_UPIPE_Acceptor *acceptor = (ACE_UPIPE_Acceptor *) args; ACE_UPIPE_Stream s_stream; - ACE_hthread_t thr_handle; - - // Spawn a connector thread. - if (ACE_Thread::spawn (ACE_THR_FUNC (connector), - (void *) 0, - THR_NEW_LWP | THR_DETACHED, - 0, - &thr_handle) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 0); - - ACE_DEBUG ((LM_DEBUG, "(%t) acceptor starting accept\n")); if (acceptor->accept (s_stream) == -1) ACE_DEBUG ((LM_DEBUG, @@ -163,7 +152,7 @@ main (int, char *[]) // Spawn a acceptor thread. if (ACE_Thread::spawn (ACE_THR_FUNC (acceptor), (void *) &acc, - THR_NEW_LWP | THR_DETACHED, + THR_NEW_LWP, 0, &thr_handle_acceptor) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); @@ -171,15 +160,22 @@ main (int, char *[]) // Spawn a connector thread. if (ACE_Thread::spawn (ACE_THR_FUNC (connector), (void *) 0, - THR_NEW_LWP | THR_DETACHED, + THR_NEW_LWP, 0, &thr_handle_connector) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1); // Wait for both the acceptor and connector threads to exit. - ACE_Thread::join (thr_handle_connector); - ACE_Thread::join (thr_handle_acceptor); + if (ACE_Thread::join (thr_handle_connector) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "join"), -1); + else + ACE_DEBUG ((LM_DEBUG, "(%t) joined with connector thread\n")); + if (ACE_Thread::join (thr_handle_acceptor) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "join"), -1); + else + ACE_DEBUG ((LM_DEBUG, "(%t) joined with acceptor thread\n")); + #else ACE_ERROR ((LM_ERROR, "threads and/or UPIPE not supported on this platform\n")); #endif /* ACE_HAS_THREADS */ diff --git a/tests/run_tests.sh b/tests/run_tests.sh index 92ac675fa09..830260bbb6c 100755 --- a/tests/run_tests.sh +++ b/tests/run_tests.sh @@ -66,7 +66,7 @@ run Reader_Writer_Test # uses Thread_Manager, Mutex # ifdef ACE_HAS_STREAM_PIPES run SPIPE_Test # uses SPIPE_Acceptor/Connector, Thread_Manager -# run UPIPE_SAP_Test # uses UPIPE, Thread, Thread_Manager +run UPIPE_SAP_Test # uses UPIPE, Thread, Thread_Manager run Barrier_Test # uses Service_Config, Barrier run Buffer_Stream_Test # uses Service_Config, Module (Stream,Task, Message_Queue) |