diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-05-14 05:56:57 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1998-05-14 05:56:57 +0000 |
commit | 1aee7169ab074b049c2832a0271f3e89999b59bf (patch) | |
tree | 5df3f5a8ddf6a44505dbc97b45a03f13cd991b0f | |
parent | 64eddc069f4148487db08298857c6367683fc1e3 (diff) | |
download | ATCD-1aee7169ab074b049c2832a0271f3e89999b59bf.tar.gz |
*** empty log message ***
-rw-r--r-- | examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp | 276 | ||||
-rw-r--r-- | examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp | 261 |
2 files changed, 463 insertions, 74 deletions
diff --git a/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp b/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp index 54ed35d8518..64dcf2920c8 100644 --- a/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp +++ b/examples/IPC_SAP/SOCK_SAP/CPP-inclient.cpp @@ -1,7 +1,7 @@ // $Id$ -// This tests the non-blocking features of the ACE_SOCK_Connector -// class. +// This tests the features of the ACE_SOCK_Connector and +// ACE_SOCK_Stream classes. #include "ace/SOCK_Connector.h" #include "ace/INET_Addr.h" @@ -38,10 +38,29 @@ public: const char *quit_string (void) const; // String that shuts down the client/server. - ssize_t read (char *buf, size_t len, size_t &iterations); + ssize_t read (void *buf, size_t len, size_t &iterations); // Read from the appropriate location. + size_t message_len (void) const; + // Returns the length of the message to send. + + const void *message_buf (void) const; + // Returns a pointer to the message. + + ACE_THR_FUNC thr_func (void); + // Returns a pointer to the entry point into the thread that runs + // the client test function. + private: + int initialize_message (void); + // Initialize the message we're sending to the user. + + static void *twoway_client_test (void *); + // Performs the twoway test. + + static void *oneway_client_test (void *); + // Performs the oneway test. + const char *host_; // Host of the server. @@ -57,11 +76,20 @@ private: const char *quit_string_; // String that shuts down the client/server. + size_t message_len_; + // Size of the message we send to the server. + + void *message_buf_; + // Pointer to the message we send to the server. + ACE_HANDLE io_source_; // Are we reading I/O from ACE_STDIN or from our generator? size_t iterations_; // Number of iterations. + + char oneway_; + // Are we running oneway or twoway? }; Options::Options (void) @@ -69,41 +97,93 @@ Options::Options (void) port_ (ACE_DEFAULT_SERVER_PORT), timeout_ (ACE_DEFAULT_TIMEOUT), threads_ (10), - quit_string_ ("quit"), + quit_string_ ("q"), + message_len_ (0), + message_buf_ (0), io_source_ (ACE_INVALID_HANDLE), // Defaults to using the generator. - iterations_ (10000) + iterations_ (10000), + oneway_ (1) // Make oneway calls the default. +{ +} + +// Options Singleton. +typedef ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX> OPTIONS; + +int +Options::initialize_message (void) +{ + // Check for default case. + if (this->message_len_ == 0) + { + ACE_ALLOCATOR_RETURN (this->message_buf_, + ACE_OS::strdup ("TAO"), + -1); + this->message_len_ = ACE_OS::strlen ("TAO"); + } + else + { + ACE_ALLOCATOR_RETURN (this->message_buf_, + ACE_OS::malloc (this->message_len_), + -1); + + ACE_OS::memset (this->message_buf_, + 'a', + this->message_len_); + } + + return 0; +} + +size_t +Options::message_len (void) const { + return this->message_len_; +} + +const void * +Options::message_buf (void) const +{ + return this->message_buf_; } ssize_t -Options::read (char *buf, size_t len, size_t &iteration) +Options::read (void *buf, size_t len, size_t &iteration) { - if (io_source_ == ACE_STDIN) + if (this->io_source_ == ACE_STDIN) return ACE_OS::read (ACE_STDIN, buf, sizeof buf); else if (iteration >= this->iterations_) return 0; else { - ACE_OS::strncpy (buf, "TAO", len); + size_t size = this->message_len (); + ACE_OS::memcpy (buf, + this->message_buf (), + size); iteration++; - return ACE_OS::strlen ("TAO") + 1; + return size; } } int Options::parse_args (int argc, char *argv[]) { - ACE_Get_Opt getopt (argc, argv, "h:i:p:q:st:T:", 1); + ACE_Get_Opt getopt (argc, argv, "2h:i:m:p:q:st:T:", 1); for (int c; (c = getopt ()) != -1; ) switch (c) { + case '2': // Disable the oneway client. + this->oneway_ = 0; + break; case 'h': this->host_ = getopt.optarg; break; case 'i': this->iterations_ = ACE_OS::atoi (getopt.optarg); break; + case 'm': + this->message_len_ = ACE_OS::atoi (getopt.optarg); + break; case 'p': this->port_ = ACE_OS::atoi (getopt.optarg); break; @@ -121,11 +201,11 @@ Options::parse_args (int argc, char *argv[]) break; default: ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) usage: %s [-h <host>] [-p <port>] [-q <quit string>] [-s] [-t <threads>] [-T <timeout>]"), - 1); + "(%P|%t) usage: %n [-2] [-h <host>] [-i iterations] [-m message-size] [-p <port>] [-q <quit string>] [-s] [-t <threads>] [-T <timeout>]"), + -1); } - return 0; + return this->initialize_message (); } u_short @@ -158,26 +238,23 @@ Options::timeout (void) const return (ACE_Time_Value *) &this->timeout_; } -// Options Singleton. -typedef ACE_Singleton<Options, ACE_SYNCH_RECURSIVE_MUTEX> OPTIONS; - -// Entry point to the client service. +// Static function entry point to the oneway client service. void * -client (void *) +Options::oneway_client_test (void *) { - char buf[BUFSIZ]; - Options *options = OPTIONS::instance (); ACE_SOCK_Stream cli_stream; - ACE_INET_Addr remote_addr (options->port (), + // Add one to the port for the oneway test! + ACE_INET_Addr remote_addr (options->port () + 1, options->host ()); ACE_SOCK_Connector con; // Initiate blocking connection with server. - ACE_DEBUG ((LM_DEBUG, "(%P|%t) starting connect\n")); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting oneway connect\n")); if (con.connect (cli_stream, remote_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -186,50 +263,163 @@ client (void *) 0); else ACE_DEBUG ((LM_DEBUG, - "(%P|%t) connected to %s\n", - remote_addr.get_host_name ())); + "(%P|%t) connected to %s at port %d\n", + remote_addr.get_host_name (), + remote_addr.get_port_number ())); + + ACE_UINT32 len = htonl (options->message_len ()); + + if (cli_stream.send_n ((void *) &len, + sizeof (ACE_UINT32)) != sizeof (ACE_UINT32)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "send_n failed"), + 0); + + // Allocate the transmit buffer. + void *buf; + ACE_ALLOCATOR_RETURN (buf, + ACE_OS::malloc (len), + 0); // This variable is allocated off the stack to obviate the need for // locking. size_t iteration = 0; - // Send data to server (correctly handles "incomplete writes"). + // Keep track of return value. + int result = 0; + + // Perform oneway transmission of data to server (correctly handles + // "incomplete writes"). for (ssize_t r_bytes; (r_bytes = options->read (buf, sizeof buf, iteration)) > 0; ) - if (ACE_OS::strncmp (buf, - options->quit_string (), - ACE_OS::strlen (options->quit_string ())) == 0) + if (ACE_OS::memcmp (buf, + options->quit_string (), + ACE_OS::strlen (options->quit_string ())) == 0) break; else if (cli_stream.send (buf, r_bytes, 0) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) %p\n", - "send_n"), - 0); - else if (cli_stream.recv (buf, sizeof buf) <= 0) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) %p\n", - "recv"), - 0); + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "send_n")); + result = -1; + break; + } - // Close the connection completely. - if (cli_stream.close () == -1) + // Close the connection. + cli_stream.close (); + + ACE_OS::free (buf); + return (void *) result; +} + +// Static function entry point to the twoway client service. + +void * +Options::twoway_client_test (void *) +{ + Options *options = OPTIONS::instance (); + + ACE_SOCK_Stream cli_stream; + ACE_INET_Addr remote_addr (options->port (), + options->host ()); + + ACE_SOCK_Connector con; + + // Initiate blocking connection with server. + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) starting twoway connect\n")); + + if (con.connect (cli_stream, remote_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", - "close"), + "connection failed"), 0); - return 0; + else + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connected to %s at port %d\n", + remote_addr.get_host_name (), + remote_addr.get_port_number ())); + + ACE_UINT32 len = htonl (options->message_len ()); + + if (cli_stream.send_n ((void *) &len, + sizeof (ACE_UINT32)) != sizeof (ACE_UINT32)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "send_n failed"), + 0); + + // Allocate the transmit buffer. + void *buf; + ACE_ALLOCATOR_RETURN (buf, + ACE_OS::malloc (len), + 0); + + // This variable is allocated off the stack to obviate the need for + // locking. + size_t iteration = 0; + + // Keep track of return value. + int result = 0; + + // Perform twoway transmission of data to server (correctly handles + // "incomplete writes"). + + for (ssize_t r_bytes; + (r_bytes = options->read (buf, len, iteration)) > 0; + ) + if (ACE_OS::memcmp (buf, + options->quit_string (), + ACE_OS::strlen (options->quit_string ())) == 0) + break; + else if (cli_stream.send (buf, r_bytes, 0) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "send_n")); + result = -1; + break; + } + else if (cli_stream.recv (buf, r_bytes) <= 0) + { + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", + "recv")); + result = -1; + break; + } + + // Close the connection. + cli_stream.close (); + + ACE_OS::free (buf); + return (void *) result; +} + +ACE_THR_FUNC +Options::thr_func (void) +{ + if (this->oneway_ == 0) + return ACE_THR_FUNC (&Options::twoway_client_test); + else + return ACE_THR_FUNC (&Options::oneway_client_test); } int main (int argc, char *argv[]) { - OPTIONS::instance ()->parse_args (argc, argv); + // Initialize the logger. + ACE_LOG_MSG->open (argv[0]); + + if (OPTIONS::instance ()->parse_args (argc, argv) == -1) + return -1; #if defined (ACE_HAS_THREADS) if (ACE_Thread_Manager::instance ()->spawn_n (OPTIONS::instance ()->threads (), - (ACE_THR_FUNC) client) == -1) + OPTIONS::instance ()->thr_func ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%P|%t) %p\n", "spawn_n"), 1); else ACE_Thread_Manager::instance ()->wait (); diff --git a/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp b/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp index f79865a94cd..57e1c2bd04f 100644 --- a/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp +++ b/examples/IPC_SAP/SOCK_SAP/CPP-inserver.cpp @@ -6,14 +6,16 @@ #include "ace/SOCK_Acceptor.h" #include "ace/Thread_Manager.h" +#include "ace/Handle_Set.h" +#include "ace/Profile_Timer.h" // Are we running verbosely? -static int verbose = 1; +static int verbose = 0; -// Entry point into the server task. +// Function entry point into the twoway server task. static void * -server (void *arg) +twoway_server (void *arg) { ACE_INET_Addr cli_addr; ACE_SOCK_Stream new_stream; @@ -33,14 +35,36 @@ server (void *arg) cli_addr.get_host_name (), cli_addr.get_port_number ())); + // Timer business + ACE_Profile_Timer timer; + timer.start (); + + size_t total_bytes = 0; + size_t message_count = 0; + + void *buf; + ACE_UINT32 len; + + if (new_stream.recv_n ((void *) &len, + sizeof (ACE_UINT32)) != sizeof (ACE_UINT32)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "recv_n failed"), + 0); + else + { + len = ntohl (len); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reading messages of size %d\n", + len)); + buf = ACE_OS::malloc (len); + } // Read data from client (terminate on error). for (;;) { - char buf[BUFSIZ]; - - ssize_t r_bytes = new_stream.recv (buf, sizeof buf, 0); - + ssize_t r_bytes = new_stream.recv (buf, len); + if (r_bytes == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "recv")); @@ -56,68 +80,243 @@ server (void *arg) ACE_ERROR ((LM_ERROR, "%p\n", "ACE::write_n")); else if (new_stream.send_n (buf, r_bytes) != r_bytes) ACE_ERROR ((LM_ERROR, "%p\n", "send_n")); + + total_bytes += size_t (r_bytes); + message_count++; } + timer.stop (); + + ACE_Profile_Timer::ACE_Elapsed_Time et; + timer.elapsed_time (et); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\t\treal time = %f secs \n\t\tuser time = %f secs \n\t\tsystem time = %f secs\n"), + et.real_time, + et.user_time, + et.system_time)); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\t\t messages = %d\n\t\t total bytes = %d\n\t\tmbits/sec = %f\n\t\tusec-per-message = %f\n"), + message_count, + total_bytes, + (((double) total_bytes * 8) / et.real_time) / (double) (1024 * 1024), + ((et.user_time + et.system_time) / (double) message_count) * 1000000)); + // Close new endpoint (listening endpoint stays open). - if (new_stream.close () == -1) - ACE_ERROR ((LM_ERROR, "%p\n", "close")); + new_stream.close (); + ACE_OS::free (buf); + return 0; +} + +// Function entry point into the oneway server task. + +static void * +oneway_server (void *arg) +{ + ACE_INET_Addr cli_addr; + ACE_SOCK_Stream new_stream; + ACE_HANDLE handle = (ACE_HANDLE) (long) arg; + + new_stream.set_handle (handle); + + // Make sure we're not in non-blocking mode. + if (new_stream.disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%p\n", + "disable"), + 0); + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) client %s connected from %d\n", + cli_addr.get_host_name (), + cli_addr.get_port_number ())); + + // Timer business + ACE_Profile_Timer timer; + timer.start (); + + size_t total_bytes = 0; + size_t message_count = 0; + + void *buf; + ACE_UINT32 len; + + if (new_stream.recv_n ((void *) &len, + sizeof (ACE_UINT32)) != sizeof (ACE_UINT32)) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "recv_n failed"), + 0); + else + { + len = ntohl (len); + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reading messages of size %d\n", + len)); + buf = ACE_OS::malloc (len); + } + // Read data from client (terminate on error). + + for (;;) + { + ssize_t r_bytes = new_stream.recv (buf, len); + + if (r_bytes == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "recv")); + break; + } + else if (r_bytes == 0) + { + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) reached end of input, connection closed by client\n")); + break; + } + else if (verbose && ACE::write_n (ACE_STDOUT, buf, r_bytes) != r_bytes) + ACE_ERROR ((LM_ERROR, "%p\n", "ACE::write_n")); + + total_bytes += size_t (r_bytes); + message_count++; + } + + timer.stop (); + + ACE_Profile_Timer::ACE_Elapsed_Time et; + timer.elapsed_time (et); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\t\treal time = %f secs \n\t\tuser time = %f secs \n\t\tsystem time = %f secs\n"), + et.real_time, + et.user_time, + et.system_time)); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("\t\t messages = %d\n\t\t total bytes = %d\n\t\tmbits/sec = %f\n\t\tusec-per-message = %f\n"), + message_count, + total_bytes, + (((double) total_bytes * 8) / et.real_time) / (double) (1024 * 1024), + ((et.user_time + et.system_time) / (double) message_count) * 1000000)); + + // Close new endpoint (listening endpoint stays open). + new_stream.close (); + + ACE_OS::free (buf); return 0; } static int run_event_loop (u_short port) { - ACE_SOCK_Acceptor peer_acceptor; + // Create the oneway and twoway acceptors. + ACE_SOCK_Acceptor twoway_acceptor; + ACE_SOCK_Acceptor oneway_acceptor; - // Create a server address. - ACE_INET_Addr server_addr (port); + // Create the oneway and twoway server addresses. + ACE_INET_Addr twoway_server_addr (port); + ACE_INET_Addr oneway_server_addr (port + 1); - // Create a server, reuse the address. - if (peer_acceptor.open (server_addr, 1) == -1) + // Create acceptors, reuse the address. + if (twoway_acceptor.open (twoway_server_addr, 1) == -1 + || oneway_acceptor.open (oneway_server_addr, 1) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "open"), 1); - else if (peer_acceptor.get_local_addr (server_addr) == -1) + else if (twoway_acceptor.get_local_addr (twoway_server_addr) == -1 + || oneway_acceptor.get_local_addr (oneway_server_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "get_local_addr"), 1); ACE_DEBUG ((LM_DEBUG, - "(%P|%t) starting server at port %d\n", - server_addr.get_port_number ())); + "(%P|%t) starting twoway server at port %d and oneway server at port %d\n", + twoway_server_addr.get_port_number (), + oneway_server_addr.get_port_number ())); // Keep these objects out here to prevent excessive constructor // calls within the loop. ACE_SOCK_Stream new_stream; + fd_set handles; + + FD_ZERO (&handles); + FD_SET (twoway_acceptor.get_handle (), &handles); + FD_SET (oneway_acceptor.get_handle (), &handles); + // Performs the iterative server activities. for (;;) { ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT); + fd_set temp = handles; - if (peer_acceptor.accept (new_stream, 0, &timeout) == -1) - { - ACE_ERROR ((LM_ERROR, "%p\n", "accept")); - continue; - } + int result = ACE_OS::select (int (oneway_acceptor.get_handle ()) + 1, + (fd_set *) &temp, + 0, + 0, + timeout); + if (result == -1) + ACE_ERROR ((LM_ERROR, + "(%P|%t) %p\n", "select")); + else if (result == 0) + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) select timed out\n")); + else + { + if (FD_ISSET (twoway_acceptor.get_handle (), &temp)) + { + if (twoway_acceptor.accept (new_stream) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "accept")); + continue; + } + else + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) spawning twoway server\n")); #if defined (ACE_HAS_THREADS) - // Spawn a new thread and run the new connection in that thread of - // control using the <server> function as the entry point. - if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) server, - (void *) new_stream.get_handle (), - THR_DETACHED) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P|%t) %p\n", - "spawn"), + // Spawn a new thread and run the new connection in that thread of + // control using the <server> function as the entry point. + if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) twoway_server, + (void *) new_stream.get_handle (), + THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "spawn"), 1); #else - server ((void *) new_stream.get_handle ()); + twoway_server ((void *) new_stream.get_handle ()); #endif /* ACE_HAS_THREADS */ + } + if (FD_ISSET (oneway_acceptor.get_handle (), &temp)) + { + if (oneway_acceptor.accept (new_stream) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "accept")); + continue; + } + else + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) spawning oneway server\n")); + +#if defined (ACE_HAS_THREADS) + // Spawn a new thread and run the new connection in that thread of + // control using the <server> function as the entry point. + if (ACE_Thread_Manager::instance ()->spawn ((ACE_THR_FUNC) oneway_server, + (void *) new_stream.get_handle (), + THR_DETACHED) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "(%P|%t) %p\n", + "spawn"), + 1); +#else + oneway_server ((void *) new_stream.get_handle ()); +#endif /* ACE_HAS_THREADS */ + } + } } /* NOTREACHED */ |