summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-15 16:38:54 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-15 16:38:54 +0000
commitb134be83f52912e4e7e3707973a1e24b29d48552 (patch)
tree5c7b58240f4d2ab28cec729bb1b7d46a5d01f47f
parent0d7dedcc81b518738ba0a19347394816bd262322 (diff)
downloadATCD-b134be83f52912e4e7e3707973a1e24b29d48552.tar.gz
*** empty log message ***
-rw-r--r--ChangeLog-96b68
-rw-r--r--Makefile6
-rw-r--r--README52
-rw-r--r--ace/OS.cpp44
-rw-r--r--ace/OS.h67
-rw-r--r--ace/OS.i128
-rw-r--r--ace/SString.cpp5
-rw-r--r--ace/Svc_Handler.cpp15
-rw-r--r--ace/Svc_Handler.h6
-rw-r--r--ace/Task.h6
-rw-r--r--ace/Task.i27
-rw-r--r--ace/Thread.i2
-rw-r--r--ace/config-aix-3.2.5.h9
-rw-r--r--apps/Gateway/Gateway/Concurrency_Strategies.h74
-rw-r--r--apps/Gateway/Gateway/Config_Files.cpp45
-rw-r--r--apps/Gateway/Gateway/Config_Files.h29
-rw-r--r--apps/Gateway/Gateway/Consumer_Entry.cpp31
-rw-r--r--apps/Gateway/Gateway/Consumer_Entry.h45
-rw-r--r--apps/Gateway/Gateway/Consumer_Map.cpp61
-rw-r--r--apps/Gateway/Gateway/Consumer_Map.h62
-rw-r--r--apps/Gateway/Gateway/Event.h86
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h99
-rw-r--r--apps/Gateway/Gateway/File_Parser.h5
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp510
-rw-r--r--apps/Gateway/Gateway/Gateway.h7
-rw-r--r--apps/Gateway/Gateway/IO_Handler.cpp710
-rw-r--r--apps/Gateway/Gateway/IO_Handler.h224
-rw-r--r--apps/Gateway/Gateway/IO_Handler_Connector.cpp92
-rw-r--r--apps/Gateway/Gateway/IO_Handler_Connector.h40
-rw-r--r--apps/Gateway/Gateway/Makefile230
-rw-r--r--apps/Gateway/Gateway/README23
-rw-r--r--apps/Gateway/Gateway/Thr_IO_Handler.cpp204
-rw-r--r--apps/Gateway/Gateway/Thr_IO_Handler.h64
-rw-r--r--apps/Gateway/Gateway/consumer_config8
-rw-r--r--apps/Gateway/Gateway/svc.conf2
-rw-r--r--apps/Gateway/Peer/Gateway_Handler.cpp12
-rw-r--r--apps/Gateway/Peer/Gateway_Handler.h2
-rw-r--r--apps/Gateway/Peer/Makefile6
-rw-r--r--apps/Gateway/README95
-rw-r--r--apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h1
-rw-r--r--examples/ASX/CCM_App/CCM_App.cpp21
-rw-r--r--examples/ASX/Event_Server/Event_Server/Event_Analyzer.h1
-rw-r--r--examples/ASX/Message_Queue/buffer_stream.cpp6
-rw-r--r--examples/ASX/UPIPE_Event_Server/Event_Analyzer.h1
-rw-r--r--examples/Connection/non_blocking/CPP-connector.h3
-rw-r--r--examples/Reactor/Misc/test_demuxing.cpp5
-rw-r--r--examples/Reactor/Misc/test_reactors.cpp15
-rw-r--r--examples/Reactor/ReactorEx/test_reactorEx.cpp3
-rw-r--r--examples/Reactor/WFMO_Reactor/test_reactorEx.cpp3
-rw-r--r--examples/System_V_IPC/SV_Semaphores/Semaphores_1.cpp89
-rw-r--r--examples/System_V_IPC/SV_Semaphores/Semaphores_2.cpp100
-rw-r--r--examples/Threads/future1.cpp8
-rw-r--r--examples/Threads/future2.cpp4
-rw-r--r--examples/Threads/task_four.cpp5
-rw-r--r--examples/Threads/task_one.cpp5
-rw-r--r--examples/Threads/task_three.cpp7
-rw-r--r--examples/Threads/task_two.cpp8
-rw-r--r--examples/Threads/thread_pool.cpp3
-rw-r--r--examples/Threads/token.cpp3
-rw-r--r--examples/Threads/tss1.cpp15
-rw-r--r--netsvcs/servers/svc.conf16
-rw-r--r--tests/Buffer_Stream_Test.cpp6
-rw-r--r--tests/Future_Test.cpp8
-rw-r--r--tests/Priority_Task_Test.cpp3
-rw-r--r--tests/Reactors_Test.cpp7
-rw-r--r--tests/Task_Test.cpp5
-rw-r--r--tests/Thread_Pool_Test.cpp2
-rw-r--r--tests/UPIPE_SAP_Test.cpp26
-rwxr-xr-xtests/run_tests.sh2
69 files changed, 2657 insertions, 925 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b
index d9f57161a00..dfaa77649f9 100644
--- a/ChangeLog-96b
+++ b/ChangeLog-96b
@@ -1,3 +1,59 @@
+Sun Dec 15 10:29:20 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
+
+ * netsvcs/servers/svc.conf: Removed the "lib" prefix for the
+ netsvcs DLL. This is now added automatically by the
+ ACE::ldfind() operation.
+
+ * ace/SString.cpp (ACE_CString): Removed the #pragmas for Win32.
+ They aren't necessary since we should replace the ACE_USHORT16
+ cast with a char cast. Thanks to Amos Shapira <amos@dsi.co.il>
+ for reporting this.
+
+Sat Dec 14 14:25:38 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * build/SunOS5.5/tests/UPIPE_SAP_Test.cpp (main): Fixed several
+ minor bugs with UPIPE_SAP_Test.cpp.
+
+ * ace/OS.i (thr_join): Added implementations for Solaris threads
+ and most versions of POSIX pthreads where ACE_hthread_t and
+ ACE_thread_t are the same type!
+
+ * ace/OS: Began adding hooks so that we can eventually move away
+ from the current split between ACE_thread_t and ACE_hthread_t
+ and unify them via ACE_Thread_ID.
+
+ * ace/{OS,Thread}.h: Changed the interface of thr_getprio() so
+ that it takes an int & rather than an int *.
+
+ * ace/OS.i (thr_getprio): Fixed a minor bug for Win32 where we
+ weren't depositing the thread priority into the return value!
+
+ * Makefile: Changed the order in which things are built so that
+ netsvcs are built right after libACE, followed by the tests.
+
+Sat Dec 14 11:54:22 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
+
+ * apps/Gateway/Gateway/Consumer_Map: Change the Consumer_Map class
+ so that it was no longer templatized. There isn't any point in
+ doing this since we aren't going to be changing these types for
+ this application.
+
+ * apps/Gateway/Gateway: Factored out the code for selecting the
+ concurrency strategy into a separate *.h file called
+ Concurrency_Strategy.h.
+
+ * apps/Gateway/Gateway: Began revising the Gateway application to
+ use the new ACE Event Channel.
+
+ * ace/Svc_Handler: Now that we've got put() and svc() with no-op
+ defaults in class ACE_Task_Base, we don't need them in
+ ACE_Svc_Handler anymore, so I removed them!
+
+ * ace/Task: Finally got sick of having to provide no-op
+ open()/put()/close() routines in all ACE_Task subclasses, so I
+ changed these methods from pure virtual to virtual with default
+ no-op behavior. Updated all the tests, as well.
+
Sat Dec 14 11:39:15 1996 David L. Levine <levine@cs.wustl.edu>
* ace/{Module,Stream,Svc_Handler,Synch_T,Task_T}.cpp and Synch_T.i:
@@ -21,8 +77,6 @@ Fri Dec 13 22:07:11 1996 David L. Levine <levine@cs.wustl.edu>
SOLINK step in build of shared objects for SunOS5 with SunC++
with symlink from .so to .o file.
-Fri Dec 13 13:44:12 1996 David L. Levine <levine@cs.wustl.edu>
-
* ace/config-vxworks*.h: added ACE_NEEDS_SYSTIME_H to VxWorks
configs because it's needed with inlining
@@ -58,6 +112,16 @@ Thu Dec 12 18:51:04 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
* ace/Thread: Added getprio() and setprio() methods to ACE_Thread.
+Fri Dec 13 13:44:12 1996 David L. Levine <levine@cs.wustl.edu>
+
+ * ace/config-vxworks*.h: added ACE_NEEDS_SYSTIME_H to VxWorks
+ configs because it's needed with inlining
+
+ * include/makeinclude/platform_vxworks*.GNU: cleaned up VxWorks
+ config files
+
+Thu Dec 12 18:51:04 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
* ace: Added a new macro called ACE_UNUSED_ARG() to keep
the compiler from outputting warnings about unused
arguments. So far, this is mostly done for Win32, but it
diff --git a/Makefile b/Makefile
index 4ee2711784a..d1e5a1297f1 100644
--- a/Makefile
+++ b/Makefile
@@ -12,11 +12,11 @@ INFO = README \
VERSION
DIRS = ace \
+ netsvcs \
+ tests \
apps \
examples \
- netsvcs \
- performance-tests \
- tests
+ performance-tests
CLONE = Makefile \
ace \
diff --git a/README b/README
index 967bf95d3f6..100fef5f4cf 100644
--- a/README
+++ b/README
@@ -12,32 +12,32 @@ An Object-Oriented Network Programming Toolkit
Overview of ACE
The ADAPTIVE Communication Environment (ACE) is an object-oriented
-toolkit that implements strategic and tactical design patterns to
-simplify the development of concurrent, event-driven communication
-software. ACE provides a rich set of reusable C++ wrappers, class
-categories, and frameworks that perform common communication software
-tasks across a range of operating system platforms. The communication
-software tasks provided by ACE include event demultiplexing and event
-handler dispatching, connection establishment, interprocess
-communication, shared memory management, message routing, dynamic
-(re)configuration of distributed services, multi-threading, and
-concurrency control.
-
-ACE is targeted for developers of high-performance concurrent network
-applications and services. The primary goal of ACE is to simplify the
-development of concurrent OO communication software that utilizes
-interprocess communication, event demultiplexing, explicit dynamic
-linking, and concurrency. In addition, ACE automates communication
-software configuration and reconfiguration by dynamically linking
-services into applications at run-time and executing these services on
-one or more processes or threads.
+(OO) toolkit that implements fundamental design patterns for
+communication software. ACE provides a rich set of reusable C++
+wrappers, class categories, and frameworks that perform common
+communication software tasks across a range of operating system
+platforms. The communication software tasks provided by ACE include
+event demultiplexing and event handler dispatching, service
+initialization, interprocess communication, shared memory management,
+message routing, dynamic (re)configuration of distributed services,
+multi-threading, and concurrency control.
+
+ACE is targeted for developers of high-performance communication
+services and applications on UNIX, POSIX, and Win32 platforms. ACE
+simplifies the development of OO network applications and services
+that utilize interprocess communication, event demultiplexing,
+explicit dynamic linking, and concurrency. ACE automates system
+configuration and reconfiguration by dynamically linking services into
+applications at run-time and executing these services in one or more
+processes or threads.
ACE has been ported to a wide range of uni-processor and multi-process
OS platforms including Win32 (i.e., WinNT and Win95), most versions of
UNIX (e.g., SunOS 4.x and 5.x, SGI IRIX, HP-UX, OSF/1, AIX, Linux, and
SCO), VxWorks, and MVS OpenEdition. It is currently used in
commercial products by dozens of companies including Ericsson,
-Bellcore, Siemens, Motorola, and Kodak.
+Bellcore, Siemens, Motorola, and Kodak. There are both C++ and Java
+versions of ACE available.
The remainder of this document outlines the structure and participants
of the layers in this diagram.
@@ -130,12 +130,12 @@ medical engineering.
OBTAINING ACE
-The current ACE release is provided as a tar file that is slightly
-larger than 1.5 Meg compressed using GNU gzip. ACE may be obtained
-electronically from http://www.cs.wustl.edu/~schmidt/ACE-obtain.html.
-This release contains contains the source code, test drivers, and
-example applications for C++ wrapper libraries and the higher-level
-ACE network programming framework developed as part of the ADAPTIVE
+The current ACE release is provided as a tar file that is around 1.8
+Meg compressed using GNU gzip. ACE may be obtained electronically
+from http://www.cs.wustl.edu/~schmidt/ACE-obtain.html. This release
+contains contains the source code, test drivers, and example
+applications for C++ wrapper libraries and the higher-level ACE
+network programming framework developed as part of the ADAPTIVE
project at the University of California, Irvine and at Washington
University.
diff --git a/ace/OS.cpp b/ace/OS.cpp
index 6559f6209aa..e0b395e66a1 100644
--- a/ace/OS.cpp
+++ b/ace/OS.cpp
@@ -1455,3 +1455,47 @@ ace_mutex_lock_cleanup_adapter (void *args)
{
ACE_OS::mutex_lock_cleanup (args);
}
+
+ACE_Thread_ID::ACE_Thread_ID (ACE_thread_t thread_id,
+ ACE_hthread_t thread_handle)
+ : thread_id_ (thread_id),
+ thread_handle_ (thread_handle)
+{
+}
+
+ACE_thread_t
+ACE_Thread_ID::id (void)
+{
+ return this->thread_id_;
+}
+
+void
+ACE_Thread_ID::id (ACE_thread_t thread_id)
+{
+ this->thread_id_ = thread_id;
+}
+
+ACE_hthread_t
+ACE_Thread_ID::handle (void)
+{
+ return this->thread_handle_;
+}
+
+void
+ACE_Thread_ID::handle (ACE_hthread_t thread_handle)
+{
+ this->thread_handle_ = thread_handle;
+}
+
+int
+ACE_Thread_ID::operator == (const ACE_Thread_ID &rhs)
+{
+ return this->thread_handle_ == rhs.thread_handle_
+ && this->thread_id_ == rhs.thread_id_;
+}
+
+int
+ACE_Thread_ID::operator != (const ACE_Thread_ID &rhs)
+{
+ return !((*this) == rhs);
+}
diff --git a/ace/OS.h b/ace/OS.h
index 3984fa7f451..cd6259ad16c 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -1852,6 +1852,33 @@ typedef int ucontext_t;
#undef t_errno
#endif /* ACE_HAS_BROKEN_T_ERRNO */
+class ACE_Export ACE_Thread_ID
+ // = TITLE
+ // Defines a platform-independent thread ID.
+{
+public:
+ ACE_Thread_ID (ACE_thread_t, ACE_hthread_t);
+
+ // = Set/Get the Thread ID.
+ ACE_thread_t id (void);
+ void id (ACE_thread_t);
+
+ // = Set/Get the Thread handle.
+ ACE_hthread_t handle (void);
+ void handle (ACE_hthread_t);
+
+ // != Comparison operator.
+ int operator == (const ACE_Thread_ID &);
+ int operator != (const ACE_Thread_ID &);
+
+private:
+ ACE_thread_t thread_id_;
+ // Identify the thread.
+
+ ACE_hthread_t thread_handle_;
+ // Handle to the thread (typically used to "wait" on Win32).
+};
+
// Type of the extended signal handler.
typedef void (*ACE_Sig_Handler_Ex) (int, siginfo_t *siginfo, ucontext_t *ucontext);
@@ -2320,7 +2347,26 @@ public:
static int t_sync (ACE_HANDLE fildes);
static int t_unbind (ACE_HANDLE fildes);
- // = A set of wrappers for threads.
+#if 0
+ // = A set of wrappers for threads (these are portable since they use the ACE_Thread_ID).
+ static int thr_continue (const ACE_Thread_ID &thread);
+ static int thr_create (ACE_THR_FUNC,
+ void *args,
+ long flags,
+ ACE_Thread_ID *,
+ u_int priority = 0,
+ void *stack = 0,
+ size_t stacksize = 0);
+ static int thr_getprio (ACE_Thread_ID thr_id, int &prio, int *policy = 0);
+ static int thr_join (ACE_Thread_ID waiter_id, void **status);
+ static int thr_kill (ACE_Thread_ID thr_id, int signum);
+ static ACE_Thread_ID thr_self (void);
+ static int thr_setprio (ACE_Thread_ID thr_id, int prio);
+ static int thr_suspend (ACE_Thread_ID target_thread);
+ static int thr_cancel (ACE_Thread_ID t_id);
+#endif /* 0 */
+
+ // = A set of wrappers for threads (these are non-portable since they use ACE_thread_t and ACE_hthread_t and will go away in a future release).
static int thr_continue (ACE_hthread_t target_thread);
static int thr_create (ACE_THR_FUNC,
void *args,
@@ -2330,30 +2376,31 @@ public:
u_int priority = 0,
void *stack = 0,
size_t stacksize = 0);
+ static int thr_getprio (ACE_hthread_t thr_id, int &prio);
+ static int thr_join (ACE_hthread_t waiter_id, void **status);
+ static int thr_join (ACE_thread_t waiter_id, ACE_thread_t *thr_id, void **status);
+ static int thr_kill (ACE_thread_t thr_id, int signum);
+ static ACE_thread_t thr_self (void);
+ static void thr_self (ACE_hthread_t &);
+ static int thr_setprio (ACE_hthread_t thr_id, int prio);
+ static int thr_suspend (ACE_hthread_t target_thread);
+ static int thr_cancel (ACE_thread_t t_id);
+
static int thr_cmp (ACE_hthread_t t1, ACE_hthread_t t2);
static int thr_equal (ACE_thread_t t1, ACE_thread_t t2);
static void thr_exit (void *status = 0);
static int thr_getconcurrency (void);
- static int thr_getprio (ACE_hthread_t thr_id, int *prio);
static int thr_getspecific (ACE_thread_key_t key, void **data);
- static int thr_join (ACE_hthread_t waiter_id, void **status);
- static int thr_join (ACE_thread_t waiter_id, ACE_thread_t *thr_id, void **status);
static int thr_keyfree (ACE_thread_key_t key);
static int thr_key_detach (void *inst);
static int thr_keycreate (ACE_thread_key_t *key, ACE_THR_DEST, void *inst = 0);
static int thr_key_used (ACE_thread_key_t key);
- static int thr_kill (ACE_thread_t thr_id, int signum);
static size_t thr_min_stack (void);
- static ACE_thread_t thr_self (void);
- static void thr_self (ACE_hthread_t &);
static int thr_setconcurrency (int hint);
- static int thr_setprio (ACE_hthread_t thr_id, int prio);
static int thr_setspecific (ACE_thread_key_t key, void *data);
static int thr_sigsetmask (int how, const sigset_t *nsm, sigset_t *osm);
- static int thr_suspend (ACE_hthread_t target_thread);
static int thr_setcancelstate (int new_state, int *old_state);
static int thr_setcanceltype (int new_type, int *old_type);
- static int thr_cancel (ACE_thread_t t_id);
static int sigwait (sigset_t *set, int *sig = 0);
static void thr_testcancel (void);
static void thr_yield (void);
diff --git a/ace/OS.i b/ace/OS.i
index 78c5fc48212..cdca2f12301 100644
--- a/ace/OS.i
+++ b/ace/OS.i
@@ -3069,20 +3069,26 @@ ACE_OS::thr_getconcurrency (void)
}
ACE_INLINE int
-ACE_OS::thr_getprio (ACE_hthread_t thr_id, int *prio)
+ACE_OS::thr_getprio (ACE_hthread_t thr_id, int &prio)
{
// ACE_TRACE ("ACE_OS::thr_getprio");
#if defined (ACE_HAS_THREADS)
#if defined (ACE_HAS_STHREADS)
- ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_getprio (thr_id, prio), ace_result_), int, -1);
-#elif defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
- ACE_NOTSUP_RETURN (-1);
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_getprio (thr_id, &prio), ace_result_), int, -1);
+#elif (defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)) && !defined (ACE_LACKS_SETSCHED)
+ struct sched_param param;
+ int result;
+ int policy = 0;
+
+ ACE_OSCALL (ACE_ADAPT_RETVAL (::pthread_getschedparam (thr_id, &policy, &param), result), int, -1, result);
+ prio = param.sched_priority;
+ return result;
#elif defined (ACE_HAS_WTHREADS)
ACE_UNUSED_ARG(prio);
// why is the thread prio not dropped into prio ?
- int result = ::GetThreadPriority (thr_id);
- return result == THREAD_PRIORITY_ERROR_RETURN ? -1 : result;
+ prio = ::GetThreadPriority (thr_id);
+ return prio == THREAD_PRIORITY_ERROR_RETURN ? -1 : 0;
#elif defined (VXWORKS)
ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::taskPriorityGet (thr_id, prio), ace_result_), int, -1);
#endif /* ACE_HAS_STHREADS */
@@ -3159,9 +3165,11 @@ ACE_OS::thr_join (ACE_hthread_t thr_handle, void **status)
status = status;
#if defined (ACE_HAS_THREADS)
#if defined (ACE_HAS_STHREADS)
- ACE_NOTSUP_RETURN (-1);
-#elif defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
- ACE_NOTSUP_RETURN (-1);
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_join (thr_handle, 0, status), ace_result_),
+ int, -1);
+#elif (defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)) && !defined (ACE_HAS_THREAD_SELF)
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::pthread_join (thr_handle, status), ace_result_),
+ int, -1);
#elif defined (ACE_HAS_WTHREADS)
void *local_status = 0;
@@ -3259,12 +3267,12 @@ ACE_OS::thr_setcanceltype (int new_type, int *old_type)
}
ACE_INLINE int
-ACE_OS::thr_cancel (ACE_thread_t t_id)
+ACE_OS::thr_cancel (ACE_thread_t thr_id)
{
// ACE_TRACE ("ACE_OS::thr_cancel");
#if defined (ACE_HAS_THREADS)
#if defined (ACE_HAS_DCETHREADS) || (defined (ACE_HAS_PTHREADS) && defined (ACE_HAS_STHREADS))
- ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::pthread_cancel(t_id),
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::pthread_cancel (thr_id),
ace_result_),
int, -1);
#elif defined (ACE_HAS_PTHREADS)
@@ -3275,7 +3283,7 @@ ACE_OS::thr_cancel (ACE_thread_t t_id)
#elif defined (ACE_HAS_STHREADS)
ACE_NOTSUP_RETURN (-1);
#elif defined (ACE_HAS_WTHREADS)
- ACE_UNUSED_ARG(t_id);
+ ACE_UNUSED_ARG(thr_id);
ACE_NOTSUP_RETURN (-1);
#elif defined (VXWORKS)
@@ -3554,7 +3562,13 @@ ACE_OS::thr_setprio (ACE_hthread_t thr_id, int prio)
ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::thr_setprio (thr_id, prio),
ace_result_),
int, -1);
-#elif defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
+#elif (defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)) && !defined (ACE_LACKS_SETSCHED)
+ struct sched_param param;
+ int result;
+
+ ACE_OSCALL (ACE_ADAPT_RETVAL (::pthread_setschedparam (thr_id, policy, &param), result), int, -1, result);
+ prio = param.sched_priority;
+ return result;
ACE_NOTSUP_RETURN (-1);
#elif defined (ACE_HAS_WTHREADS)
ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::SetThreadPriority (thr_id, prio),
@@ -5842,3 +5856,91 @@ ACE_OS::mkdir (const wchar_t *path, mode_t mode)
#endif /* ACE_WIN32 */
#endif /* ACE_HAS_UNICODE */
+
+#if 0
+ACE_INLINE int
+ACE_OS::thr_continue (const ACE_Thread_ID &thr_id)
+{
+ // ACE_TRACE ("ACE_OS::thr_continue");
+ return ACE_OS::thr_continue (thr_id.id ());
+}
+
+ACE_INLINE int
+ACE_OS::thr_create (ACE_THR_FUNC func,
+ void *args,
+ long flags,
+ ACE_Thread_ID *thr_id = 0,
+ u_int priority = 0,
+ void *stack = 0,
+ size_t stacksize = 0);
+{
+ // ACE_TRACE ("ACE_OS::thr_create");
+ ACE_thread_t thread_id;
+ ACE_hthread_t thread_handle;
+
+ int result = ACE_OS::thr_create (func, args, flags,
+ &thread_id, &thread_handle,
+ priority, stack, stacksize);
+ if (result == -1)
+ return -1;
+ else if (thr_id != 0)
+ {
+ thr_id->id (thread_id);
+ thr_id->handle (thread_handle);
+ return result;
+ }
+}
+
+ACE_INLINE int
+ACE_OS::thr_getprio (const ACE_Thread_ID &thr_id, int &prio)
+{
+ // ACE_TRACE ("ACE_OS::thr_getprio");
+ return ACE_OS::thr_getprio (thr_id.handle (), prio);
+}
+
+ACE_INLINE int
+ACE_OS::thr_join (const ACE_Thread_ID &thr_id, void **status)
+{
+#if defined (ACE_WIN32)
+ return ACE_OS::join (thr_id.id (), status);
+#else
+ return ACE_OS::join (thr_id.handle (), status);
+#endif /* ACE_WIN32 */
+}
+
+ACE_INLINE int
+ACE_OS::thr_cancel (const ACE_Thread_ID &thr_id)
+{
+ return ACE_OS::thr_cancel (thr_id.id ());
+}
+
+ACE_INLINE int
+ACE_OS::thr_kill (const ACE_Thread_ID &thr_id, int signum)
+{
+ return ACE_OS::thr_kill (thr_id.id (), signum);
+}
+
+ACE_INLINE ACE_Thread_ID
+ACE_OS::thr_self (void)
+{
+ ACE_hthread_t thr_handle;
+ ACE_OS::thr_self (thr_handle);
+ ACE_thread_t thr_id = ACE_OS::thr_self ();
+
+ return ACE_Thread_ID (thr_id, thr_handle);
+}
+
+ACE_INLINE int
+ACE_OS::thr_setprio (const ACE_Thread_ID &thr_id, int prio)
+{
+ // ACE_TRACE ("ACE_OS::thr_getprio");
+ return ACE_OS::thr_setprio (thr_id.handle (), prio);
+}
+
+ACE_INLINE int
+ACE_OS::thr_suspend (const ACE_Thread_ID &thr_id)
+{
+ return ACE_OS::thr_suspend (thr_id.handle ());
+}
+
+#endif /* 0 */
diff --git a/ace/SString.cpp b/ace/SString.cpp
index a8269f6362e..47c52efcf68 100644
--- a/ace/SString.cpp
+++ b/ace/SString.cpp
@@ -108,10 +108,7 @@ ACE_CString::ACE_CString (const ACE_USHORT16 *s, ACE_Allocator *alloc)
// Copy the ACE_USHORT16 * string byte-by-byte into the char *
// string.
for (size_t i = 0; i < this->len_; i++)
-#pragma warning(disable: 4244) /* Possible loss of data */
- this->rep_[i] = (ACE_USHORT16) s[i];
-#pragma warning(default: 4244) /* Possible loss of data */
-
+ this->rep_[i] = char (s[i]);
this->rep_[this->len_] = '\0';
}
}
diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp
index 4285901216f..922b6b89eaa 100644
--- a/ace/Svc_Handler.cpp
+++ b/ace/Svc_Handler.cpp
@@ -217,21 +217,6 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::close (unsigned long)
}
template <PR_ST_1, ACE_SYNCH_1> int
-ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::svc (void)
-{
- ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::svc");
- return -1;
-}
-
-template <PR_ST_1, ACE_SYNCH_1> int
-ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::put (ACE_Message_Block *,
- ACE_Time_Value *)
-{
- ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::put");
- return -1;
-}
-
-template <PR_ST_1, ACE_SYNCH_1> int
ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::init (int, char *[])
{
ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::init");
diff --git a/ace/Svc_Handler.h b/ace/Svc_Handler.h
index b66aa783ebd..1228ad0964a 100644
--- a/ace/Svc_Handler.h
+++ b/ace/Svc_Handler.h
@@ -97,9 +97,6 @@ public:
// Returns the underlying PEER_STREAM (used by
// ACE_Acceptor::accept() and ACE_Connector::connect() factories).
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
- // Provide a default implementation to simplify ancestors...
-
virtual void destroy (void);
// Call this instead of <delete> to free up dynamically allocated
// <Svc_Handler>. This method knows whether or not the object was
@@ -119,9 +116,6 @@ public:
// This really should be private so that users are forced to call
// destroy().
- virtual int svc (void);
- // Provide a default implementation to simplify ancestors...
-
private:
void shutdown (void);
// Close down the descriptor
diff --git a/ace/Task.h b/ace/Task.h
index 0b11f6ead24..8e6f4752f27 100644
--- a/ace/Task.h
+++ b/ace/Task.h
@@ -57,11 +57,11 @@ public:
ACE_Task_Base (ACE_Thread_Manager *);
// = Initialization and termination hooks (note that these *must* be defined by subclasses).
- virtual int open (void *args = 0) = 0;
+ virtual int open (void *args = 0);
// Hook called to open a Task. <args> can be used to pass arbitrary
// information into <open>.
- virtual int close (u_long flags = 0) = 0;
+ virtual int close (u_long flags = 0);
// Hook called from ACE_Task_Exit when during thread exit and from
// the default implemenation of module_closed().
@@ -75,7 +75,7 @@ public:
// ACE_Task instance exits.
// = Immediate and deferred processing methods, respectively.
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) = 0;
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
// Transfer msg into the queue to handle immediate processing.
virtual int svc (void);
diff --git a/ace/Task.i b/ace/Task.i
index bf6d310d2fc..7e682513761 100644
--- a/ace/Task.i
+++ b/ace/Task.i
@@ -60,3 +60,30 @@ ACE_Task_Base::svc (void)
ACE_TRACE ("ACE_Task_Base::svc");
return 0;
}
+
+// Default ACE_Task open routine
+
+ACE_INLINE int
+ACE_Task_Base::open (void *)
+{
+ ACE_TRACE ("ACE_Task_Base::open");
+ return 0;
+}
+
+// Default ACE_Task close routine
+
+ACE_INLINE int
+ACE_Task_Base::close (u_long)
+{
+ ACE_TRACE ("ACE_Task_Base::close");
+ return 0;
+}
+
+// Default ACE_Task put routine.
+
+ACE_INLINE int
+ACE_Task_Base::put (ACE_Message_Block *, ACE_Time_Value *)
+{
+ ACE_TRACE ("ACE_Task_Base::put");
+ return 0;
+}
diff --git a/ace/Thread.i b/ace/Thread.i
index 6696a08e3f4..1a5e2746f65 100644
--- a/ace/Thread.i
+++ b/ace/Thread.i
@@ -244,7 +244,7 @@ ACE_Thread::self (ACE_hthread_t &t_id)
}
ACE_INLINE int
-ACE_Thread::getprio (ACE_hthread_t t_id, int *prio)
+ACE_Thread::getprio (ACE_hthread_t t_id, int &prio)
{
ACE_TRACE ("ACE_Thread::getprio");
return ACE_OS::thr_getprio (t_id, prio);
diff --git a/ace/config-aix-3.2.5.h b/ace/config-aix-3.2.5.h
index cb5bf928a89..eb60aa501ea 100644
--- a/ace/config-aix-3.2.5.h
+++ b/ace/config-aix-3.2.5.h
@@ -11,23 +11,14 @@
#define ACE_HAS_ONLY_TWO_PARAMS_FOR_ASCTIME_R_AND_CTIME_R
#define ACE_HAS_RECURSIVE_THR_EXIT_SEMANTICS
#define ACE_HAS_CONSISTENT_SIGNAL_PROTOTYPES
-
#define ACE_HAS_TEMPLATE_TYPEDEFS
-
#define ACE_HAS_STRERROR
-
#define ACE_HAS_SIG_ATOMIC_T
-
#define ACE_HAS_SSIZE_T
-
#define ACE_HAS_CPLUSPLUS_HEADERS
-
#define ACE_HAS_POLL
-
#define ACE_HAS_POSIX_NONBLOCK
-
#define ACE_HAS_AIX_GETTIMEOFDAY
-
#define ACE_HAS_NO_SYSCALL_H
// Compiler/platform has the getrusage() system call.
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h
new file mode 100644
index 00000000000..e2fbc934c93
--- /dev/null
+++ b/apps/Gateway/Gateway/Concurrency_Strategies.h
@@ -0,0 +1,74 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Concurrency_strategies.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_CONCURRENCY_STRATEGIES)
+#define _CONCURRENCY_STRATEGIES
+
+#include "ace/Synch.h"
+
+// The following typedefs are used in order to parameterize the
+// synchronization policies without changing the source code!
+
+// If we don't have threads then use the single-threaded synchronization.
+#if !defined (ACE_HAS_THREADS)
+#define SYNCH_STRATEGY ACE_NULL_SYNCH
+typedef ACE_Null_Mutex MAP_MUTEX;
+#else /* ACE_HAS_THREADS */
+
+// Note that we only need to make the ACE_Task thread-safe if we are
+// using the multi-threaded Thr_Consumer_Handler...
+#if defined (USE_OUTPUT_MT)
+#define SYNCH_STRATEGY ACE_MT_SYNCH
+#else
+#define SYNCH_STRATEGY ACE_NULL_SYNCH
+#endif /* USE_OUTPUT_MT || USE_INPUT_MT */
+
+// Note that we only need to make the ACE_Map_Manager thread-safe if
+// we are using the multi-threaded Thr_Supplier_Handler. In this
+// case, we use an RW_Mutex since we'll lookup Consumers far more
+// often than we'll update them.
+#if defined (USE_INPUT_MT)
+typedef ACE_RW_Mutex MAP_MUTEX;
+#else
+typedef ACE_Null_Mutex MAP_MUTEX;
+#endif /* USE_INPUT_MT */
+#endif /* ACE_HAS_THREADS */
+
+// = Forward decls
+class Thr_Consumer_Handler;
+class Thr_Supplier_Handler;
+class Consumer_Handler;
+class Supplier_Handler;
+
+#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT))
+#if defined (USE_OUTPUT_MT)
+typedef Thr_Consumer_Handler CONSUMER_HANDLER;
+#else
+typedef Consumer_Handler CONSUMER_HANDLER;
+#endif /* USE_OUTPUT_MT */
+
+#if defined (USE_INPUT_MT)
+typedef Thr_Supplier_Handler SUPPLIER_HANDLER;
+#else
+typedef Supplier_Handler SUPPLIER_HANDLER;
+#endif /* USE_INPUT_MT */
+#else
+// Instantiate a non-multi-threaded Gateway.
+typedef Supplier_Handler SUPPLIER_HANDLER;
+typedef Consumer_Handler CONSUMER_HANDLER;
+#endif /* ACE_HAS_THREADS */
+
+#endif /* _CONCURRENCY_STRATEGIES */
diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp
index f2466544e5b..4c2648addf0 100644
--- a/apps/Gateway/Gateway/Config_Files.cpp
+++ b/apps/Gateway/Gateway/Config_Files.cpp
@@ -8,25 +8,23 @@
typedef FP::Return_Type FP_RETURN_TYPE;
FP_RETURN_TYPE
-RT_Config_File_Parser::read_entry (RT_Config_File_Entry &entry,
- int &line_number)
+Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
+ int &line_number)
{
FP_RETURN_TYPE read_result;
- // increment the line count
+
+ // Increment the line count.
line_number++;
- // Ignore comments, check for EOF and EOLINE
- // if this succeeds, we have our connection id
+ // Ignore comments, check for EOF and EOLINE if this succeeds, we
+ // have our connection id.
while ((read_result = this->getint (entry.conn_id_)) != FP::SUCCESS)
{
if (read_result == FP::EOFILE)
return FP::EOFILE;
else if (read_result == FP::EOLINE
|| read_result == FP::COMMENT)
- {
- // increment the line count
- line_number++;
- }
+ line_number++;
}
// Get the logic id.
@@ -51,8 +49,8 @@ RT_Config_File_Parser::read_entry (RT_Config_File_Entry &entry,
}
FP_RETURN_TYPE
-CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry,
- int &line_number)
+Connection_Config_File_Parser::read_entry (Connection_Config_File_Entry &entry,
+ int &line_number)
{
char buf[BUFSIZ];
FP_RETURN_TYPE read_result;
@@ -67,10 +65,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry,
return FP::EOFILE;
else if (read_result == FP::EOLINE ||
read_result == FP::COMMENT)
- {
- // increment the line count
- line_number++;
- }
+ line_number++;
}
// get the hostname
@@ -83,7 +78,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry,
if ((read_result = this->getint (port)) != FP::SUCCESS)
return read_result;
else
- entry.remote_port_ = (u_short) port;
+ entry.remote_poconsumer_ = (u_short) port;
// Get the direction.
if ((read_result = this->getword (buf)) != FP::SUCCESS)
@@ -99,7 +94,7 @@ CC_Config_File_Parser::read_entry (CC_Config_File_Entry &entry,
if ((read_result = this->getint (port)) != FP::SUCCESS)
return read_result;
else
- entry.local_port_ = (u_short) port;
+ entry.local_poconsumer_ = (u_short) port;
return FP::SUCCESS;
}
@@ -113,8 +108,8 @@ int main (int argc, char *argv[])
exit (1);
}
FP_RETURN_TYPE result;
- CC_Config_File_Entry CCentry;
- CC_Config_File_Parser CCfile;
+ Connection_Config_File_Entry CCentry;
+ Connection_Config_File_Parser CCfile;
CCfile.open (argv[1]);
@@ -130,13 +125,13 @@ int main (int argc, char *argv[])
cerr << "Error at line " << line_number << endl;
else
printf ("%d\t%s\t%d\t%c\t%d\t%c\t%d\n",
- CCentry.conn_id_, CCentry.host_, CCentry.remote_port_, CCentry.direction_,
- CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_port_);
+ CCentry.conn_id_, CCentry.host_, CCentry.remote_poconsumer_, CCentry.direction_,
+ CCentry.max_retry_delay_, CCentry.transform_, CCentry.local_poconsumer_);
}
CCfile.close();
- RT_Config_File_Entry RTentry;
- RT_Config_File_Parser RTfile;
+ Consumer_Config_File_Entry RTentry;
+ Consumer_Config_File_Parser RTfile;
RTfile.open (argv[2]);
@@ -165,6 +160,6 @@ int main (int argc, char *argv[])
#endif /* DEBUGGING */
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
-template class File_Parser<CC_Config_File_Entry>;
-template class File_Parser<RT_Config_File_Entry>;
+template class File_Parser<Connection_Config_File_Entry>;
+template class File_Parser<Consumer_Config_File_Entry>;
#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
index 9418815ff09..145c3233bae 100644
--- a/apps/Gateway/Gateway/Config_Files.h
+++ b/apps/Gateway/Gateway/Config_Files.h
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
// ============================================================================
//
// = LIBRARY
@@ -21,42 +20,42 @@
#include "ace/OS.h"
#include "File_Parser.h"
-class CC_Config_File_Entry
+class Connection_Config_File_Entry
// = TITLE
- // Stores the information in a Channel Connection entry.
+ // Stores the IO_Handler entry for connection configuration.
{
public:
int conn_id_;
- // Connection id for this Channel.
+ // Connection id for this IO_Handler.
char host_[BUFSIZ];
// Host to connect with.
- u_short remote_port_;
+ u_short remote_poconsumer_;
// Port to connect with.
char direction_;
- // 'I' (input) or 'O' (output)
+ // 'S' (supplier) or 'C' (consumer).
int max_retry_delay_;
// Maximum amount of time to wait for reconnecting.
- u_short local_port_;
+ u_short local_poconsumer_;
// Our local port number.
};
-class CC_Config_File_Parser : public File_Parser<CC_Config_File_Entry>
+class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry>
// = TITLE
- // Parser for the Channel Connection file.
+ // Parser for the IO_Handler Connection file.
{
public:
virtual FP::Return_Type
- read_entry (CC_Config_File_Entry &entry, int &line_number);
+ read_entry (Connection_Config_File_Entry &entry, int &line_number);
};
-class RT_Config_File_Entry
+class Consumer_Config_File_Entry
// = TITLE
- // Stores the information in a Routing Table entry.
+ // Stores the information in a Consumer Map entry.
{
public:
enum {
@@ -79,13 +78,13 @@ public:
// Total number of these destinations.
};
-class RT_Config_File_Parser : public File_Parser<RT_Config_File_Entry>
+class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_File_Entry>
// = TITLE
- // Parser for the Routing Table file.
+ // Parser for the Consumer Map file.
{
public:
virtual FP::Return_Type
- read_entry (RT_Config_File_Entry &entry, int &line_number);
+ read_entry (Consumer_Config_File_Entry &entry, int &line_number);
};
#endif /* _CONFIG_FILES */
diff --git a/apps/Gateway/Gateway/Consumer_Entry.cpp b/apps/Gateway/Gateway/Consumer_Entry.cpp
new file mode 100644
index 00000000000..c3dcd96ebbf
--- /dev/null
+++ b/apps/Gateway/Gateway/Consumer_Entry.cpp
@@ -0,0 +1,31 @@
+// Defines an entry in the Consumer Map.
+// $Id$
+
+#include "Consumer_Entry.h"
+
+Consumer_Entry::Consumer_Entry (void)
+{
+ ACE_NEW (this->destinations_, Consumer_Entry::ENTRY_SET);
+}
+
+Consumer_Entry::~Consumer_Entry (void)
+{
+ delete this->destinations_;
+}
+
+// Get the associated set of destinations.
+
+Consumer_Entry::ENTRY_SET *
+Consumer_Entry::destinations (void)
+{
+ return this->destinations_;
+}
+
+// Set the associated set of destinations.
+
+void
+Consumer_Entry::destinations (Consumer_Entry::ENTRY_SET *s)
+{
+ this->destinations_ = s;
+}
+
diff --git a/apps/Gateway/Gateway/Consumer_Entry.h b/apps/Gateway/Gateway/Consumer_Entry.h
new file mode 100644
index 00000000000..fe502991514
--- /dev/null
+++ b/apps/Gateway/Gateway/Consumer_Entry.h
@@ -0,0 +1,45 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Consumer_Entry.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_ROUTING_ENTRY)
+#define _ROUTING_ENTRY
+
+#include "ace/Set.h"
+
+// Forward reference.
+class IO_Handler;
+
+class Consumer_Entry
+{
+ // = TITLE
+ // Defines an entry in the Consumer_Map.
+public:
+ Consumer_Entry (void);
+ ~Consumer_Entry (void);
+
+ typedef ACE_Unbounded_Set<IO_Handler *> ENTRY_SET;
+ typedef ACE_Unbounded_Set_Iterator<IO_Handler *> ENTRY_ITERATOR;
+
+ // = Set/get the associated set of destinations.
+ ENTRY_SET *destinations (void);
+ void destinations (ENTRY_SET *);
+
+protected:
+ ENTRY_SET *destinations_;
+ // The set of destinations;
+};
+
+#endif /* _ROUTING_ENTRY */
diff --git a/apps/Gateway/Gateway/Consumer_Map.cpp b/apps/Gateway/Gateway/Consumer_Map.cpp
new file mode 100644
index 00000000000..6d16601f949
--- /dev/null
+++ b/apps/Gateway/Gateway/Consumer_Map.cpp
@@ -0,0 +1,61 @@
+/* -*- C++ -*- */
+// $Id$
+
+#if !defined (_CONSUMER_MAP_C)
+#define _CONSUMER_MAP_C
+
+#include "Consumer_Map.h"
+
+// Bind the Event_Addr to the INT_ID.
+
+int
+Consumer_Map::bind (Event_Addr event_addr,
+ Consumer_Entry *Consumer_Entry)
+{
+ return this->map_.bind (event_addr, Consumer_Entry);
+}
+
+// Find the Consumer_Entry corresponding to the Event_Addr.
+
+int
+Consumer_Map::find (Event_Addr event_addr,
+ Consumer_Entry *&Consumer_Entry)
+{
+ return this->map_.find (event_addr, Consumer_Entry);
+}
+
+// Unbind (remove) the Event_Addr from the map.
+
+int
+Consumer_Map::unbind (Event_Addr event_addr)
+{
+ return this->map_.unbind (event_addr);
+}
+
+Consumer_Map_Iterator::Consumer_Map_Iterator (Consumer_Map &rt)
+ : map_iter_ (rt.map_)
+{
+}
+
+int
+Consumer_Map_Iterator::next (Consumer_Entry *&ss)
+{
+ // Loop in order to skip over inactive entries if necessary.
+
+ for (ACE_Map_Entry<Event_Addr, Consumer_Entry *> *temp = 0;
+ this->map_iter_.next (temp) != 0;
+ this->advance ())
+ {
+ // Otherwise, return the next item.
+ ss = temp->int_id_;
+ return 1;
+ }
+ return 0;
+}
+
+int
+Consumer_Map_Iterator::advance (void)
+{
+ return this->map_iter_.advance ();
+}
+#endif /* _CONSUMER_MAP_C */
diff --git a/apps/Gateway/Gateway/Consumer_Map.h b/apps/Gateway/Gateway/Consumer_Map.h
new file mode 100644
index 00000000000..fd392afaf6e
--- /dev/null
+++ b/apps/Gateway/Gateway/Consumer_Map.h
@@ -0,0 +1,62 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Consumer_Map.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_CONSUMER_MAP_H)
+#define _CONSUMER_MAP_H
+
+#include "ace/Map_Manager.h"
+#include "Concurrency_Strategies.h"
+#include "Event.h"
+#include "Consumer_Entry.h"
+
+class Consumer_Map
+{
+ // = TITLE
+ // Define a generic consumer map based on the ACE Map_Manager.
+ //
+ // = DESCRIPTION
+ // This class makes it easier to use the Map_Manager.
+public:
+ int bind (Event_Addr event, Consumer_Entry *Consumer_Entry);
+ // Associate Event with the Consumer_Entry.
+
+ int find (Event_Addr event, Consumer_Entry *&Consumer_Entry);
+ // Break any association of EXID.
+
+ int unbind (Event_Addr event);
+ // Locate EXID and pass out parameter via INID. If found,
+ // return 0, else -1.
+
+public:
+ ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_;
+ // Map that associates Event Addrs (external ids) with Consumer_Entry *'s
+ // <internal IDs>.
+};
+
+class Consumer_Map_Iterator
+{
+ // = TITLE
+ // Define an iterator for the Consumer Map.
+public:
+ Consumer_Map_Iterator (Consumer_Map &mm);
+ int next (Consumer_Entry *&);
+ int advance (void);
+
+private:
+ ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX> map_iter_;
+ // Map we are iterating over.
+};
+#endif /* _CONSUMER_MAP_H */
diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h
new file mode 100644
index 00000000000..a8a9374be3c
--- /dev/null
+++ b/apps/Gateway/Gateway/Event.h
@@ -0,0 +1,86 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Event.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (EVENT)
+#define EVENT
+
+// This is the unique connection identifier that denotes a particular
+// IO_Handler in the Gateway.
+typedef short CONN_ID;
+
+class Event_Addr
+ // = TITLE
+ // Address used to identify the source/destination of an event.
+{
+public:
+ Event_Addr (CONN_ID cid = -1, unsigned char lid = 0, unsigned char pay = 0)
+ : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {}
+
+ int operator== (const Event_Addr &pa) const
+ {
+ return this->conn_id_ == pa.conn_id_
+ && this->logical_id_ == pa.logical_id_
+ && this->payload_ == pa.payload_;
+ }
+
+ CONN_ID conn_id_;
+ // Unique connection identifier that denotes a particular IO_Handler.
+
+ unsigned char logical_id_;
+ // Logical ID.
+
+ unsigned char payload_;
+ // Payload type.
+};
+
+
+class Event_Header
+ // = TITLE
+ // Fixed sized header.
+{
+public:
+ typedef unsigned short SUPPLIER_ID;
+ // Type used to route messages from gatewayd.
+
+ enum
+ {
+ INVALID_ID = -1 // No peer can validly use this number.
+ };
+
+ SUPPLIER_ID routing_id_;
+ // Source ID.
+
+ size_t len_;
+ // Length of the message in bytes.
+};
+
+class Event
+ // = TITLE
+ // Variable-sized message (buf_ may be variable-sized between
+ // 0 and MAX_PAYLOAD_SIZE).
+{
+public:
+ enum { MAX_PAYLOAD_SIZE = 1024 };
+ // The maximum size of an Event.
+
+ Event_Header header_;
+ // Message header.
+
+ char buf_[MAX_PAYLOAD_SIZE];
+ // Message payload.
+};
+
+#endif /* EVENT */
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
new file mode 100644
index 00000000000..47dd8572012
--- /dev/null
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -0,0 +1,99 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Event_Channel.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (ACE_EVENT_CHANNEL)
+#define ACE_EVENT_CHANNEL
+
+#include "IO_Handler_Connector.h"
+
+template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER>
+class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler
+ // = TITLE
+ // Define a generic Event_Channel.
+{
+public:
+ // = Initialization and termination methods.
+ ACE_Event_Channel (void);
+
+ int open (int argc, char *argv[]);
+ // Initialize the Channel.
+
+ int close (void);
+ // Close down the Channel.
+
+private:
+ int parse_args (int argc, char *argv[]);
+ // Parse the command-line arguments.
+
+ int parse_connection_config_file (void);
+ // Parse the connection configuration file.
+
+ int parse_consumer_config_file (void);
+ // Parse the consumer map configuration file.
+
+ int initiate_connections (void);
+ // Initiate connections to the peers.
+
+ virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
+ // Perform timer-based performance profiling.
+
+ const char *connection_config_file_;
+ // Name of the connection configuration file.
+
+ const char *consumer_config_file_;
+ // Name of the consumer map configuration file.
+
+ int active_connector_role_;
+ // Enabled if we are playing the role of the active Connector.
+
+ int performance_window_;
+ // Number of seconds after connection establishment to report
+ // throughput.
+
+ int blocking_semantics_;
+ // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.
+
+ int debug_;
+ // Are we debugging?
+
+ IO_Handler_Connector *connector_;
+ // This is used to establish the connections actively.
+
+ int socket_queue_size_;
+ // Size of the socket queue (0 means "use default").
+
+ // = Make life easier by defining typedefs.
+ typedef ACE_Map_Manager<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP;
+ typedef ACE_Map_Iterator<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR;
+ typedef ACE_Map_Entry<CONN_ID, IO_Handler *> CONNECTION_MAP_ENTRY;
+
+ CONNECTION_MAP connection_map_;
+ // Table that maps Connection IDs to IO_Handler *'s.
+
+ Consumer_Map consumer_map_;
+ // Map that associates event addresses to a set of Consumer_Handler
+ // *'s.
+};
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ace/Event_Channel.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Event_Channel.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* ACE_EVENT_CHANNEL */
diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h
index e36ae71ca94..776d1b2f338 100644
--- a/apps/Gateway/Gateway/File_Parser.h
+++ b/apps/Gateway/Gateway/File_Parser.h
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
// ============================================================================
//
// = LIBRARY
@@ -38,8 +37,8 @@ public:
template <class ENTRY>
class File_Parser
// = TITLE
- // Class used to parse the configuration file for the routing
- // table.
+ // Class used to parse the configuration file for the Consumer
+ // Map.
{
public:
// = Open and Close the file specified
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index f249eb2f37d..82666406070 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -1,315 +1,87 @@
/* -*- C++ -*- */
// $Id$
-
-
-#include "ace/Get_Opt.h"
#include "ace/Service_Config.h"
-#include "Config_Files.h"
+#include "Event_Channel.h"
#include "Gateway.h"
-#include "Channel_Connector.h"
-template <class INPUT_CHANNEL, class OUTPUT_CHANNEL>
class Gateway : public ACE_Service_Object
{
public:
- Gateway (ACE_Thread_Manager * = 0);
+ // = Initialization method.
+ Gateway (void);
+ // = Service configurator hooks.
virtual int init (int argc, char *argv[]);
// Perform initialization.
virtual int fini (void);
// Perform termination.
+ virtual int info (char **, size_t) const;
+ // Return info about this service.
+
protected:
int handle_input (ACE_HANDLE);
+ // Shut down the Gateway when input comes in from the controlling
+ // console.
int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
-
- typedef ACE_Map_Manager<CONN_ID, Channel *, MUTEX> CONFIG_TABLE;
- typedef ACE_Map_Iterator<CONN_ID, Channel *, MUTEX> CONFIG_ITERATOR;
-
- CONFIG_TABLE config_table_;
- // Table that maps Connection IDs to Channel *'s.
-
- ROUTING_TABLE routing_table_;
- // Table that maps Peer addresses to a set of Channel *'s for output.
-
- virtual int info (char **, size_t) const;
- // Return info about this service.
+ // Shut down the Gateway when a signal arrives.
int parse_args (int argc, char *argv[]);
- // Parse gateway configuration arguments obtained from svc.conf file.
-
- int parse_cc_config_file (void);
- // Parse the channel connection configuration file.
-
- int parse_rt_config_file (void);
- // Parse the routing table configuration file.
-
- int initiate_connections (void);
- // Initiate connections to the peers.
-
- virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
- // Perform timer-based performance profiling.
-
- const char *cc_config_file_;
- // Name of the channel connection configuration file.
+ // Parse gateway configuration arguments obtained from svc.conf
+ // file.
- const char *rt_config_file_;
- // Name of the routing table configuration file.
-
- int performance_window_;
- // Number of seconds after connection establishment to report throughput.
-
- int blocking_semantics_;
- // 0 == blocking connects, ACE_NONBLOCK == non-blocking connects.
-
- int debug_;
- // Are we debugging?
-
- Channel_Connector *connector_;
- // This is used to establish the connections actively.
-
- int socket_queue_size_;
- // Size of the socket queue (0 means "use default").
-
- // = Manage output and input channel threads (if used.)
- // if both input and output mt is used, they will share thr_mgr_,
- // thr_mgr_ will always reference the thread manager being used
- // regardless of whether input, output, or both channels are using mt.
- ACE_Thread_Manager *thr_mgr_;
- ACE_Thread_Manager *input_thr_mgr_;
- ACE_Thread_Manager *output_thr_mgr_;
+ ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_;
};
// Convenient shorthands.
+// #define IC SUPPLIER_HANDLER
+// #define OC CONSUMER_HANDLER
-#define IC INPUT_CHANNEL
-#define OC OUTPUT_CHANNEL
-
-template <class IC, class OC> int
-Gateway<IC, OC>::handle_signal (int signum, siginfo_t *, ucontext_t *)
+int
+Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *)
{
if (signum > 0)
ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum));
- if (this->thr_mgr_ != 0)
- {
-#if defined (ACE_HAS_THREADS)
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads\n"));
- if (this->thr_mgr_->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
-#endif /* ACE_HAS_THREADS */
- }
+ this->event_channel_.close ();
// Shut down the main event loop.
ACE_Service_Config::end_reactor_event_loop ();
return 0;
}
-template <class IC, class OC> int
-Gateway<IC, OC>::handle_input (ACE_HANDLE h)
+int
+Gateway::handle_input (ACE_HANDLE h)
{
- if (ACE_Service_Config::reactor ()->remove_handler (0,
- ACE_Event_Handler::READ_MASK
- | ACE_Event_Handler::DONT_CALL) == -1)
+ if (ACE_Service_Config::reactor ()->remove_handler
+ (0, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "remove_handler"), -1);
+
char buf[BUFSIZ];
// Consume the input...
ACE_OS::read (h, buf, sizeof (buf));
- return this->handle_signal (h);
-}
-template <class IC, class OC> int
-Gateway<IC, OC>::handle_timeout (const ACE_Time_Value &, const void *)
-{
- ACE_DEBUG ((LM_DEBUG, "(%t) doing the performance timeout here...\n"));
- CONFIG_ITERATOR cti (this->config_table_);
-
- // If we've got a ACE_Thread Manager then use it to suspend all
- // the threads. This will enable us to get an accurate count.
-
- if (this->thr_mgr_ != 0)
- {
-#if defined (ACE_HAS_THREADS)
- if (this->thr_mgr_->suspend_all () == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
- ACE_DEBUG ((LM_DEBUG, "(%t) suspending all threads..."));
-#endif /* ACE_HAS_THREADS */
- }
-
- size_t total_bytes_in = 0;
- size_t total_bytes_out = 0;
-
- // Iterate through the routing table connecting all the channels.
-
- for (ACE_Map_Entry <CONN_ID, Channel *> *me = 0;
- cti.next (me) != 0;
- cti.advance ())
- {
- Channel *channel = me->int_id_;
- if (channel->direction () == 'O')
- total_bytes_out += channel->total_bytes ();
- else
- total_bytes_in += channel->total_bytes ();
- }
-
-#if defined (ACE_NLOGGING)
- ACE_OS::fprintf (stderr, "After %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- performance_window_,
- total_bytes_in,
- total_bytes_out);
-
- ACE_OS::fprintf (stderr, "%f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_)));
-
- ACE_OS::fprintf (stderr, "%f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_)));
-#else
- ACE_DEBUG ((LM_DEBUG, "(%t) after %d seconds, \ntotal_bytes_in = %d\ntotal_bytes_out = %d\n",
- this->performance_window_,
- total_bytes_in,
- total_bytes_out));
- ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec received.\n",
- (float) (total_bytes_in * 8 / (float) (1024*1024*this->performance_window_))));
- ACE_DEBUG ((LM_DEBUG, "(%t) %f Mbits/sec sent.\n",
- (float) (total_bytes_out * 8 / (float) (1024*1024*this->performance_window_))));
-#endif /* ACE_NLOGGING */
- // Resume all the threads again.
- if (this->thr_mgr_ != 0)
- {
-#if defined (ACE_HAS_THREADS)
- this->thr_mgr_->resume_all ();
- ACE_DEBUG ((LM_DEBUG, "(%t) resuming all threads..."));
-#endif /* ACE_HAS_THREADS */
- }
- return 0;
+ // Shut us down.
+ return this->handle_signal (h);
}
// Give default values to data members.
-template <class IC, class OC>
-Gateway<IC, OC>::Gateway (ACE_Thread_Manager *thr_mgr)
- : cc_config_file_ ("cc_config"),
- rt_config_file_ ("rt_config"),
- performance_window_ (0),
- blocking_semantics_ (ACE_NONBLOCK),
- debug_ (0),
- connector_ (0),
- socket_queue_size_ (0),
- thr_mgr_ (thr_mgr),
- input_thr_mgr_ (thr_mgr),
- output_thr_mgr_ (thr_mgr)
-{
-}
-// Parse the "command-line" arguments and set the corresponding flags.
-
-template <class IC, class OC> int
-Gateway<IC, OC>::parse_args (int argc, char *argv[])
-{
- ACE_Get_Opt get_opt (argc, argv, "bc:dr:q:w:", 0);
-
- for (int c; (c = get_opt ()) != -1; )
- {
- switch (c)
- {
- case 'b': // Use blocking connection establishment.
- this->blocking_semantics_ = 0;
- break;
- case 'c':
- this->cc_config_file_ = get_opt.optarg;
- break;
- case 'd':
- this->debug_ = 1;
- break;
- case 'q':
- this->socket_queue_size_ = ACE_OS::atoi (get_opt.optarg);
- break;
- case 'r':
- this->rt_config_file_ = get_opt.optarg;
- break;
- case 'w': // Time performance for a designated amount of time.
- this->performance_window_ = ACE_OS::atoi (get_opt.optarg);
- // Use blocking connection semantics so that we get accurate
- // timings (since all connections start at once).
- this->blocking_semantics_ = 0;
- break;
- default:
- break;
- }
- }
- return 0;
-}
-
-// Initiate connections with the peers.
-
-template <class IC, class OC> int
-Gateway<IC, OC>::initiate_connections (void)
-{
- CONFIG_ITERATOR cti (this->config_table_);
-
- // Iterate through the routing table connecting all the channels.
-
- for (ACE_Map_Entry <CONN_ID, Channel *> *me = 0;
- cti.next (me) != 0;
- cti.advance ())
- {
- Channel *channel = me->int_id_;
- if (this->connector_->initiate_connection
- (channel, this->blocking_semantics_ == ACE_NONBLOCK
- ? ACE_Synch_Options::asynch : ACE_Synch_Options::synch) == -1)
- continue;
- }
-
- return 0;
-}
-
-// This method is automatically called when the gateway
-// is shutdown. It gracefully shuts down all the Channels
-// in the Channel connection Config_Table.
-
-template <class IC, class OC> int
-Gateway<IC, OC>::fini (void)
+Gateway::Gateway (void)
{
- // Question: do we need to do anything special about the Routing_Table?
-
- CONFIG_ITERATOR cti (this->config_table_);
-
- for (ACE_Map_Entry <CONN_ID, Channel *> *me;
- cti.next (me) != 0;
- cti.advance ())
- {
- Channel *channel = me->int_id_;
- ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n",
- channel->id ()));
- if (channel->state () != Channel::IDLE)
- // Mark channel as DISCONNECTING so we don't try to reconnect...
- channel->state (Channel::DISCONNECTING);
-
- // Deallocate Channel resources.
- channel->destroy (); // Will trigger a delete.
- }
-
- // Free up the resources allocated dynamically by the ACE_Connector.
- delete this->connector_;
- delete this->thr_mgr_;
-
- return 0;
}
-template <class IC, class OC> int
-Gateway<IC, OC>::init (int argc, char *argv[])
+int
+Gateway::init (int argc, char *argv[])
{
- this->parse_args (argc, argv);
-
- ACE_NEW_RETURN (this->connector_, Channel_Connector (), -1);
-
- if (this->connector_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "out of memory"), -1);
+ if (this->event_channel_.open (argc, argv) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1);
- // Ignore SIPPIPE so each Output_Channel can handle it.
+ // Ignore SIPPIPE so each Consumer_Handler can handle it.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
ACE_Sig_Set sig_set;
@@ -327,46 +99,23 @@ Gateway<IC, OC>::init (int argc, char *argv[])
this,
ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+ return 0;
+}
- if (this->thr_mgr_ == 0)
- // Create a thread manager if using some combination of multi-threaded channels.
-#if defined (USE_OUTPUT_MT) && defined (USE_INPUT_MT)
- this->thr_mgr_ = this->output_thr_mgr_ =
- this->input_thr_mgr_ = ACE_Service_Config::thr_mgr ();
-#elif defined (USE_OUTPUT_MT)
- this->thr_mgr_ = this->output_thr_mgr_ = ACE_Service_Config::thr_mgr ();
-#elif defined (USE_INPUT_MT)
- this->thr_mgr_ = this->input_thr_mgr_ = ACE_Service_Config::thr_mgr ();
-#endif
-
- // Parse the connection configuration file.
- this->parse_cc_config_file ();
-
- // Parse the routing table config file and build the routing table.
- this->parse_rt_config_file ();
-
- // Initiate connections with the peers.
- this->initiate_connections ();
-
- // If this->performance_window_ > 0 start a timer.
-
- if (this->performance_window_ > 0)
- {
- if (ACE_Service_Config::reactor ()->schedule_timer (this, 0,
- this->performance_window_) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "schedule_timer"));
- else
- ACE_DEBUG ((LM_DEBUG, "starting timer for %d seconds...\n",
- this->performance_window_));
- }
+// This method is automatically called when the gateway is shutdown.
+// It closes down the Event Channel.
+int
+Gateway::fini (void)
+{
+ this->event_channel_.close ();
return 0;
}
// Returns information on the currently active service.
-template <class IC, class OC> int
-Gateway<IC, OC>::info (char **strp, size_t length) const
+int
+Gateway::info (char **strp, size_t length) const
{
char buf[BUFSIZ];
@@ -380,182 +129,7 @@ Gateway<IC, OC>::info (char **strp, size_t length) const
return ACE_OS::strlen (buf);
}
-// Parse and build the connection table.
-
-template <class IC, class OC> int
-Gateway<IC, OC>::parse_cc_config_file (void)
-{
- // File that contains the routing table configuration information.
- CC_Config_File_Parser cc_file;
- CC_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (cc_file.open (this->cc_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->cc_config_file_), -1);
-
- // Read config file line at a time.
- while (cc_file.read_entry (entry, line_number) != FP::EOFILE)
- {
- file_empty = 0;
-
- if (this->debug_)
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, host = %s, remote port = %d, "
- "direction = %c, max retry timeout = %d, local port = %d\n",
- entry.conn_id_, entry.host_, entry.remote_port_, entry.direction_,
- entry.max_retry_delay_, entry.local_port_));
-
- Channel *channel = 0;
-
- // The next few lines of code are dependent on whether we are making
- // an Input_Channel or an Output_Channel.
-
- if (entry.direction_ == 'O') // Configure an output channel.
- ACE_NEW_RETURN (channel,
- OUTPUT_CHANNEL (&this->routing_table_,
- this->connector_,
- this->output_thr_mgr_,
- this->socket_queue_size_),
- -1);
- else /* direction == 'I' */ // Configure an input channel.
- ACE_NEW_RETURN (channel,
- INPUT_CHANNEL (&this->routing_table_,
- this->connector_,
- this->input_thr_mgr_,
- this->socket_queue_size_),
- -1);
- if (channel == 0)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) out of memory\n"), -1);
-
- // The following code is common to both Input_ and Output_Channels.
-
- // Initialize the routing entry's peer addressing info.
- channel->bind (ACE_INET_Addr (entry.remote_port_, entry.host_),
- ACE_INET_Addr (entry.local_port_), entry.conn_id_);
-
- // Initialize max timeout.
- channel->max_timeout (entry.max_retry_delay_);
-
- // Try to bind the new Channel to the connection ID.
- switch (this->config_table_.bind (entry.conn_id_, channel))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
- entry.conn_id_), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_DEBUG ((LM_DEBUG, "(%t) duplicate connection %d, already bound\n",
- entry.conn_id_));
- break;
- case 0:
- // Success.
- break;
- }
- }
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: connection channel configuration file was empty\n"));
-
- return 0;
-}
-
-template <class IC, class OC> int
-Gateway<IC, OC>::parse_rt_config_file (void)
-{
- // File that contains the routing table configuration information.
- RT_Config_File_Parser rt_file;
- RT_Config_File_Entry entry;
- int file_empty = 1;
- int line_number = 0;
-
- if (rt_file.open (this->rt_config_file_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", this->rt_config_file_), -1);
-
- // Read config file line at a time.
- while (rt_file.read_entry (entry, line_number) != FP::EOFILE)
- {
- file_empty = 0;
-
- if (this->debug_)
- {
- ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
- "number of destinations = %d\n",
- entry.conn_id_, entry.logical_id_, entry.payload_type_,
- entry.total_destinations_));
- for (int i = 0; i < entry.total_destinations_; i++)
- ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
- i, entry.destinations_[i]));
- }
-
- Routing_Entry *re;
- ACE_NEW_RETURN (re, Routing_Entry, -1);
- Routing_Entry::ENTRY_SET *channel_set = new Routing_Entry::ENTRY_SET;
- Peer_Addr peer_addr (entry.conn_id_, entry.logical_id_,
- entry.payload_type_);
-
- // Add the destinations to the Routing Entry.
- for (int i = 0; i < entry.total_destinations_; i++)
- {
- Channel *channel = 0;
-
- // Lookup destination and add to Routing_Entry set if found.
- if (this->config_table_.find (entry.destinations_[i],
- channel) != -1)
- channel_set->insert (channel);
- else
- ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
- i, entry.destinations_[i]));
- }
-
- // Attach set of destination channels to routing entry.
- re->destinations (channel_set);
-
- // Bind with routing table, keyed by peer address.
- switch (this->routing_table_.bind (peer_addr, re))
- {
- case -1:
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
- entry.conn_id_), -1);
- /* NOTREACHED */
- case 1: // Oops, found a duplicate!
- ACE_DEBUG ((LM_DEBUG, "(%t) duplicate routing table entry %d, "
- "already bound\n", entry.conn_id_));
- break;
- case 0:
- // Success.
- break;
- }
- }
-
- if (file_empty)
- ACE_ERROR ((LM_WARNING,
- "warning: routing table configuration file was empty\n"));
-
- return 0;
-}
-
-#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT))
-#if defined (USE_OUTPUT_MT)
-typedef Thr_Output_Channel OUTPUT_CHANNEL;
-#else
-typedef Output_Channel OUTPUT_CHANNEL;
-#endif /* USE_OUTPUT_MT */
-
-#if defined (USE_INPUT_MT)
-typedef Thr_Input_Channel INPUT_CHANNEL;
-#else
-typedef Input_Channel INPUT_CHANNEL;
-#endif /* USE_INPUT_MT */
-#else
-// Instantiate a non-multi-threaded Gateway.
-typedef Input_Channel INPUT_CHANNEL;
-typedef Output_Channel OUTPUT_CHANNEL;
-#endif /* ACE_HAS_THREADS */
-
-typedef Gateway<INPUT_CHANNEL, OUTPUT_CHANNEL> ACE_Gateway;
-
// The following is a "Factory" used by the ACE_Service_Config and
// svc.conf file to dynamically initialize the state of the Gateway.
-ACE_SVC_FACTORY_DEFINE (ACE_Gateway)
+ACE_SVC_FACTORY_DEFINE (Gateway)
diff --git a/apps/Gateway/Gateway/Gateway.h b/apps/Gateway/Gateway/Gateway.h
index b00d87628de..057ce981701 100644
--- a/apps/Gateway/Gateway/Gateway.h
+++ b/apps/Gateway/Gateway/Gateway.h
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
// ============================================================================
//
// = LIBRARY
@@ -20,11 +19,7 @@
#include "ace/OS.h"
-ACE_SVC_FACTORY_DECLARE (ACE_Gateway)
+ACE_SVC_FACTORY_DECLARE (Gateway)
#endif /* ACE_GATEWAY */
-
-
-
-
diff --git a/apps/Gateway/Gateway/IO_Handler.cpp b/apps/Gateway/Gateway/IO_Handler.cpp
new file mode 100644
index 00000000000..94997955979
--- /dev/null
+++ b/apps/Gateway/Gateway/IO_Handler.cpp
@@ -0,0 +1,710 @@
+// $Id$
+
+#include "Consumer_Entry.h"
+#include "IO_Handler_Connector.h"
+
+// Convenient short-hands.
+#define CO CONDITION
+#define MU MAP_MUTEX
+
+// The total number of bytes sent/received on this channel.
+
+size_t
+IO_Handler::total_bytes (void)
+{
+ return this->total_bytes_;
+}
+
+void
+IO_Handler::total_bytes (size_t bytes)
+{
+ this->total_bytes_ += bytes;
+}
+
+IO_Handler::IO_Handler (Consumer_Map *consumer_map,
+ IO_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr),
+ consumer_map_ (consumer_map),
+ id_ (-1),
+ total_bytes_ (0),
+ state_ (IO_Handler::IDLE),
+ connector_ (ioc),
+ timeout_ (1),
+ max_timeout_ (IO_Handler::MAX_RETRY_TIMEOUT),
+ socket_queue_size_ (socket_queue_size)
+{
+}
+
+// Set the associated channel.
+
+void
+IO_Handler::active (int a)
+{
+ this->state (a == 0 ? IO_Handler::IDLE : IO_Handler::ESTABLISHED);
+}
+
+// Get the associated channel.
+
+int
+IO_Handler::active (void)
+{
+ return this->state () == IO_Handler::ESTABLISHED;
+}
+
+// Set the direction.
+
+void
+IO_Handler::direction (char d)
+{
+ this->direction_ = d;
+}
+
+// Get the direction.
+
+char
+IO_Handler::direction (void)
+{
+ return this->direction_;
+}
+
+// Sets the timeout delay.
+
+void
+IO_Handler::timeout (int to)
+{
+ if (to > this->max_timeout_)
+ to = this->max_timeout_;
+
+ this->timeout_ = to;
+}
+
+// Recalculate the current retry timeout delay using exponential
+// backoff. Returns the original timeout (i.e., before the
+// recalculation).
+
+int
+IO_Handler::timeout (void)
+{
+ int old_timeout = this->timeout_;
+ this->timeout_ *= 2;
+
+ if (this->timeout_ > this->max_timeout_)
+ this->timeout_ = this->max_timeout_;
+
+ return old_timeout;
+}
+
+// Sets the max timeout delay.
+
+void
+IO_Handler::max_timeout (int mto)
+{
+ this->max_timeout_ = mto;
+}
+
+// Gets the max timeout delay.
+
+int
+IO_Handler::max_timeout (void)
+{
+ return this->max_timeout_;
+}
+
+// Restart connection asynchronously when timeout occurs.
+
+int
+IO_Handler::handle_timeout (const ACE_Time_Value &, const void *)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) attempting to reconnect IO_Handler %d with timeout = %d\n",
+ this->id (), this->timeout_));
+ return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
+}
+
+// Restart connection (blocking_semantics dicates whether we
+// restart synchronously or asynchronously).
+
+int
+IO_Handler::reinitiate_connection (void)
+{
+ // Skip over deactivated descriptors.
+ if (this->get_handle () != -1)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ this->peer ().close ();
+
+#if 0
+// if (this->state () == FAILED)
+// {
+ // Reinitiate timeout to improve reconnection time.
+// this->timeout (1);
+#endif
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) scheduling reinitiation of IO_Handler %d\n",
+ this->id ()));
+
+ // Reschedule ourselves to try and connect again.
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (this, 0, this->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
+ return 0;
+}
+
+// Handle shutdown of the IO_Handler object.
+
+int
+IO_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down IO_Handler %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ return this->reinitiate_connection ();
+}
+
+// Set the state of the channel.
+
+void
+IO_Handler::state (IO_Handler::State s)
+{
+ this->state_ = s;
+}
+
+// Perform the first-time initiation of a connection to the peer.
+
+int
+IO_Handler::initialize_connection (void)
+{
+ this->state_ = IO_Handler::ESTABLISHED;
+
+ // Restart the timeout to 1.
+ this->timeout (1);
+
+#if defined (ASSIGN_SUPPLIER_ID)
+ // Action that sends the route id to the peerd.
+
+ CONN_ID id = htons (this->id ());
+
+ ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
+
+ if (n != sizeof id)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ n == 0 ? "gatewayd has closed down unexpectedly" : "send"),
+ -1);
+#endif /* ASSIGN_SUPPLIER_ID */
+ return 0;
+}
+
+// Set the size of the socket queue.
+
+void
+IO_Handler::socket_queue_size (void)
+{
+ if (this->socket_queue_size_ > 0)
+ {
+ int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF;
+
+ if (this->peer ().set_option (SOL_SOCKET, option,
+ &this->socket_queue_size_, sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
+ }
+}
+
+// Upcall from the ACE_Acceptor::handle_input() that
+// delegates control to our application-specific IO_Handler.
+
+int
+IO_Handler::open (void *a)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) IO_Handler's fd = %d\n",
+ this->peer ().get_handle ()));
+
+ // Set the size of the socket queue.
+ this->socket_queue_size ();
+
+ // Turn on non-blocking I/O.
+ if (this->peer ().enable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Call down to the base class to activate and register this handler.
+ if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
+
+ return this->initialize_connection ();
+}
+
+// Return the current state of the channel.
+
+IO_Handler::State
+IO_Handler::state (void)
+{
+ return this->state_;
+}
+
+void
+IO_Handler::id (CONN_ID id)
+{
+ this->id_ = id;
+}
+
+CONN_ID
+IO_Handler::id (void)
+{
+ return this->id_;
+}
+
+// Set the peer's address information.
+int
+IO_Handler::bind (const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ CONN_ID id)
+{
+ this->remote_addr_ = remote_addr;
+ this->local_addr_ = local_addr;
+ this->id_ = id;
+ return 0;
+}
+
+ACE_INET_Addr &
+IO_Handler::remote_addr (void)
+{
+ return this->remote_addr_;
+}
+
+ACE_INET_Addr &
+IO_Handler::local_addr (void)
+{
+ return this->local_addr_;
+}
+
+// Constructor sets the consumer map pointer.
+
+Consumer_Handler::Consumer_Handler (Consumer_Map *consumer_map,
+ IO_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
+{
+ this->direction_ = 'C';
+ this->msg_queue ()->high_water_mark (Consumer_Handler::QUEUE_SIZE);
+}
+
+// This method should be called only when the peer shuts down
+// unexpectedly. This method simply marks the IO_Handler as
+// having failed so that handle_close () can reconnect.
+
+int
+Consumer_Handler::handle_input (ACE_HANDLE)
+{
+ char buf[1];
+
+ this->state (IO_Handler::FAILED);
+
+ switch (this->peer ().recv (buf, sizeof buf))
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has failed unexpectedly for Output IO_Handler %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case 0:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has shutdown unexpectedly for Output IO_Handler %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer is sending input on Output IO_Handler %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ }
+}
+
+// Perform a non-blocking put() of event MB. If we are unable to
+// send the entire event the remainder is re-queued at the *front* of
+// the Event_List.
+
+int
+Consumer_Handler::nonblk_put (ACE_Message_Block *mb)
+{
+ // Try to send the event. If we don't send it all (e.g., due to
+ // flow control), then re-queue the remainder at the head of the
+ // Event_List and ask the ACE_Reactor to inform us (via
+ // handle_output()) when it is possible to try again.
+
+ ssize_t n;
+
+ if ((n = this->send (mb)) == -1)
+ {
+ // Things have gone wrong, let's try to close down and set up a new reconnection.
+ this->state (IO_Handler::FAILED);
+ this->handle_close ();
+ return -1;
+ }
+ else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+ // ACE_Queue in *front* of the list to preserve order.
+ if (this->msg_queue ()->enqueue_head
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
+
+ // Tell ACE_Reactor to call us back when we can send again.
+ else if (ACE_Service_Config::reactor ()->
+ schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
+ return 0;
+ }
+ else
+ return n;
+}
+
+int
+Consumer_Handler::send (ACE_Message_Block *mb)
+{
+ ssize_t n;
+ size_t len = mb->length ();
+
+ if ((n = this->peer ().send (mb->rd_ptr (), len)) <= 0)
+ return errno == EWOULDBLOCK ? 0 : n;
+ else if (n < len)
+ // Re-adjust pointer to skip over the part we did send.
+ mb->rd_ptr (n);
+ else /* if (n == length) */
+ {
+ // The whole event is sent, we can now safely deallocate the
+ // buffer. Note that this should decrement a reference count...
+ delete mb;
+ errno = 0;
+ }
+ this->total_bytes (n);
+ return n;
+}
+
+// Finish sending an event when flow control conditions abate.
+// This method is automatically called by the ACE_Reactor.
+
+int
+Consumer_Handler::handle_output (ACE_HANDLE)
+{
+ ACE_Message_Block *mb = 0;
+ int status = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in handle_output on handle %d\n",
+ this->get_handle ()));
+ // The list had better not be empty, otherwise there's a bug!
+
+ if (this->msg_queue ()->dequeue_head
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (mb))
+ {
+ case 0: // Partial send.
+ ACE_ASSERT (errno == EWOULDBLOCK);
+ // Didn't write everything this time, come back later...
+ break;
+
+ case -1:
+ // Caller is responsible for freeing a ACE_Message_Block if failures occur.
+ delete mb;
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
+
+ /* FALLTHROUGH */
+ default: // Sent the whole thing.
+
+ // If we succeed in writing the entire event (or we did not
+ // fail due to EWOULDBLOCK) then check if there are more
+ // events on the Event_List. If there aren't, tell the
+ // ACE_Reactor not to notify us anymore (at least until
+ // there are new events queued up).
+
+ if (this->msg_queue ()->is_empty ())
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) queueing deactivated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+
+ if (ACE_Service_Config::reactor ()->
+ cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
+ }
+ }
+ }
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
+ return 0;
+}
+
+// Send an event to a peer (may queue if necessary).
+
+int
+Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ if (this->msg_queue ()->is_empty ())
+ // Try to send the event *without* blocking!
+ return this->nonblk_put (mb);
+ else
+ // If we have queued up events due to flow control then just
+ // enqueue and return.
+ return this->msg_queue ()->enqueue_tail
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Constructor sets the consumer map pointer and the connector
+// pointer.
+
+Supplier_Handler::Supplier_Handler (Consumer_Map *consumer_map,
+ IO_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : msg_frag_ (0),
+ IO_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
+{
+ this->direction_ = 'S';
+ this->msg_queue ()->high_water_mark (0);
+}
+
+// Receive a Peer event from peerd. Handles fragmentation.
+//
+// The routing event returned from recv consists of two parts:
+// 1. The Address part, contains the virtual routing id.
+// 2. The Data part, which contains the actual data to be routed.
+//
+// The reason for having two parts is to shield the higher layers
+// of software from knowledge of the event structure.
+
+int
+Supplier_Handler::recv (ACE_Message_Block *&forward_addr)
+{
+ Event *event;
+ size_t len;
+ ssize_t n = 0;
+ ssize_t m = 0;
+ size_t offset = 0;
+
+ if (this->msg_frag_ == 0)
+ // No existing fragment...
+ ACE_NEW_RETURN (this->msg_frag_,
+ ACE_Message_Block (sizeof (Event)),
+ -1);
+
+ event = (Event *) this->msg_frag_->rd_ptr ();
+
+ const ssize_t HEADER_SIZE = sizeof (Event_Header);
+ ssize_t header_bytes_left_to_read = HEADER_SIZE - this->msg_frag_->length ();
+
+ if (header_bytes_left_to_read > 0)
+ {
+ n = this->peer ().recv (this->msg_frag_->wr_ptr (),
+ header_bytes_left_to_read);
+
+ if (n == -1 /* error */
+ || n == 0 /* EOF */)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "Recv error during header read "));
+ ACE_DEBUG ((LM_DEBUG,
+ "attempted to read %d\n",
+ header_bytes_left_to_read));
+ delete this->msg_frag_;
+ this->msg_frag_ = 0;
+ return n;
+ }
+
+ // Bump the write pointer by the amount read.
+ this->msg_frag_->wr_ptr (n);
+
+ // At this point we may or may not have the ENTIRE header.
+ if (this->msg_frag_->length () < HEADER_SIZE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Partial header received: only %d bytes\n",
+ this->msg_frag_->length ()));
+ // Notify the caller that we didn't get an entire event.
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ }
+
+ // At this point there is a complete, valid header in msg_frag_
+ len = sizeof event->buf_ + HEADER_SIZE - this->msg_frag_->length ();
+
+ // Try to receive the remainder of the event
+
+ switch (m = this->peer ().recv (event->buf_ + offset, len))
+ {
+ case -1:
+ if (errno == EWOULDBLOCK)
+ {
+ // This shouldn't happen since the ACE_Reactor
+ // just triggered us to handle pending I/O!
+ ACE_DEBUG ((LM_DEBUG, "(%t) unexpected recv failure\n"));
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ else
+ /* FALLTHROUGH */;
+
+ case 0: // Premature EOF.
+ delete this->msg_frag_;
+ this->msg_frag_ = 0;
+ return 0;
+
+ default:
+ if (m != len)
+ // Re-adjust pointer to skip over the part we've read.
+ {
+ this->msg_frag_->wr_ptr (m);
+ errno = EWOULDBLOCK;
+ return -1; // Inform caller that we didn't get the whole event.
+ }
+ else
+ {
+ // Set the write pointer at 1 past the end of the event.
+ this->msg_frag_->wr_ptr (m);
+
+ // Set the read pointer to the beginning of the event.
+ this->msg_frag_->rd_ptr (this->msg_frag_->base ());
+
+ // Allocate an event forwarding header and chain the data
+ // portion onto its continuation field.
+ ACE_NEW_RETURN (forward_addr,
+ ACE_Message_Block (sizeof (Event_Addr),
+ ACE_Message_Block::MB_PROTO,
+ this->msg_frag_),
+ -1);
+
+ Event_Addr event_addr (this->id (), event->header_.routing_id_, 0);
+ // Copy the forwarding address from the Event_Addr into
+ // forward_addr.
+ forward_addr->copy ((char *) &event_addr, sizeof (Event));
+
+ // Reset the pointer to indicate we've got an entire event.
+ this->msg_frag_ = 0;
+ }
+ this->total_bytes (m + n);
+#if defined (VERBOSE)
+ ACE_DEBUG ((LM_DEBUG, "(%t) channel id = %d, route id = %d, len = %d, payload = %*s",
+ event_addr.conn_id_, event->header_.routing_id_, event->header_.len_,
+ event->header_.len_, event->buf_));
+#else
+ ACE_DEBUG ((LM_DEBUG, "(%t) route id = %d, cur len = %d, total bytes read = %d\n",
+ event->header_.routing_id_, event->header_.len_, this->total_bytes ()));
+#endif
+ return m + n;
+ }
+}
+
+// Receive various types of input (e.g., Peer event from the
+// gatewayd, as well as stdio).
+
+int
+Supplier_Handler::handle_input (ACE_HANDLE)
+{
+ ACE_Message_Block *forward_addr = 0;
+
+ switch (this->recv (forward_addr))
+ {
+ case 0:
+ // Note that a peer should never initiate a shutdown.
+ this->state (IO_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has closed down unexpectedly for Input IO_Handler %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // A short-read, we'll come back and finish it up later on!
+ return 0;
+ else // A weird problem occurred, shut down and start again.
+ {
+ this->state (IO_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input IO_Handler %d\n",
+ "Peer has failed unexpectedly",
+ this->id ()), -1);
+ }
+ /* NOTREACHED */
+ default:
+ return this->forward (forward_addr);
+ }
+}
+
+// Route an event to its appropriate destination.
+
+int
+Supplier_Handler::forward (ACE_Message_Block *forward_addr)
+{
+ // We got a valid event, so determine its virtual routing id,
+ // which is stored in the first of the two event blocks chained
+ // together.
+
+ Event_Addr *forwarding_key = (Event_Addr *) forward_addr->rd_ptr ();
+
+ // Skip over the address portion.
+ const ACE_Message_Block *const data = forward_addr->cont ();
+
+ // RE points to the routing entry located for this routing id.
+ Consumer_Entry *re = 0;
+
+ if (this->consumer_map_->find (*forwarding_key, re) != -1)
+ {
+ // Check to see if there are any destinations.
+ if (re->destinations ()->size () == 0)
+ ACE_DEBUG ((LM_WARNING,
+ "there are no active destinations for this event currently\n"));
+
+ else // There are destinations, so forward the event.
+ {
+ Consumer_Entry::ENTRY_SET *esp = re->destinations ();
+ Consumer_Entry::ENTRY_ITERATOR si (*esp);
+
+ for (IO_Handler **channel = 0; si.next (channel) != 0; si.advance ())
+ {
+ // Only process active channels.
+ if ((*channel)->active ())
+ {
+ // Clone the event portion (should be doing reference counting here...)
+ ACE_Message_Block *newmsg = data->clone ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", (*channel)->id ()));
+
+ if ((*channel)->put (newmsg) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n",
+ "gateway is flow controlled, so we're dropping events"));
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to route %d\n",
+ "put", (*channel)->id ()));
+
+ // Caller is responsible for freeing a ACE_Message_Block if failures occur.
+ delete newmsg;
+ }
+ }
+ }
+ // Will become superfluous once we have reference counting...
+ delete forward_addr;
+ return 0;
+ }
+ }
+ delete forward_addr;
+ // Failure return.
+ ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
+ forwarding_key->conn_id_, forwarding_key->logical_id_, forwarding_key->payload_));
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Map_Manager<Event_Addr, Consumer_Entry *, MAP_MUTEX>;
+template class ACE_Map_Iterator<Event_Addr, Consumer_Entry *, MAP_MUTEX>;
+template class ACE_Map_Entry<Event_Addr, Consumer_Entry *>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/IO_Handler.h b/apps/Gateway/Gateway/IO_Handler.h
new file mode 100644
index 00000000000..c22f1a3df26
--- /dev/null
+++ b/apps/Gateway/Gateway/IO_Handler.h
@@ -0,0 +1,224 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// IO_Handler.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_IO_HANDLER)
+#define _IO_HANDLER
+
+#include "ace/Service_Config.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/Svc_Handler.h"
+#include "Consumer_Map.h"
+#include "Consumer_Entry.h"
+#include "Event.h"
+
+// Forward declaration.
+class IO_Handler_Connector;
+
+class IO_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
+ // = TITLE
+ // IO_Handler contains info about connection state and addressing.
+ //
+ // = DESCRIPTION
+ // The IO_Handler classes process events sent from the peers to the
+ // gateway. These classes works as follows:
+ //
+ // 1. IO_Handler_Connector creates a number of connections with the set of
+ // peers specified in a configuration file.
+ //
+ // 2. For each peer that connects successfully, IO_Handler_Connector
+ // creates an IO_Handler object. Each object assigns a unique routing
+ // id to its associated peer. The Handlers are used by gatewayd
+ // that to receive, route, and forward events from source peer(s)
+ // to destination peer(s).
+{
+public:
+ IO_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+
+ virtual int open (void * = 0);
+ // Initialize and activate a single-threaded IO_Handler (called by
+ // ACE_Connector::handle_output()).
+
+ int bind (const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ CONN_ID);
+ // Set the peer's addressing and routing information.
+
+ ACE_INET_Addr &remote_addr (void);
+ // Returns the peer's routing address.
+
+ ACE_INET_Addr &local_addr (void);
+ // Returns our local address.
+
+ // = Set/get routing id.
+ CONN_ID id (void);
+ void id (CONN_ID);
+
+ // = Set/get the current state of the IO_Handler.
+ enum State
+ {
+ IDLE = 1, // Prior to initialization.
+ CONNECTING, // During connection establishment.
+ ESTABLISHED, // IO_Handler is established and active.
+ DISCONNECTING, // IO_Handler is in the process of connecting.
+ FAILED // IO_Handler has failed.
+ };
+
+ // = Set/get the current state.
+ State state (void);
+ void state (State);
+
+ // = Set/get the current retry timeout delay.
+ int timeout (void);
+ void timeout (int);
+
+ // = Set/get the maximum retry timeout delay.
+ int max_timeout (void);
+ void max_timeout (int);
+
+ // = Set/get IO_Handler activity status.
+ int active (void);
+ void active (int);
+
+ // = Set/get direction (necessary for error checking).
+ char direction (void);
+ void direction (char);
+
+ // = The total number of bytes sent/received on this channel.
+ size_t total_bytes (void);
+ void total_bytes (size_t bytes);
+ // Increment count by <bytes>.
+
+ virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
+ // Perform timer-based IO_Handler reconnection.
+
+protected:
+ enum
+ {
+ MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout.
+ };
+
+ int initialize_connection (void);
+ // Perform the first-time initiation of a connection to the peer.
+
+ int reinitiate_connection (void);
+ // Reinitiate a connection asynchronously when peers fail.
+
+ void socket_queue_size (void);
+ // Set the socket queue size.
+
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ // Perform IO_Handler termination.
+
+ Consumer_Map *consumer_map_;
+ // Pointer to table that maps an event
+ // to a Set of IO_Handler *'s for output.
+
+ ACE_INET_Addr remote_addr_;
+ // Address of peer.
+
+ ACE_INET_Addr local_addr_;
+ // Address of us.
+
+ CONN_ID id_;
+ // The assigned routing ID of this entry.
+
+ size_t total_bytes_;
+ // The total number of bytes sent/received on this channel.
+
+ State state_;
+ // The current state of the channel.
+
+ IO_Handler_Connector *connector_;
+ // Back pointer to IO_Handler_Connector to reestablish broken
+ // connections.
+
+ int timeout_;
+ // Amount of time to wait between reconnection attempts.
+
+ int max_timeout_;
+ // Maximum amount of time to wait between reconnection attempts.
+
+ char direction_;
+ // Indicates which direction data flows through the channel ('O' ==
+ // output and 'I' == input).
+
+ int socket_queue_size_;
+ // Size of the socket queue (0 means "use default").
+};
+
+class Supplier_Handler : public IO_Handler
+ // = TITLE
+ // Handle reception of Peer events arriving as events.
+{
+public:
+ Supplier_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+ // Constructor sets the consumer map pointer.
+
+ virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
+ // Receive and process peer events.
+
+protected:
+ virtual int recv (ACE_Message_Block *&);
+ // Receive an event from a Supplier.
+
+ int forward (ACE_Message_Block *event);
+ // Forward the Event to a Consumer.
+
+ ACE_Message_Block *msg_frag_;
+ // Keep track of event fragment to handle non-blocking recv's from
+ // Suppliers.
+};
+
+class Consumer_Handler : public IO_Handler
+ // = TITLE
+ // Handle transmission of events to other Peers using a
+ // single-threaded approach.
+{
+public:
+ Consumer_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // Send an event to a Consumer (may be queued if necessary).
+
+protected:
+ // = We'll allow up to 16 megabytes to be queued per-output
+ // channel.
+ enum {QUEUE_SIZE = 1024 * 1024 * 16};
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive and process shutdowns from a Consumer.
+
+ virtual int handle_output (ACE_HANDLE);
+ // Finish sending event when flow control conditions abate.
+
+ int nonblk_put (ACE_Message_Block *mb);
+ // Perform a non-blocking put().
+
+ virtual int send (ACE_Message_Block *);
+ // Send an event to a Consumer.
+};
+
+#endif /* _IO_HANDLER */
diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.cpp b/apps/Gateway/Gateway/IO_Handler_Connector.cpp
new file mode 100644
index 00000000000..712b348951d
--- /dev/null
+++ b/apps/Gateway/Gateway/IO_Handler_Connector.cpp
@@ -0,0 +1,92 @@
+#include "IO_Handler_Connector.h"
+// $Id$
+
+
+IO_Handler_Connector::IO_Handler_Connector (void)
+{
+}
+
+// Override the connection-failure method to add timer support.
+// Note that these timers perform "expoential backoff" to
+// avoid rapidly trying to reestablish connections when a link
+// goes down.
+
+int
+IO_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
+{
+ ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0;
+
+ // Locate the ACE_Svc_Handler corresponding to the socket descriptor.
+ if (this->handler_map_.find (sd, stp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n",
+ sd, "find"), -1);
+
+ IO_Handler *channel = stp->svc_handler ();
+
+ // Schedule a reconnection request at some point in the future
+ // (note that channel uses an exponential backoff scheme).
+ if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0,
+ channel->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ return 0;
+}
+
+// Initiate (or reinitiate) a connection to the IO_Handler.
+
+int
+IO_Handler_Connector::initiate_connection (IO_Handler *channel,
+ ACE_Synch_Options &synch_options)
+{
+ char buf[MAXHOSTNAMELEN];
+
+ // Mark ourselves as idle so that the various iterators
+ // will ignore us until we are reconnected.
+ channel->state (IO_Handler::IDLE);
+
+ if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1
+ || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "can't obtain peer's address"), -1);
+
+ // Try to connect to the Peer.
+
+ if (this->connect (channel, channel->remote_addr (),
+ synch_options, channel->local_addr ()) == -1)
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ channel->state (IO_Handler::FAILED);
+ ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
+ "connect", buf));
+
+ // Reschedule ourselves to try and connect again.
+ if (synch_options[ACE_Synch_Options::USE_REACTOR])
+ {
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (channel, 0, channel->timeout ()) == 0)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
+ else
+ // Failures on synchronous connects are reported as errors
+ // so that the caller can decide how to proceed.
+ return -1;
+ }
+ else
+ {
+ channel->state (IO_Handler::CONNECTING);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in the process of connecting %s to %s\n",
+ synch_options[ACE_Synch_Options::USE_REACTOR]
+ ? "asynchronously" : "synchronously", buf));
+ }
+ }
+ else
+ {
+ channel->state (IO_Handler::ESTABLISHED);
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
+ buf, channel->get_handle ()));
+ }
+ return 0;
+}
diff --git a/apps/Gateway/Gateway/IO_Handler_Connector.h b/apps/Gateway/Gateway/IO_Handler_Connector.h
new file mode 100644
index 00000000000..585428c88ee
--- /dev/null
+++ b/apps/Gateway/Gateway/IO_Handler_Connector.h
@@ -0,0 +1,40 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// IO_Handler_Connector.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_IO_HANDLER_CONNECTOR)
+#define _IO_HANDLER_CONNECTOR
+
+#include "ace/Connector.h"
+#include "Thr_IO_Handler.h"
+
+class IO_Handler_Connector : public ACE_Connector<IO_Handler, ACE_SOCK_CONNECTOR>
+ // = TITLE
+ // A concrete factory class that setups connections to peerds
+ // and produces a new IO_Handler object to do the dirty work...
+{
+public:
+ IO_Handler_Connector (void);
+
+ // Initiate (or reinitiate) a connection on the IO_Handler.
+ int initiate_connection (IO_Handler *,
+ ACE_Synch_Options & = ACE_Synch_Options::synch);
+
+protected:
+ // Override the connection-failure method to add timer support.
+ virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask);
+};
+
+#endif /* _IO_HANDLER_CONNECTOR */
diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile
index b2115abbd59..3b54556c4f6 100644
--- a/apps/Gateway/Gateway/Makefile
+++ b/apps/Gateway/Gateway/Makefile
@@ -9,17 +9,18 @@
#----------------------------------------------------------------------------
BIN = gatewayd
-LIB = libGateway.a
+#LIB = libGateway.a
SHLIB = libGateway.so
-FILES = Channel \
- Channel_Connector \
+FILES = Event_Channel \
+ IO_Handler \
+ IO_Handler_Connector \
Config_Files \
File_Parser \
Gateway \
- Routing_Entry \
- Routing_Table \
- Thr_Channel
+ Consumer_Entry \
+ Consumer_Map \
+ Thr_IO_Handler
LSRC = $(addsuffix .cpp,$(FILES))
LOBJ = $(addsuffix .o,$(FILES))
@@ -51,7 +52,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
# Default behavior is to use single-threading. See the README
# file for information on how to configure this with multiple
# strategies for threading the input and output channels.
-DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
+DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
#----------------------------------------------------------------------------
# Dependencies
@@ -60,8 +61,8 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
# DO NOT DELETE THIS LINE -- g++dep uses it.
# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
-.obj/Channel.o .shobj/Channel.so: Channel.cpp Routing_Entry.h \
- $(WRAPPER_ROOT)/ace/Set.h \
+.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
@@ -73,7 +74,7 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- Channel_Connector.h \
+ Config_Files.h File_Parser.h IO_Handler_Connector.h \
$(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
@@ -87,19 +88,80 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.h \
+ $(WRAPPER_ROOT)/ace/SOCK.h \
+ $(WRAPPER_ROOT)/ace/Addr.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.h \
+ $(WRAPPER_ROOT)/ace/IPC_SAP.i \
+ $(WRAPPER_ROOT)/ace/SOCK.i \
+ $(WRAPPER_ROOT)/ace/SOCK_IO.i \
+ $(WRAPPER_ROOT)/ace/INET_Addr.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Proactor.h \
$(WRAPPER_ROOT)/ace/Message_Block.h \
$(WRAPPER_ROOT)/ace/Malloc.h \
$(WRAPPER_ROOT)/ace/Malloc_T.h \
$(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
$(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Task_T.h \
+ $(WRAPPER_ROOT)/ace/Connector.i \
+ Thr_IO_Handler.h IO_Handler.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
+ Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h \
+ Event_Channel.h
+.obj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp Consumer_Entry.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/stdcpp.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ IO_Handler_Connector.h \
+ $(WRAPPER_ROOT)/ace/Connector.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ $(WRAPPER_ROOT)/ace/Thread_Manager.h \
+ $(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -113,21 +175,30 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Channel.h Channel.h \
+ Thr_IO_Handler.h IO_Handler.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Routing_Table.h Peer_Message.h
-.obj/Channel_Connector.o .shobj/Channel_Connector.so: Channel_Connector.cpp Channel_Connector.h \
+ Consumer_Map.h Concurrency_Strategies.h Event.h
+.obj/IO_Handler_Connector.o .shobj/IO_Handler_Connector.so: IO_Handler_Connector.cpp \
+ IO_Handler_Connector.h \
$(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
@@ -152,20 +223,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -179,20 +242,28 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Channel.h Channel.h \
+ Thr_IO_Handler.h IO_Handler.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Routing_Table.h Routing_Entry.h Peer_Message.h
+ Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h
.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
@@ -220,7 +291,9 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Record.i \
File_Parser.h
.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \
- $(WRAPPER_ROOT)/ace/Get_Opt.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
@@ -232,9 +305,6 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
$(WRAPPER_ROOT)/ace/Event_Handler.h \
$(WRAPPER_ROOT)/ace/Thread_Manager.h \
$(WRAPPER_ROOT)/ace/Thread.h \
@@ -244,20 +314,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -271,23 +333,20 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
- $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- Config_Files.h File_Parser.h Gateway.h Channel_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/Svc_Handler.h \
- $(WRAPPER_ROOT)/ace/Synch_Options.h \
- $(WRAPPER_ROOT)/ace/Task.h \
- $(WRAPPER_ROOT)/ace/Task_T.h \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
$(WRAPPER_ROOT)/ace/Message_Queue.h \
$(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
$(WRAPPER_ROOT)/ace/Strategies.h \
- $(WRAPPER_ROOT)/ace/Connector.i \
- Thr_Channel.h Channel.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
- $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Routing_Table.h Routing_Entry.h Peer_Message.h
-.obj/Routing_Entry.o .shobj/Routing_Entry.so: Routing_Entry.cpp Routing_Entry.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
+ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ Gateway.h
+.obj/Consumer_Entry.o .shobj/Consumer_Entry.so: Consumer_Entry.cpp Consumer_Entry.h \
$(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -300,7 +359,7 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i
-.obj/Routing_Table.o .shobj/Routing_Table.so: Routing_Table.cpp Routing_Table.h \
+.obj/Consumer_Map.o .shobj/Consumer_Map.so: Consumer_Map.cpp Consumer_Map.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -312,8 +371,10 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Record.h \
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i
-.obj/Thr_Channel.o .shobj/Thr_Channel.so: Thr_Channel.cpp Thr_Channel.h Channel.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ Concurrency_Strategies.h Event.h Consumer_Entry.h \
+ $(WRAPPER_ROOT)/ace/Set.h
+.obj/Thr_IO_Handler.o .shobj/Thr_IO_Handler.so: Thr_IO_Handler.cpp Thr_IO_Handler.h IO_Handler.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -337,20 +398,12 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -364,6 +417,17 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
@@ -371,13 +435,11 @@ DEFFLAGS += -DASSIGN_ROUTING_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- Routing_Table.h \
+ Consumer_Map.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
- Routing_Entry.h Peer_Message.h Channel_Connector.h \
+ Concurrency_Strategies.h Event.h Consumer_Entry.h \
+ IO_Handler_Connector.h \
$(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Connector.i
# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/apps/Gateway/Gateway/README b/apps/Gateway/Gateway/README
index ceb17528d0d..4e986354aaa 100644
--- a/apps/Gateway/Gateway/README
+++ b/apps/Gateway/Gateway/README
@@ -1,22 +1,23 @@
-This application illustrates an application-level Gateway which
-routes messages between a set of Peers in a distributed environment.
+This application illustrates an application-level Gateway which routes
+messages between Consumer and Suppliers in a distributed environment.
-The default configuration is single-threaded, i.e., all Input_Channels
-and Output_Channels are multiplexed via the Reactor on a single thread
-of control. To obtain a version that multi-threads both input and
-output simply set the following flag in the Makefile:
+The default configuration is single-threaded, i.e., all
+Supplier_Handlers and Consumer_Handlers are multiplexed via the ACE
+Reactor within a single thread of control. To obtain a version that
+multi-threads both Consumer_Handlers and Supplier_Handlers simply set
+the following flag in the Makefile:
DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT
-To get a version that uses single-threading for all Input_Channels,
-but a separate thread per-Output_Channel set the following flag in the
-Makefile:
+To get a version that uses single-threading for all Supplier_Handlers,
+but a separate thread per-Consumer_Handler set the following flag in
+the Makefile:
DEFFLAGS += -DUSE_OUTPUT_MT
If you examine the source code, you'll see that very few changes are
required in the source code to switch between single-threading and
multi-threading. The ACE Task class is primarily responsible for
-enabling the flexible modification of concurrency strategies with
-little modification to the source code, design, and system
+enabling the flexible modification of concurrency strategies with only
+minor changes required to the source code, design, and system
architecture.
diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.cpp b/apps/Gateway/Gateway/Thr_IO_Handler.cpp
new file mode 100644
index 00000000000..109cfad9c3f
--- /dev/null
+++ b/apps/Gateway/Gateway/Thr_IO_Handler.cpp
@@ -0,0 +1,204 @@
+#include "Thr_IO_Handler.h"
+// $Id$
+
+#include "IO_Handler_Connector.h"
+
+#if defined (ACE_HAS_THREADS)
+Thr_Consumer_Handler::Thr_Consumer_Handler (Consumer_Map *consumer_map,
+ IO_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : Consumer_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
+{
+}
+
+// This method should be called only when the peer shuts down
+// unexpectedly. This method marks the IO_Handler as having failed and
+// deactivates the ACE_Message_Queue (to wake up the thread blocked on
+// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
+// eventually try to reconnect...
+
+int
+Thr_Consumer_Handler::handle_input (ACE_HANDLE h)
+{
+ this->Consumer_Handler::handle_input (h);
+ ACE_Service_Config::reactor ()->remove_handler (h,
+ ACE_Event_Handler::RWE_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+ return 0;
+}
+
+// Initialize the threaded Consumer_Handler object and spawn a new
+// thread.
+
+int
+Thr_Consumer_Handler::open (void *)
+{
+ // Set the size of the socket queue.
+ this->socket_queue_size ();
+
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Register ourselves to receive input events (which indicate that
+ // the Peer has shut down unexpectedly).
+ if (ACE_Service_Config::reactor ()->register_handler (this,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+
+ if (this->initialize_connection ())
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "initialize_connection"), -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // messages to peers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// ACE_Queue up a message for transmission (must not block since all
+// Supplier_Handlers are single-threaded).
+
+int
+Thr_Consumer_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ // Perform non-blocking enqueue.
+ return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Transmit messages to the peer (note simplification resulting from
+// threads...)
+
+int
+Thr_Consumer_Handler::svc (void)
+{
+ for (;;)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Handler's fd = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread it is OK to block on
+ // output.
+
+ for (ACE_Message_Block *mb = 0;
+ this->msg_queue ()->dequeue_head (mb) != -1; )
+ if (this->send (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
+
+ ACE_ASSERT (errno == ESHUTDOWN);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Handler %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ this->peer ().close ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->connector_->initiate_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+ ACE_OS::sleep (tv);
+ }
+ }
+
+ return 0;
+}
+
+Thr_Supplier_Handler::Thr_Supplier_Handler (Consumer_Map *consumer_map,
+ IO_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : Supplier_Handler (consumer_map, ioc, thr_mgr, socket_queue_size)
+{
+}
+
+int
+Thr_Supplier_Handler::open (void *)
+{
+ // Set the size of the socket queue.
+ this->socket_queue_size ();
+
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ if (this->initialize_connection ())
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "initialize_connection"), -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // messages to peers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// Receive messages from a Peer in a separate thread (note reuse of
+// existing code!).
+
+int
+Thr_Supplier_Handler::svc (void)
+{
+ for (;;)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Handler's fd = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread and processes
+ // messages for one connection it is OK to block on input and
+ // output.
+
+ while (this->handle_input () != -1)
+ continue;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Supplier_Handler %d on handle %d\n",
+ this->id (),
+ this->get_handle ()));
+
+ this->peer ().close ();
+
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->connector_->initiate_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n", tv.sec ()));
+ ACE_OS::sleep (tv);
+ }
+ }
+ return 0;
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Thr_IO_Handler.h b/apps/Gateway/Gateway/Thr_IO_Handler.h
new file mode 100644
index 00000000000..ee056b35361
--- /dev/null
+++ b/apps/Gateway/Gateway/Thr_IO_Handler.h
@@ -0,0 +1,64 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Thr_IO_Handler.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_THR_IO_HANDLER)
+#define _THR_IO_HANDLER
+
+#include "IO_Handler.h"
+
+#if defined (ACE_HAS_THREADS)
+class Thr_Consumer_Handler : public Consumer_Handler
+ // = TITLE
+ // Runs each Output IO_Handler in a separate thread.
+{
+public:
+ Thr_Consumer_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager *,
+ int socket_queue_size);
+
+ virtual int open (void *);
+ // Initialize the threaded Consumer_Handler object and spawn a new
+ // thread.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Called when Peer shutdown unexpectedly.
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // Send a message to a peer.
+
+ virtual int svc (void);
+ // Transmit peer messages.
+};
+
+class Thr_Supplier_Handler : public Supplier_Handler
+ // = TITLE
+ // Runs each Input IO_Handler in a separate thread.
+{
+public:
+ Thr_Supplier_Handler (Consumer_Map *,
+ IO_Handler_Connector *,
+ ACE_Thread_Manager *,
+ int socket_queue_size);
+
+ virtual int open (void *);
+ // Initialize the object and spawn a new thread.
+
+ virtual int svc (void);
+ // Transmit peer messages.
+};
+#endif /* ACE_HAS_THREADS */
+#endif /* _THR_IO_HANDLER */
diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config
new file mode 100644
index 00000000000..d33469ee157
--- /dev/null
+++ b/apps/Gateway/Gateway/consumer_config
@@ -0,0 +1,8 @@
+# Consumer map configuration file
+# Conn ID Logical ID Payload Destinations
+# ------- ---------- ------- ------------
+# 1 1 0 3,4,5
+ 1 1 0 3
+ 3 1 0 3
+# 4 1 0 4
+# 5 1 0 5
diff --git a/apps/Gateway/Gateway/svc.conf b/apps/Gateway/Gateway/svc.conf
index 7ae8d4b0080..6c41415a9ce 100644
--- a/apps/Gateway/Gateway/svc.conf
+++ b/apps/Gateway/Gateway/svc.conf
@@ -1,3 +1,3 @@
#static Svc_Manager "-d -p 2913"
-dynamic Gateway Service_Object * ./libGateway:_make_ACE_Gateway() active "-d -c cc_config -f rt_config"
+dynamic Gateway Service_Object * ./libGateway:_make_ACE_Gateway() active "-d -c connection_config -f consumer_config"
diff --git a/apps/Gateway/Peer/Gateway_Handler.cpp b/apps/Gateway/Peer/Gateway_Handler.cpp
index 15ca0a58807..cfc9a7dad6f 100644
--- a/apps/Gateway/Peer/Gateway_Handler.cpp
+++ b/apps/Gateway/Peer/Gateway_Handler.cpp
@@ -84,10 +84,10 @@ Gateway_Handler::xmit_stdin (void)
ACE_Message_Block *mb;
ACE_NEW_RETURN (mb,
- ACE_Message_Block (sizeof (Peer_Message)),
+ ACE_Message_Block (sizeof (Event)),
-1);
- Peer_Message *peer_msg = (Peer_Message *) mb->rd_ptr ();
+ Event *peer_msg = (Event *) mb->rd_ptr ();
peer_msg->header_.routing_id_ = this->routing_id_;
n = ACE_OS::read (ACE_STDIN, peer_msg->buf_, sizeof peer_msg->buf_);
@@ -270,7 +270,7 @@ Gateway_Handler::send_peer (ACE_Message_Block *mb)
int
Gateway_Handler::recv_peer (ACE_Message_Block *&mb)
{
- Peer_Message *peer_msg;
+ Event *peer_msg;
size_t len;
ssize_t n;
size_t offset = 0;
@@ -278,14 +278,14 @@ Gateway_Handler::recv_peer (ACE_Message_Block *&mb)
if (this->msg_frag_ == 0)
{
ACE_NEW_RETURN (this->msg_frag_,
- ACE_Message_Block (sizeof (Peer_Message)),
+ ACE_Message_Block (sizeof (Event)),
-1);
// No existing fragment...
if (this->msg_frag_ == 0)
ACE_ERROR_RETURN ((LM_ERROR, "out of memory\n"), -1);
- peer_msg = (Peer_Message *) this->msg_frag_->rd_ptr ();
+ peer_msg = (Event *) this->msg_frag_->rd_ptr ();
switch (n = this->peer ().recv (peer_msg, sizeof (Peer_Header)))
{
@@ -441,7 +441,7 @@ Gateway_Handler::await_messages (void)
// We got a valid message, so let's process it now! At the
// moment, we just print out the message contents...
- Peer_Message *peer_msg = (Peer_Message *) mb->rd_ptr ();
+ Event *peer_msg = (Event *) mb->rd_ptr ();
this->total_bytes_ += mb->length ();
#if defined (VERBOSE)
diff --git a/apps/Gateway/Peer/Gateway_Handler.h b/apps/Gateway/Peer/Gateway_Handler.h
index 6dc4539e6b7..82477264c4f 100644
--- a/apps/Gateway/Peer/Gateway_Handler.h
+++ b/apps/Gateway/Peer/Gateway_Handler.h
@@ -32,7 +32,7 @@
#include "ace/SOCK_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/Map_Manager.h"
-#include "Peer_Message.h"
+#include "Event.h"
// Forward declaration.
class Gateway_Handler;
diff --git a/apps/Gateway/Peer/Makefile b/apps/Gateway/Peer/Makefile
index f88aaf14926..9909eb2ef2a 100644
--- a/apps/Gateway/Peer/Makefile
+++ b/apps/Gateway/Peer/Makefile
@@ -14,9 +14,9 @@ FILES = Gateway_Handler
LSRC = $(addsuffix .cpp,$(FILES))
LOBJ = $(addsuffix .o,$(FILES))
-VSHOBJS = $(LSRC:%.cpp=$(VSHDIR)%.so)
+SHOBJ = $(addsuffix .so,$(FILES))
-LDLIBS = $(VSHOBJS)
+LDLIBS = $(addprefix .shobj/,$(SHOBJ))
VLDLIBS = $(LDLIBS:%=%$(VAR))
@@ -111,6 +111,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
- Peer_Message.h
+ Event.h
# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/apps/Gateway/README b/apps/Gateway/README
index 892fe43ba6d..ffd7e52bdf4 100644
--- a/apps/Gateway/README
+++ b/apps/Gateway/README
@@ -2,8 +2,12 @@ OVERVIEW
This directory contains source code for a prototype application-level
gateway implemented with ACE. This prototype was developed in my
-cs422 grad OS class at Washington University. You can get a paper
-that explains the patterns used in this implementation at URL
+cs422 OS class at Washington University. It illustrates the use of
+Event Channels to forward events from Suppliers to Consumers in a
+distributed system.
+
+You can get a paper that explains the patterns used in this
+implementation at the following WWW URL:
http://www.cs.wustl.edu/~schmidt/TAPOS-95.ps.gz
@@ -15,28 +19,33 @@ Gateway
-- The application Gateway, which must be started *after* all
the Peers described below). This process reads the
- cc_config and rt_config files. The cc_config file tells
- the Gateway what connections to establish with which hosts
- on which ports, etc. The rt_config file tells the Gateway
- how to route data coming from "sources" to the appropriate
- "destinations."
-
+ connection_config and consumer_config files:
+
+ 1. The connection_config file is used to establish the "physical
+ configuration." It tells the Gateway what connections
+ to establish with particular hosts using particular
+ ports.
+
+ 2. The consumer_config file is used to establish the "logical
+ configuration." It tells the Gateway how to forward
+ data coming from "sources" to the appropriate
+ "destinations."
Peer
-- The test driver programs that must be started *before* the
- Gateway. To do anything interesting you'll need at
- least two Peers: one for supplying events and one for consuming
- them. In the configuration files, these two types of Peers
- are designated as follows:
+ Gateway. To do anything interesting you'll need at least
+ two Peers: one to supply events and one to consume events.
+ In the configuration files, these two types of Peers are
+ designated as follows:
- (1) Input Peers (designated by an "I" in the Gateway's
- cc_config configuration file). These Peers are "sources"
- of messages to the Gateway.
+ 1. Supplier Peers (designated by an 'S' in the Gateway's
+ connection_config configuration file). These Peers are
+ "suppliers" of events to the Gateway.
- (2) Output Peers (designated by an "O" in the Gateway's
- cc_config file). These Peers are "destinations" of
- messages routed by the Gateway (routing is based on
- the settings in the rt_config configuration file).
+ 2. Consumer Peers (designated by an 'C' in the Gateway's
+ connection_config file). These Peers are "consumers" of
+ events forwarded by the Gateway (forwarding is based on
+ the settings in the consumer_config configuration file).
RUNNING THE TESTS
@@ -45,39 +54,39 @@ To run the tests do the following:
1. Compile everything (i.e., first compile the ACE libraries, then
compile the the Gateway directories).
-2. Edit the rt_config and cc_config files as discussed above.
+2. Edit the consumer_config and connection_config files as discussed
+ above to indicate the desired physical and logical mappings.
3. Start up the Peers (peerd). You can start up as many as you
- like, as per the cc_config file, but you'll need at least
- two (one for supplying input and one for consuming output). I
- typically start up each peer in a different window on a different
- machine. The peers should print out some diagnostic info and then
- block awaiting connections from the Gateway.
-
-4. Start up the Gateway (gatewayd). This will print out
- a bunch of messages as it reads the config files and connects
- to all the Peers. Assuming everything works, then all the
- Peers will be connected. If some of the Peers aren't set up
- correctly then the Gateway will use an exponential backoff
- algorithm to attempt to reestablish those connections.
+ like, as per the connection_config file, but you'll need at least
+ two (one to supply and one to consume). I typically start up each
+ Peer in a different window on a different machine. The Peers
+ should print out some diagnostic info and then block awaiting
+ connections from the Gateway.
+
+4. Start up the Gateway (gatewayd). This will print out a bunch of
+ events as it reads the config files and connects to all the Peers.
+ Assuming everything works, then all the Peers will be connected.
+ If some of the Peers aren't set up correctly then the Gateway will
+ use an exponential backoff algorithm to attempt to reestablish
+ those connections.
5. Once the Gateway has connected with all the Peers you can send
- messages from Input Peers by typing commands in the Peer window.
- This input will be sent to the Gateway, which will forward
- the message to all Output Peers that have "subscribed" to receive
- these messages.
+ events from Supplier Peers by typing commands in the Peer window.
+ This Supplier will be sent to the Gateway, which will forward the
+ event to all Consumer Peers that have "subscribed" to receive these
+ events.
Note that if you type ^C in a Peer window the Peer will shutdown
- its handlers and exit. The Gateway will detect this and will
- start trying to reestablish the connection using the same
- exponential backoff algorithm it used for the initial connection
- establishment.
+ its handlers and exit. The Gateway will detect this and will start
+ trying to reestablish the connection using the same exponential
+ backoff algorithm it used for the initial connection establishment.
-7. When you want to terminate a Gateway, just type ^C
- and the process will shut down gracefully.
+7. When you want to terminate a Gateway, just type ^C and the process
+ will shut down gracefully.
Please let me know if there are any questions.
Doug
-schmidt@cs.wustl.edu
+ schmidt@cs.wustl.edu
diff --git a/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h b/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h
index 293d597a99f..43730693c93 100644
--- a/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h
+++ b/apps/Orbix-Examples/Event_Comm/Supplier/Notifier_Handler.h
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
// ============================================================================
//
// = LIBRARY
diff --git a/examples/ASX/CCM_App/CCM_App.cpp b/examples/ASX/CCM_App/CCM_App.cpp
index 93335efcc9c..e0330cf53de 100644
--- a/examples/ASX/CCM_App/CCM_App.cpp
+++ b/examples/ASX/CCM_App/CCM_App.cpp
@@ -14,9 +14,6 @@ class ACE_Svc_Export Test_Task : public MT_Task
public:
virtual int open (void *);
virtual int close (u_long);
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
- virtual int svc (void);
- virtual int info (char **, size_t) const;
virtual int init (int, char *[]);
virtual int fini (void);
virtual int suspend (void);
@@ -52,24 +49,6 @@ Test_Task::resume (void)
}
int
-Test_Task::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- return 0;
-}
-
-int
-Test_Task::svc (void)
-{
- return 0;
-}
-
-int
-Test_Task::info (char **, size_t) const
-{
- return 0;
-}
-
-int
Test_Task::init (int, char *[])
{
ACE_DEBUG ((LM_DEBUG, "initializing %s\n", this->name () ? this->name () : "task"));
diff --git a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
index 7451d38b2b7..498e91d476e 100644
--- a/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
+++ b/examples/ASX/Event_Server/Event_Server/Event_Analyzer.h
@@ -19,7 +19,6 @@ public:
virtual int open (void *a = 0);
virtual int close (u_long flags = 0);
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- virtual int svc (void) { return 0; }
/* Dynamic linking hooks */
virtual int init (int argc, char *argv[]);
diff --git a/examples/ASX/Message_Queue/buffer_stream.cpp b/examples/ASX/Message_Queue/buffer_stream.cpp
index fbfed5d13ce..cc2c475bef1 100644
--- a/examples/ASX/Message_Queue/buffer_stream.cpp
+++ b/examples/ASX/Message_Queue/buffer_stream.cpp
@@ -30,12 +30,6 @@ public:
// ACE_Task hooks
virtual int open (void * = 0);
virtual int close (u_long = 0);
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) { return 0; }
-
- // ACE_Service_Object hooks
- virtual int init (int, char **) { return 0; }
- virtual int fini (void) { return 0; }
- virtual int info (char **, size_t) const { return 0; }
};
// Define the Producer interface.
diff --git a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h
index 3dd706eaa6a..a815e7d642a 100644
--- a/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h
+++ b/examples/ASX/UPIPE_Event_Server/Event_Analyzer.h
@@ -19,7 +19,6 @@ public:
virtual int open (void *a = 0);
virtual int close (u_long flags = 0);
virtual int put (ACE_Message_Block *msg, ACE_Time_Value * = 0);
- virtual int svc (void) { return 0; }
// Dynamic linking hooks.
virtual int init (int argc, char *argv[]);
diff --git a/examples/Connection/non_blocking/CPP-connector.h b/examples/Connection/non_blocking/CPP-connector.h
index 3cf2a6eb2fb..81be8b6ba71 100644
--- a/examples/Connection/non_blocking/CPP-connector.h
+++ b/examples/Connection/non_blocking/CPP-connector.h
@@ -34,9 +34,6 @@ protected:
// Keeps track of which state we are in.
private:
- // = Disallow these methods...
- virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
- virtual int svc (void) { return 0; }
};
template <class SVC_HANDLER, ACE_PEER_CONNECTOR_1>
diff --git a/examples/Reactor/Misc/test_demuxing.cpp b/examples/Reactor/Misc/test_demuxing.cpp
index cacac69f5e6..f8f7992ce37 100644
--- a/examples/Reactor/Misc/test_demuxing.cpp
+++ b/examples/Reactor/Misc/test_demuxing.cpp
@@ -212,11 +212,6 @@ public:
// Run the "event-loop" periodically putting messages to our
// internal <Message_Queue> that we inherit from <ACE_Task>.
- // = Not used...
- virtual int open (void *) { return 0; }
- virtual int close (u_long) { return 0; }
- virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
-
private:
ACE_Reactor_Notification_Strategy notification_strategy_;
// This strategy will notify the <ACE_Reactor> Singleton when a new
diff --git a/examples/Reactor/Misc/test_reactors.cpp b/examples/Reactor/Misc/test_reactors.cpp
index dbb60941f24..5426af32d2a 100644
--- a/examples/Reactor/Misc/test_reactors.cpp
+++ b/examples/Reactor/Misc/test_reactors.cpp
@@ -1,6 +1,6 @@
-// Perform a torture test of multiple ACE_Reactors and ACE_Tasks in
// $Id$
+// Perform a torture test of multiple ACE_Reactors and ACE_Tasks in
// the same process... Thanks to Detlef Becker for contributing this.
#include "ace/Reactor.h"
@@ -21,7 +21,6 @@ public:
virtual int open (void *args = 0);
virtual int close (u_long flags = 0);
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
virtual int svc (void);
virtual int handle_input (ACE_HANDLE handle);
@@ -29,7 +28,6 @@ public:
ACE_Reactor_Mask close_mask);
private:
- ACE_Reactor *r_;
int handled_;
static int task_count_;
@@ -64,7 +62,7 @@ Test_Task::~Test_Task (void)
int
Test_Task::open (void *args)
{
- r_ = (ACE_Reactor *) args;
+ this->reactor ((ACE_Reactor *) args);
return this->activate (THR_NEW_LWP);
}
@@ -81,13 +79,6 @@ Test_Task::close (u_long)
}
int
-Test_Task::put (ACE_Message_Block *,
- ACE_Time_Value *)
-{
- return 0;
-}
-
-int
Test_Task::svc (void)
{
for (int i = 0; i < NUM_INVOCATIONS; i++)
@@ -96,7 +87,7 @@ Test_Task::svc (void)
// ACE_DEBUG ((LM_DEBUG, "(%t) calling notify %d\n", i));
- if (r_->notify (this, ACE_Event_Handler::READ_MASK) == -1)
+ if (this->reactor ()->notify (this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "notify"), -1);
// ACE_DEBUG ((LM_DEBUG, "(%t) leaving notify %d\n", i));
diff --git a/examples/Reactor/ReactorEx/test_reactorEx.cpp b/examples/Reactor/ReactorEx/test_reactorEx.cpp
index 334cfce80aa..488fa1e683a 100644
--- a/examples/Reactor/ReactorEx/test_reactorEx.cpp
+++ b/examples/Reactor/ReactorEx/test_reactorEx.cpp
@@ -76,9 +76,6 @@ public:
// Called when output events should start
private:
- int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
- // Make Task happy.
-
ACE_SOCK_Stream stream_;
// Socket that we have connected to the server.
diff --git a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
index 334cfce80aa..488fa1e683a 100644
--- a/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
+++ b/examples/Reactor/WFMO_Reactor/test_reactorEx.cpp
@@ -76,9 +76,6 @@ public:
// Called when output events should start
private:
- int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
- // Make Task happy.
-
ACE_SOCK_Stream stream_;
// Socket that we have connected to the server.
diff --git a/examples/System_V_IPC/SV_Semaphores/Semaphores_1.cpp b/examples/System_V_IPC/SV_Semaphores/Semaphores_1.cpp
new file mode 100644
index 00000000000..8af66b90f34
--- /dev/null
+++ b/examples/System_V_IPC/SV_Semaphores/Semaphores_1.cpp
@@ -0,0 +1,89 @@
+// $Id$
+
+#include "ace/SV_Shared_Memory.h"
+#include "ace/SV_Semaphore_Simple.h"
+#include "ace/Malloc.h"
+
+#if defined (ACE_HAS_SYSV_IPC)
+
+// Shared memory allocator (note that this chews up the
+// ACE_DEFAULT_SEM_KEY).
+static ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple> allocator;
+
+const int SEM_KEY = ACE_DEFAULT_SEM_KEY + 1;
+const int SHMSZ = 27;
+
+static int
+parent (char *shm)
+{
+ char *s = shm;
+
+ ACE_SV_Semaphore_Complex sem (SEM_KEY, ACE_SV_Semaphore_Complex::ACE_CREATE, 0, 2);
+
+ for (char c = 'a'; c <= 'z'; c++)
+ *s++ = c;
+
+ *s = '\0';
+
+ if (sem.release (0) == -1)
+ ACE_ERROR ((LM_ERROR, "%p", "parent sem.release(0)"));
+ else if (sem.acquire (1) == -1)
+ ACE_ERROR ((LM_ERROR, "%p", "parent sem.acquire(1)"));
+
+ if (allocator.remove () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "allocator.remove"));
+ if (sem.remove () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "sem.remove"));
+ return 0;
+}
+
+static int
+child (char *shm)
+{
+ ACE_SV_Semaphore_Complex sem (SEM_KEY, ACE_SV_Semaphore_Complex::ACE_CREATE, 0, 2);
+
+ while (sem.tryacquire (0) == -1)
+ if (errno == EAGAIN)
+ ACE_DEBUG ((LM_DEBUG, "spinning in client!\n"));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "client mutex.tryacquire(0)"), 1);
+
+ for (char *s = (char *) shm; *s != '\0'; s++)
+ ACE_DEBUG ((LM_DEBUG, "%c", *s));
+
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ if (sem.release (1) < 0)
+ ACE_ERROR ((LM_ERROR, "client sem.release(1)"));
+ return 0;
+}
+
+int
+main (void)
+{
+ char *shm = (char *) allocator.malloc (27);
+
+ switch (ACE_OS::fork ())
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR, "fork failed\n"), -1);
+ /* NOTREACHED */
+ case 0:
+ // Child.
+ return child (shm);
+ default:
+ return parent (shm);
+ }
+}
+#else
+int main (void)
+{
+ ACE_ERROR ((LM_ERROR,
+ "SYSV IPC is not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_SYSV_IPC */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/examples/System_V_IPC/SV_Semaphores/Semaphores_2.cpp b/examples/System_V_IPC/SV_Semaphores/Semaphores_2.cpp
new file mode 100644
index 00000000000..0405440364c
--- /dev/null
+++ b/examples/System_V_IPC/SV_Semaphores/Semaphores_2.cpp
@@ -0,0 +1,100 @@
+// $Id$
+
+// Illustrates the use of the ACE_SV_Semaphore_Complex class and the
+// ACE_Malloc class using the ACE_Shared_Memory_Pool (which uses
+// System V shared memory). Note that it doesn't matter whether the
+// parent or the child creates the semaphore since Semaphore_Complex
+// will correctly serialize the intialization of the mutex and synch
+// objects.
+
+#include "ace/Malloc.h"
+#include "ace/SV_Semaphore_Complex.h"
+
+#if defined (ACE_HAS_SYSV_IPC)
+
+// Shared memory allocator (note that this chews up the
+// ACE_DEFAULT_SEM_KEY).
+static ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple> allocator;
+
+const int SEM_KEY_1 = ACE_DEFAULT_SEM_KEY + 1;
+const int SEM_KEY_2 = ACE_DEFAULT_SEM_KEY + 2;
+const int SHMSZ = 27;
+
+static int
+parent (char *shm)
+{
+ char *s = shm;
+
+ ACE_SV_Semaphore_Complex mutex (SEM_KEY_1, ACE_SV_Semaphore_Complex::ACE_CREATE, 0);
+ ACE_SV_Semaphore_Complex synch (SEM_KEY_2, ACE_SV_Semaphore_Complex::ACE_CREATE, 0);
+
+ for (char c = 'a'; c <= 'z'; c++)
+ *s++ = c;
+
+ *s = '\0';
+
+ if (mutex.release () == -1)
+ ACE_ERROR ((LM_ERROR, "%p", "parent mutex.release"));
+ else if (synch.acquire () == -1)
+ ACE_ERROR ((LM_ERROR, "%p", "parent synch.acquire"));
+
+ if (allocator.remove () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "allocator.remove"));
+ if (mutex.remove () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "mutex.remove"));
+ if (synch.remove () == -1)
+ ACE_ERROR ((LM_ERROR, "%p\n", "synch.remove"));
+ return 0;
+}
+
+static int
+child (char *shm)
+{
+ ACE_SV_Semaphore_Complex mutex (SEM_KEY_1, ACE_SV_Semaphore_Complex::ACE_CREATE, 0);
+ ACE_SV_Semaphore_Complex synch (SEM_KEY_2, ACE_SV_Semaphore_Complex::ACE_CREATE, 0);
+
+ while (mutex.tryacquire () == -1)
+ if (errno == EAGAIN)
+ ACE_DEBUG ((LM_DEBUG, "spinning in child!\n"));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "child mutex.tryacquire"), 1);
+
+ for (char *s = (char *) shm; *s != '\0'; s++)
+ ACE_DEBUG ((LM_DEBUG, "%c", *s));
+
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+
+ if (synch.release () == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "child synch.release"), 1);
+ return 0;
+}
+
+int
+main (void)
+{
+ char *shm = (char *) allocator.malloc (27);
+
+ switch (ACE_OS::fork ())
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR, "fork failed\n"), -1);
+ /* NOTREACHED */
+ case 0:
+ // Child.
+ return child (shm);
+ default:
+ return parent (shm);
+ }
+}
+#else
+int main (void)
+{
+ ACE_ERROR ((LM_ERROR,
+ "SYSV IPC is not supported on this platform\n"));
+ return 0;
+}
+#endif /* ACE_HAS_SYSV_IPC */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Malloc<ACE_SHARED_MEMORY_POOL, ACE_SV_Semaphore_Simple>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/examples/Threads/future1.cpp b/examples/Threads/future1.cpp
index ea295e487e1..00814fee881 100644
--- a/examples/Threads/future1.cpp
+++ b/examples/Threads/future1.cpp
@@ -57,7 +57,6 @@ public:
virtual int open (void *args = 0);
virtual int close (u_long flags = 0);
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
virtual int svc (void);
ACE_Future<double> work (double param, int count);
@@ -196,13 +195,6 @@ Scheduler::close (u_long)
return 0;
}
-// put... ??
-int
-Scheduler::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- return 0;
-}
-
// service..
int
Scheduler::svc (void)
diff --git a/examples/Threads/future2.cpp b/examples/Threads/future2.cpp
index 55ce8c05a40..f034d765c1e 100644
--- a/examples/Threads/future2.cpp
+++ b/examples/Threads/future2.cpp
@@ -73,10 +73,6 @@ private:
virtual int close (u_long flags = 0);
// Should not be accessible from outside... (use end () instead).
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0)
- { return 0; };
- // Doesn't have any use for this example.
-
virtual int svc (void);
// Here the actual servicing of all requests is happening..
diff --git a/examples/Threads/task_four.cpp b/examples/Threads/task_four.cpp
index 64209cb3430..87a2f3abfa8 100644
--- a/examples/Threads/task_four.cpp
+++ b/examples/Threads/task_four.cpp
@@ -55,11 +55,6 @@ private:
// Number of threads per task.
int n_iterations_;
// Number of iterations per thread.
-
- // = Not needed for this test.
- virtual int open (void *) { return 0; }
- virtual int close (u_long) { return 0; }
- virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
};
class Worker_Task : public ACE_Task<ACE_MT_SYNCH>
diff --git a/examples/Threads/task_one.cpp b/examples/Threads/task_one.cpp
index d0a8a12e6c4..7acf8de3789 100644
--- a/examples/Threads/task_one.cpp
+++ b/examples/Threads/task_one.cpp
@@ -31,11 +31,6 @@ private:
int n_iterations_;
// Number of iterations to run.
-
- // = Not needed for this test.
- virtual int open (void *) { return 0; }
- virtual int close (u_long) { return 0; }
- virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
};
Barrier_Task::Barrier_Task (ACE_Thread_Manager *thr_mgr,
diff --git a/examples/Threads/task_three.cpp b/examples/Threads/task_three.cpp
index 0214ac10ddf..4b365aba8a5 100644
--- a/examples/Threads/task_three.cpp
+++ b/examples/Threads/task_three.cpp
@@ -33,7 +33,6 @@ public:
virtual int open (void *args = 0);
virtual int close (u_long flags = 0);
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
virtual int svc (void);
virtual int handle_input (ACE_HANDLE fd);
@@ -86,12 +85,6 @@ Test_Task::close (u_long)
return 0;
}
-int
-Test_Task::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- return 0;
-}
-
Test_Task::svc (void)
{
// Every thread must register the same stream to write to file.
diff --git a/examples/Threads/task_two.cpp b/examples/Threads/task_two.cpp
index 1c6366c4b12..c6e88d8eaa5 100644
--- a/examples/Threads/task_two.cpp
+++ b/examples/Threads/task_two.cpp
@@ -31,7 +31,6 @@ class Task_Test : public ACE_Task<ACE_MT_SYNCH>
public:
virtual int open (void *args = 0);
virtual int close (u_long flags = 0);
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
virtual int svc (void);
private:
@@ -66,13 +65,6 @@ Task_Test::close (u_long)
}
int
-Task_Test::put (ACE_Message_Block *,
- ACE_Time_Value *)
-{
- return 0;
-}
-
-int
Task_Test::svc (void)
{
wait_count++;
diff --git a/examples/Threads/thread_pool.cpp b/examples/Threads/thread_pool.cpp
index 9478ed0883d..ddcad02e4dd 100644
--- a/examples/Threads/thread_pool.cpp
+++ b/examples/Threads/thread_pool.cpp
@@ -34,9 +34,6 @@ public:
private:
virtual int close (u_long);
-
- // = Not needed for this test.
- virtual int open (void *) { return 0; }
};
int
diff --git a/examples/Threads/token.cpp b/examples/Threads/token.cpp
index 5a51496d011..e8c72be3d1d 100644
--- a/examples/Threads/token.cpp
+++ b/examples/Threads/token.cpp
@@ -10,9 +10,6 @@ class My_Task : public ACE_Task<ACE_MT_SYNCH>
{
public:
My_Task (int n);
- virtual int open (void *) { return 0; }
- virtual int close (u_long) { return 0; }
- virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
virtual int svc (void);
static void sleep_hook (void *);
diff --git a/examples/Threads/tss1.cpp b/examples/Threads/tss1.cpp
index 7efdc9dc3ef..dd21023fa19 100644
--- a/examples/Threads/tss1.cpp
+++ b/examples/Threads/tss1.cpp
@@ -96,9 +96,6 @@ public:
virtual int open (void *theArgs = 0);
virtual int close (u_long theArg = 0);
- virtual int put (ACE_Message_Block *theMsgBlock,
- ACE_Time_Value *theTimeVal = 0);
- virtual int svc (void);
};
template <ACE_SYNCH_1> int
@@ -119,18 +116,6 @@ int Tester<ACE_SYNCH_2>::close (u_long)
return 0;
}
-template <ACE_SYNCH_1> int
-Tester<ACE_SYNCH_2>::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- return 0;
-}
-
-template <ACE_SYNCH_1> int
-Tester<ACE_SYNCH_2>::svc (void)
-{
- return 0;
-}
-
int
main (int, char *[])
{
diff --git a/netsvcs/servers/svc.conf b/netsvcs/servers/svc.conf
index f8634065b13..b136e97bb33 100644
--- a/netsvcs/servers/svc.conf
+++ b/netsvcs/servers/svc.conf
@@ -1,5 +1,5 @@
# These are the services that can be linked into ACE.
-# Note that you can replace the hardcoded "../lib/libnet_svcs" with
+# Note that you can replace the hardcoded "../lib/net_svcs" with
# a relative path if you set your LD search path correctly -- ACE will
# locate this for you automatically by reading your LD search path.
# Moreover, ACE will automatically insert the correct suffix (e.g.,
@@ -7,11 +7,11 @@
# "-p 20xxx" with "-p $PORTxxx" if you set your environment variables
# correctly.
-dynamic Logger Service_Object * ../lib/libnet_svcs:_make_ACE_Logger() "-s foobar -f STDERR|OSTREAM"
-dynamic Time_Service Service_Object * ../lib/libnet_svcs:_make_ACE_TS_Server_Acceptor() "-p 10222"
-dynamic Name_Server Service_Object * ../lib/libnet_svcs:_make_ACE_Name_Acceptor() "-p 10012"
-dynamic Token_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Token_Acceptor() "-p 10202"
-dynamic Server_Logging_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Server_Logging_Acceptor() active "-p 10009"
-dynamic Thr_Server_Logging_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Thr_Server_Logging_Acceptor() active "-p 10020"
-dynamic Client_Logging_Service Service_Object * ../lib/libnet_svcs:_make_ACE_Client_Logging_Connector() active "-p 10009"
+dynamic Logger Service_Object * ../lib/net_svcs:_make_ACE_Logger() "-s foobar -f STDERR|OSTREAM"
+dynamic Time_Service Service_Object * ../lib/net_svcs:_make_ACE_TS_Server_Acceptor() "-p 10222"
+dynamic Name_Server Service_Object * ../lib/net_svcs:_make_ACE_Name_Acceptor() "-p 10012"
+dynamic Token_Service Service_Object * ../lib/net_svcs:_make_ACE_Token_Acceptor() "-p 10202"
+dynamic Server_Logging_Service Service_Object * ../lib/net_svcs:_make_ACE_Server_Logging_Acceptor() active "-p 10009"
+dynamic Thr_Server_Logging_Service Service_Object * ../lib/net_svcs:_make_ACE_Thr_Server_Logging_Acceptor() active "-p 10020"
+dynamic Client_Logging_Service Service_Object * ../lib/net_svcs:_make_ACE_Client_Logging_Connector() active "-p 10009"
diff --git a/tests/Buffer_Stream_Test.cpp b/tests/Buffer_Stream_Test.cpp
index a228d94e783..aa5ec27414c 100644
--- a/tests/Buffer_Stream_Test.cpp
+++ b/tests/Buffer_Stream_Test.cpp
@@ -46,12 +46,6 @@ public:
// ACE_Task hooks
virtual int open (void * = 0);
virtual int close (u_long = 0);
- virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0) { return 0; }
-
- // ACE_Service_Object hooks
- virtual int init (int, char **) { return 0; }
- virtual int fini (void) { return 0; }
- virtual int info (char **, size_t) const { return 0; }
};
// Define the Producer interface.
diff --git a/tests/Future_Test.cpp b/tests/Future_Test.cpp
index 00ed154ebbf..8207ede913c 100644
--- a/tests/Future_Test.cpp
+++ b/tests/Future_Test.cpp
@@ -58,7 +58,6 @@ public:
virtual int open (void *args = 0);
virtual int close (u_long flags = 0);
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
virtual int svc (void);
ACE_Future<double> work (double param, int count);
@@ -196,13 +195,6 @@ Scheduler::close (u_long)
return 0;
}
-// put... ??
-int
-Scheduler::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- return 0;
-}
-
// service..
int
Scheduler::svc (void)
diff --git a/tests/Priority_Task_Test.cpp b/tests/Priority_Task_Test.cpp
index 5444e5b987c..bcc52856f43 100644
--- a/tests/Priority_Task_Test.cpp
+++ b/tests/Priority_Task_Test.cpp
@@ -29,7 +29,6 @@ class Priority_Task : public ACE_Task<ACE_MT_SYNCH>
public:
Priority_Task (void);
- int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
int close (u_long = 0);
int open (void *);
int svc (void);
@@ -65,7 +64,7 @@ Priority_Task::svc (void)
ACE_hthread_t thr_handle;
ACE_Thread::self (thr_handle);
int prio;
- ACE_Thread::getprio (thr_handle, &prio);
+ ACE_Thread::getprio (thr_handle, prio);
ACE_ASSERT (this->priority_ == prio);
return 0;
}
diff --git a/tests/Reactors_Test.cpp b/tests/Reactors_Test.cpp
index 8bad943910d..2c459cdbf15 100644
--- a/tests/Reactors_Test.cpp
+++ b/tests/Reactors_Test.cpp
@@ -36,7 +36,6 @@ public:
virtual int open (void *args = 0);
virtual int close (u_long flags = 0);
- virtual int put (ACE_Message_Block *, ACE_Time_Value *tv = 0);
virtual int svc (void);
virtual int handle_input (ACE_HANDLE handle);
@@ -97,12 +96,6 @@ Test_Task::close (u_long)
}
int
-Test_Task::put (ACE_Message_Block *, ACE_Time_Value *)
-{
- return 0;
-}
-
-int
Test_Task::svc (void)
{
ACE_NEW_THREAD;
diff --git a/tests/Task_Test.cpp b/tests/Task_Test.cpp
index be1256c3023..27c4d1bd98e 100644
--- a/tests/Task_Test.cpp
+++ b/tests/Task_Test.cpp
@@ -43,11 +43,6 @@ private:
int n_iterations_;
// Number of iterations to run.
-
- // = Not needed for this test.
- virtual int open (void *) { return 0; }
- virtual int close (u_long) { return 0; }
- virtual int put (ACE_Message_Block *, ACE_Time_Value *) { return 0; }
};
Barrier_Task::Barrier_Task (ACE_Thread_Manager *thr_mgr,
diff --git a/tests/Thread_Pool_Test.cpp b/tests/Thread_Pool_Test.cpp
index cb7d9069b9a..2363850e7f3 100644
--- a/tests/Thread_Pool_Test.cpp
+++ b/tests/Thread_Pool_Test.cpp
@@ -41,7 +41,7 @@ public:
// Iterate <n_iterations> time printing off a message and "waiting"
// for all other threads to complete this iteration.
- virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv=0);
+ virtual int put (ACE_Message_Block *mb, ACE_Time_Value *tv = 0);
// This allows the producer to pass messages to the <Thread_Pool>.
private:
diff --git a/tests/UPIPE_SAP_Test.cpp b/tests/UPIPE_SAP_Test.cpp
index f1036c563cf..64a851c28bf 100644
--- a/tests/UPIPE_SAP_Test.cpp
+++ b/tests/UPIPE_SAP_Test.cpp
@@ -100,17 +100,6 @@ acceptor (void *args)
ACE_UPIPE_Acceptor *acceptor = (ACE_UPIPE_Acceptor *) args;
ACE_UPIPE_Stream s_stream;
- ACE_hthread_t thr_handle;
-
- // Spawn a connector thread.
- if (ACE_Thread::spawn (ACE_THR_FUNC (connector),
- (void *) 0,
- THR_NEW_LWP | THR_DETACHED,
- 0,
- &thr_handle) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 0);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) acceptor starting accept\n"));
if (acceptor->accept (s_stream) == -1)
ACE_DEBUG ((LM_DEBUG,
@@ -163,7 +152,7 @@ main (int, char *[])
// Spawn a acceptor thread.
if (ACE_Thread::spawn (ACE_THR_FUNC (acceptor),
(void *) &acc,
- THR_NEW_LWP | THR_DETACHED,
+ THR_NEW_LWP,
0,
&thr_handle_acceptor) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
@@ -171,15 +160,22 @@ main (int, char *[])
// Spawn a connector thread.
if (ACE_Thread::spawn (ACE_THR_FUNC (connector),
(void *) 0,
- THR_NEW_LWP | THR_DETACHED,
+ THR_NEW_LWP,
0,
&thr_handle_connector) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn"), 1);
// Wait for both the acceptor and connector threads to exit.
- ACE_Thread::join (thr_handle_connector);
- ACE_Thread::join (thr_handle_acceptor);
+ if (ACE_Thread::join (thr_handle_connector) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "join"), -1);
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%t) joined with connector thread\n"));
+ if (ACE_Thread::join (thr_handle_acceptor) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "join"), -1);
+ else
+ ACE_DEBUG ((LM_DEBUG, "(%t) joined with acceptor thread\n"));
+
#else
ACE_ERROR ((LM_ERROR, "threads and/or UPIPE not supported on this platform\n"));
#endif /* ACE_HAS_THREADS */
diff --git a/tests/run_tests.sh b/tests/run_tests.sh
index 92ac675fa09..830260bbb6c 100755
--- a/tests/run_tests.sh
+++ b/tests/run_tests.sh
@@ -66,7 +66,7 @@ run Reader_Writer_Test # uses Thread_Manager, Mutex
# ifdef ACE_HAS_STREAM_PIPES
run SPIPE_Test # uses SPIPE_Acceptor/Connector, Thread_Manager
-# run UPIPE_SAP_Test # uses UPIPE, Thread, Thread_Manager
+run UPIPE_SAP_Test # uses UPIPE, Thread, Thread_Manager
run Barrier_Test # uses Service_Config, Barrier
run Buffer_Stream_Test # uses Service_Config, Module (Stream,Task, Message_Queue)