diff options
author | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-05-08 16:01:39 +0000 |
---|---|---|
committer | bala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-05-08 16:01:39 +0000 |
commit | 0358eb34eaae957b56f4ae853285839424bf5fb8 (patch) | |
tree | a3813fdae5971ec30e008935c87970245668b79a /tests | |
parent | 54183d510d49b3869c7f847d5150093da509006c (diff) | |
download | ATCD-0358eb34eaae957b56f4ae853285839424bf5fb8.tar.gz |
ChangeLogTag: Wed May 8 10:58:15 2002 Alex Libman <AlexL@rumblegroup.com>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/Proactor_Test.cpp | 837 |
1 files changed, 458 insertions, 379 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp index fbc14adc5c5..e9966371ae3 100644 --- a/tests/Proactor_Test.cpp +++ b/tests/Proactor_Test.cpp @@ -371,6 +371,9 @@ public: Receiver (Acceptor *acceptor = 0, int index = -1); ~Receiver (void); + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + /// This is called after the new connection has been accepted. virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); @@ -392,7 +395,6 @@ protected: private: int initiate_read_stream (void); int initiate_write_stream (ACE_Message_Block &mb, int nbytes); - int check_destroy (void); void cancel (); Acceptor *acceptor_; @@ -401,10 +403,12 @@ private: ACE_Asynch_Read_Stream rs_; ACE_Asynch_Write_Stream ws_; ACE_HANDLE handle_; - ACE_SYNCH_RECURSIVE_MUTEX lock_; + ACE_SYNCH_MUTEX lock_; long io_count_; int flg_cancel_; + long total_snd_; + long total_rcv_; }; class Acceptor : public ACE_Asynch_Acceptor<Receiver> @@ -412,6 +416,8 @@ class Acceptor : public ACE_Asynch_Acceptor<Receiver> friend class Receiver; public: int get_number_sessions (void) { return this->sessions_; } + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } Acceptor (void); virtual ~Acceptor (void); @@ -429,11 +435,15 @@ private: ACE_SYNCH_RECURSIVE_MUTEX lock_; int sessions_; Receiver *list_receivers_[MAX_RECEIVERS]; + long total_snd_; + long total_rcv_; }; // ************************************************************* Acceptor::Acceptor (void) - : sessions_ (0) + : sessions_ (0), + total_snd_(0), + total_rcv_(0) { ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); @@ -498,13 +508,26 @@ Acceptor::on_delete_receiver (Receiver & rcvr) ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->sessions_--; + + this->total_snd_ += rcvr.get_total_snd(); + this->total_rcv_ += rcvr.get_total_rcv(); + if (rcvr.index_ >= 0 && rcvr.index_ < MAX_RECEIVERS && this->list_receivers_[rcvr.index_] == &rcvr) this->list_receivers_[rcvr.index_] = 0; + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ()); + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ()); + ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"), + ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + rcvr.index_, + bufs, + bufr, this->sessions_)); } @@ -535,7 +558,9 @@ Receiver::Receiver (Acceptor * acceptor, int index) index_ (index), handle_ (ACE_INVALID_HANDLE), io_count_ (0), - flg_cancel_ (0) + flg_cancel_(0), + total_snd_(0), + total_rcv_(0) { if (this->acceptor_ != 0) this->acceptor_->on_new_receiver (*this); @@ -553,26 +578,10 @@ Receiver::~Receiver (void) this->handle_= ACE_INVALID_HANDLE; } -// return true if we alive, false if we commited suicide -int -Receiver::check_destroy (void) -{ - { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); - - if (this->io_count_ > 0) - return 1; - } - - delete this; - return 0; -} - - void Receiver::cancel () { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); this->flg_cancel_ = 1; this->ws_.cancel (); @@ -584,29 +593,33 @@ Receiver::cancel () void Receiver::open (ACE_HANDLE handle, ACE_Message_Block &) { - this->handle_ = handle; + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); - if (this->ws_.open (*this, this->handle_) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open"))); - else if (this->rs_.open (*this, this->handle_) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open"))); - else - this->initiate_read_stream (); + this->handle_ = handle; + + if (this->ws_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open"))); + else if (this->rs_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open"))); + else + this->initiate_read_stream (); - this->check_destroy (); + if (this->io_count_ > 0) + return; + } + delete this; } int Receiver::initiate_read_stream (void) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); - - if (this->flg_cancel_ != 0) - return 0; + if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) + return -1; ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, @@ -630,13 +643,11 @@ Receiver::initiate_read_stream (void) int Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); - - if (this->flg_cancel_ != 0) - { + if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) + { mb.release (); return -1; - } + } if (nbytes <= 0) { @@ -662,146 +673,151 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes) void Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { - ACE_Message_Block & mb = result.message_block (); + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ ); - // Reset pointers. - mb.rd_ptr ()[result.bytes_transferred ()] = '\0'; + ACE_Message_Block & mb = result.message_block (); - if (loglevel == 0 - || result.bytes_transferred () == 0 - || result.error () != 0) - { - LogLocker log_lock; + // Reset pointers. + mb.rd_ptr ()[result.bytes_transferred ()] = '\0'; - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"), - this->index_)); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_to_read"), - result.bytes_to_read ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("handle"), - result.handle ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_transfered"), - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("act"), - (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("success"), - result.success ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("completion_key"), - (u_long) result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("error"), - result.error ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %s\n"), - ACE_TEXT ("message_block"), - mb.rd_ptr ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** end of message ****************\n"))); - } + if (loglevel == 0 + || result.bytes_transferred () == 0 + || result.error () != 0) + { + LogLocker log_lock; - if (result.error () == 0 && result.bytes_transferred () != 0) - { - if(this->initiate_write_stream (mb, - result.bytes_transferred ()) == 0) - { - if (duplex != 0) - { - // Initiate new read from the stream. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"), + this->index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_read"), + result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("act"), + (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("completion_key"), + (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** end of message ****************\n"))); + } + + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_rcv_ += result.bytes_transferred (); + + if (this->initiate_write_stream (mb, + result.bytes_transferred ()) == 0) + { + if (duplex != 0) // Initiate new read from the stream. this->initiate_read_stream (); - } - } - } - else - mb.release (); + } + } + else + mb.release (); - { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; + if (this->io_count_ > 0) + return; } - - this->check_destroy (); + delete this; } void Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { - ACE_Message_Block & mb = result.message_block (); + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); - if (loglevel == 0 || - result.bytes_transferred () == 0 || - result.error () != 0) - { - LogLocker log_lock; + ACE_Message_Block & mb = result.message_block (); - //mb.rd_ptr () [0] = '\0'; - mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ()); + if (loglevel == 0 || + result.bytes_transferred () == 0 || + result.error () != 0) + { + LogLocker log_lock; - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"), - this->index_)); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_to_write"), - result.bytes_to_write ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("handle"), - result.handle ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_transfered"), - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("act"), - (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("success"), - result.success ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("completion_key"), - (u_long) result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("error"), - result.error ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %s\n"), - ACE_TEXT ("message_block"), - mb.rd_ptr ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** end of message ****************\n"))); - } + //mb.rd_ptr () [0] = '\0'; + mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ()); - mb.release (); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"), + this->index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_write"), + result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("act"), + (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("completion_key"), + (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** end of message ****************\n"))); + } - if (result.error () == 0 && result.bytes_transferred () != 0) - { - if (duplex == 0) - this->initiate_read_stream (); - } + mb.release (); + + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_snd_ += result.bytes_transferred (); + + if (duplex == 0) + this->initiate_read_stream (); + } - { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; + if (this->io_count_ > 0) + return; } - - this->check_destroy (); + delete this; } // ******************************************* @@ -822,6 +838,9 @@ public: Sender (Connector *connector = 0, int index = -1); ~Sender (void); + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); // This is called when asynchronous reads from the socket complete @@ -829,7 +848,6 @@ public: // This is called when asynchronous writes from the socket complete private: - int check_destroy (void); int initiate_read_stream (void); int initiate_write_stream (void); void cancel (); @@ -841,10 +859,12 @@ private: ACE_Asynch_Write_Stream ws_; ACE_HANDLE handle_; - ACE_SYNCH_RECURSIVE_MUTEX lock_; + ACE_SYNCH_MUTEX lock_; long io_count_; int flg_cancel_; + long total_snd_; + long total_rcv_; }; class Connector : public ACE_Asynch_Connector<Sender> @@ -852,6 +872,8 @@ class Connector : public ACE_Asynch_Connector<Sender> friend class Sender; public: int get_number_sessions (void) { return this->sessions_; } + long get_total_snd (void) { return this->total_snd_; } + long get_total_rcv (void) { return this->total_rcv_; } Connector (void); virtual ~Connector (void); @@ -870,12 +892,16 @@ private: ACE_SYNCH_RECURSIVE_MUTEX lock_; int sessions_; Sender *list_senders_[MAX_SENDERS]; + long total_snd_; + long total_rcv_; }; // ************************************************************* Connector::Connector (void) - : sessions_ (0) + : sessions_ (0), + total_snd_(0), + total_rcv_(0) { ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); @@ -938,13 +964,25 @@ Connector::on_delete_sender (Sender &sndr) ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->sessions_--; + this->total_snd_ += sndr.get_total_snd(); + this->total_rcv_ += sndr.get_total_rcv(); + if (sndr.index_ >= 0 && sndr.index_ < MAX_SENDERS && this->list_senders_[sndr.index_] == &sndr) this->list_senders_[sndr.index_] = 0; + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ()); + ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ()); + ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"), + ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + sndr.index_, + bufs, + bufr, this->sessions_)); } @@ -1016,7 +1054,9 @@ Sender::Sender (Connector * connector, int index) connector_ (connector), handle_ (ACE_INVALID_HANDLE), io_count_ (0), - flg_cancel_ (0) + flg_cancel_(0), + total_snd_ (0), + total_rcv_ (0) { if (this->connector_ != 0) this->connector_->on_new_sender (*this); @@ -1036,25 +1076,10 @@ Sender::~Sender (void) this->handle_= ACE_INVALID_HANDLE; } -// return true if we alive, false we commited suicide -int -Sender::check_destroy (void) -{ - { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); - - if (this->io_count_ > 0) - return 1; - } - - delete this; - return 0; -} - void Sender::cancel () { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); this->flg_cancel_ = 1; this->ws_.cancel (); @@ -1066,36 +1091,38 @@ Sender::cancel () void Sender::open (ACE_HANDLE handle, ACE_Message_Block &) { - this->handle_ = handle; - - // Open ACE_Asynch_Write_Stream - if (this->ws_.open (*this, this->handle_) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open"))); - - // Open ACE_Asynch_Read_Stream - else if (this->rs_.open (*this, this->handle_) == -1) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), - ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open"))); - - else if (this->initiate_write_stream () == 0) - { - if (duplex != 0) - // Start an asynchronous read - this->initiate_read_stream (); - } + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); + this->handle_ = handle; + + // Open ACE_Asynch_Write_Stream + if (this->ws_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open"))); + + // Open ACE_Asynch_Read_Stream + else if (this->rs_.open (*this, this->handle_) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open"))); + + else if (this->initiate_write_stream () == 0) + { + if (duplex != 0) // Start an asynchronous read + this->initiate_read_stream (); + } - this->check_destroy (); + if (this->io_count_ > 0) + return; + } + delete this; } int Sender::initiate_write_stream (void) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); - - if (this->flg_cancel_ != 0) + if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) return -1; #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) @@ -1158,9 +1185,7 @@ Sender::initiate_write_stream (void) int Sender::initiate_read_stream (void) { - ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1); - - if (this->flg_cancel_ != 0) + if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) return -1; #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) @@ -1225,198 +1250,207 @@ Sender::initiate_read_stream (void) void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { - ACE_Message_Block & mb = result.message_block (); + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); - if (loglevel == 0 - || result.bytes_transferred () == 0 - || result.error () != 0) - { - LogLocker log_lock; + ACE_Message_Block & mb = result.message_block (); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"), - index_)); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_to_write"), - result.bytes_to_write ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("handle"), - result.handle ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_transfered"), - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("act"), + if (loglevel == 0 + || result.bytes_transferred () == 0 + || result.error () != 0) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"), + index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_write"), + result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("act"), (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("success"), - result.success ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("completion_key"), - (u_long) result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("error"), - result.error ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("completion_key"), + (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) - size_t bytes_transferred = result.bytes_transferred (); - char index = 0; - for (ACE_Message_Block* mb_i = &mb; - (mb_i != 0) && (bytes_transferred > 0); - mb_i = mb_i->cont ()) - { + size_t bytes_transferred = result.bytes_transferred (); + char index = 0; + for (ACE_Message_Block* mb_i = &mb; + (mb_i != 0) && (bytes_transferred > 0); + mb_i = mb_i->cont ()) + { + // write 0 at string end for proper printout (if end of mb, it's 0 already) + mb_i->rd_ptr()[0] = '\0'; + + size_t len = mb_i->rd_ptr () - mb_i->base (); + + // move rd_ptr backwards as required for printout + if (len >= bytes_transferred) + { + mb_i->rd_ptr (0 - bytes_transferred); + bytes_transferred = 0; + } + else + { + mb_i->rd_ptr (0 - len); + bytes_transferred -= len; + } + + ++index; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s%d = %s\n"), + ACE_TEXT ("message_block, part "), + index, + mb_i->rd_ptr ())); + } +#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ // write 0 at string end for proper printout (if end of mb, it's 0 already) - mb_i->rd_ptr()[0] = '\0'; - - size_t len = mb_i->rd_ptr () - mb_i->base (); - + mb.rd_ptr()[0] = '\0'; // move rd_ptr backwards as required for printout - if (len >= bytes_transferred) - { - mb_i->rd_ptr (0 - bytes_transferred); - bytes_transferred = 0; - } - else - { - mb_i->rd_ptr (0 - len); - bytes_transferred -= len; - } + mb.rd_ptr (- result.bytes_transferred ()); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); +#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - ++index; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s%d = %s\n"), - ACE_TEXT ("message_block, part "), - index, - mb_i->rd_ptr ())); + ACE_TEXT ("**** end of message ****************\n"))); } -#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - // write 0 at string end for proper printout (if end of mb, it's 0 already) - mb.rd_ptr()[0] = '\0'; - // move rd_ptr backwards as required for printout - mb.rd_ptr (- result.bytes_transferred ()); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %s\n"), - ACE_TEXT ("message_block"), - mb.rd_ptr ())); -#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** end of message ****************\n"))); - } + mb.release (); - mb.release (); + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_snd_ += result.bytes_transferred (); - if (result.error () == 0 && result.bytes_transferred () != 0) - { - if (duplex != 0) // full duplex, continue write - this->initiate_write_stream (); - else // half-duplex read reply, after read we will start write - this->initiate_read_stream (); - } + if (duplex != 0 && // full duplex, continue write + (this->total_snd_- this->total_rcv_) < 1024 ) //flow control + this->initiate_write_stream (); + else // half-duplex read reply, after read we will start write + this->initiate_read_stream (); + } - { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; + if (this->io_count_ > 0) + return; } - - this->check_destroy (); + delete this; } void Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { - ACE_Message_Block & mb = result.message_block (); + { + ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); - if (loglevel == 0 - || result.bytes_transferred () == 0 - || result.error () != 0) - { - LogLocker log_lock; + ACE_Message_Block & mb = result.message_block (); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"), - index_)); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_to_read"), - result.bytes_to_read ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("handle"), - result.handle ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("bytes_transfered"), - result.bytes_transferred ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("act"), - (u_long) result.act ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("success"), - result.success ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %u\n"), - ACE_TEXT ("completion_key"), - (u_long) result.completion_key ())); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %d\n"), - ACE_TEXT ("error"), - result.error ())); + if (loglevel == 0 + || result.bytes_transferred () == 0 + || result.error () != 0) + { + LogLocker log_lock; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"), + index_)); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_to_read"), + result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("handle"), + result.handle ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("bytes_transfered"), + result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("act"), + (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("success"), + result.success ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %u\n"), + ACE_TEXT ("completion_key"), + (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %d\n"), + ACE_TEXT ("error"), + result.error ())); #if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) - char index = 0; - for (ACE_Message_Block* mb_i = &mb; - mb_i != 0; - mb_i = mb_i->cont ()) - { - ++index; + char index = 0; + for (ACE_Message_Block* mb_i = &mb; + mb_i != 0; + mb_i = mb_i->cont ()) + { + ++index; + // write 0 at string end for proper printout + mb_i->wr_ptr()[0] = '\0'; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s%d = %s\n"), + ACE_TEXT ("message_block, part "), + index, + mb_i->rd_ptr ())); + } +#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ // write 0 at string end for proper printout - mb_i->wr_ptr()[0] = '\0'; + mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%s = %s\n"), + ACE_TEXT ("message_block"), + mb.rd_ptr ())); +#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s%d = %s\n"), - ACE_TEXT ("message_block, part "), - index, - mb_i->rd_ptr ())); + ACE_TEXT ("**** end of message ****************\n"))); } -#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - // write 0 at string end for proper printout - mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("%s = %s\n"), - ACE_TEXT ("message_block"), - mb.rd_ptr ())); -#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */ - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** end of message ****************\n"))); - } + mb.release (); - mb.release (); + if (result.error () == 0 && result.bytes_transferred () > 0) + { + this->total_rcv_ += result.bytes_transferred (); - if (result.error () == 0 && result.bytes_transferred () != 0) - { - if (duplex != 0) // full duplex, continue read - this->initiate_read_stream (); - else // half-duplex write, after write we will start read - this->initiate_write_stream (); - } + if (duplex != 0) // full duplex, continue read + this->initiate_read_stream (); + else // half-duplex write, after write we will start read + this->initiate_write_stream (); + } - { - ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_); this->io_count_--; + if (this->io_count_ > 0) + return; } - - this->check_destroy (); + delete this; } // ************************************************************* @@ -1476,9 +1510,11 @@ set_proactor_type (const ACE_TCHAR *ptype) proactor_type = SUN; return 1; #endif /* sun */ - case 'C': - proactor_type = CB; - return 1; +#if defined (__sgi) + case 'C': + proactor_type = CB; + return 1; +#endif /* __sgi */ default: break; } @@ -1506,7 +1542,7 @@ parse_args (int argc, ACE_TCHAR *argv[]) #endif threads = 3; // size of Proactor thread pool senders = 20; // number of senders - loglevel = 0; // log level : 0 full/ 1 only errors + loglevel = 1; // log level : 0 full/ 1 only errors seconds = 20; // time to run in seconds return 0; } @@ -1608,20 +1644,63 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) ACE_OS::sleep (seconds); } + //Cancel all pending AIO on Connector and Senders + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"), + connector.get_number_sessions () + )); + connector.cancel_all (); + + //Cancel all pending AIO on Acceptor And Receivers + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"), + acceptor.get_number_sessions () + )); + acceptor.cancel_all (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Stop Thread Pool Task\n") + )); task1.stop (); - + + // As Proactor event loop now is inactive it is safe to destroy all + // Senders ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("\nNumber of Receivers objects = %d\n") - ACE_TEXT ("\nNumber of Sender objects = %d\n"), - acceptor.get_number_sessions (), - connector.get_number_sessions ())); + ACE_TEXT ("Stop Connector/Senders: sessions_=%d\n"), + connector.get_number_sessions () + )); + connector.stop (); // As Proactor event loop now is inactive it is safe to destroy all - // senders - - connector.stop (); + // Receivers + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Stop Acceptor/Receivers:sessions_=%d\n"), + acceptor.get_number_sessions () + )); acceptor.stop (); + //Print statistic + ACE_TCHAR bufs [256]; + ACE_TCHAR bufr [256]; + + ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd()); + ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() ); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"), + bufs, + bufr + )); + + ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd()); + ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() ); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"), + bufs, + bufr + )); + ACE_END_TEST; return 0; |