summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-05-08 16:01:39 +0000
committerbala <bala@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2002-05-08 16:01:39 +0000
commit0358eb34eaae957b56f4ae853285839424bf5fb8 (patch)
treea3813fdae5971ec30e008935c87970245668b79a /tests
parent54183d510d49b3869c7f847d5150093da509006c (diff)
downloadATCD-0358eb34eaae957b56f4ae853285839424bf5fb8.tar.gz
ChangeLogTag: Wed May 8 10:58:15 2002 Alex Libman <AlexL@rumblegroup.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/Proactor_Test.cpp837
1 files changed, 458 insertions, 379 deletions
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index fbc14adc5c5..e9966371ae3 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -371,6 +371,9 @@ public:
Receiver (Acceptor *acceptor = 0, int index = -1);
~Receiver (void);
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+
/// This is called after the new connection has been accepted.
virtual void open (ACE_HANDLE handle,
ACE_Message_Block &message_block);
@@ -392,7 +395,6 @@ protected:
private:
int initiate_read_stream (void);
int initiate_write_stream (ACE_Message_Block &mb, int nbytes);
- int check_destroy (void);
void cancel ();
Acceptor *acceptor_;
@@ -401,10 +403,12 @@ private:
ACE_Asynch_Read_Stream rs_;
ACE_Asynch_Write_Stream ws_;
ACE_HANDLE handle_;
- ACE_SYNCH_RECURSIVE_MUTEX lock_;
+ ACE_SYNCH_MUTEX lock_;
long io_count_;
int flg_cancel_;
+ long total_snd_;
+ long total_rcv_;
};
class Acceptor : public ACE_Asynch_Acceptor<Receiver>
@@ -412,6 +416,8 @@ class Acceptor : public ACE_Asynch_Acceptor<Receiver>
friend class Receiver;
public:
int get_number_sessions (void) { return this->sessions_; }
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
Acceptor (void);
virtual ~Acceptor (void);
@@ -429,11 +435,15 @@ private:
ACE_SYNCH_RECURSIVE_MUTEX lock_;
int sessions_;
Receiver *list_receivers_[MAX_RECEIVERS];
+ long total_snd_;
+ long total_rcv_;
};
// *************************************************************
Acceptor::Acceptor (void)
- : sessions_ (0)
+ : sessions_ (0),
+ total_snd_(0),
+ total_rcv_(0)
{
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
@@ -498,13 +508,26 @@ Acceptor::on_delete_receiver (Receiver & rcvr)
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->sessions_--;
+
+ this->total_snd_ += rcvr.get_total_snd();
+ this->total_rcv_ += rcvr.get_total_rcv();
+
if (rcvr.index_ >= 0
&& rcvr.index_ < MAX_RECEIVERS
&& this->list_receivers_[rcvr.index_] == &rcvr)
this->list_receivers_[rcvr.index_] = 0;
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), rcvr.get_total_snd ());
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), rcvr.get_total_rcv ());
+
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"),
+ ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ rcvr.index_,
+ bufs,
+ bufr,
this->sessions_));
}
@@ -535,7 +558,9 @@ Receiver::Receiver (Acceptor * acceptor, int index)
index_ (index),
handle_ (ACE_INVALID_HANDLE),
io_count_ (0),
- flg_cancel_ (0)
+ flg_cancel_(0),
+ total_snd_(0),
+ total_rcv_(0)
{
if (this->acceptor_ != 0)
this->acceptor_->on_new_receiver (*this);
@@ -553,26 +578,10 @@ Receiver::~Receiver (void)
this->handle_= ACE_INVALID_HANDLE;
}
-// return true if we alive, false if we commited suicide
-int
-Receiver::check_destroy (void)
-{
- {
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
-
- if (this->io_count_ > 0)
- return 1;
- }
-
- delete this;
- return 0;
-}
-
-
void
Receiver::cancel ()
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
this->flg_cancel_ = 1;
this->ws_.cancel ();
@@ -584,29 +593,33 @@ Receiver::cancel ()
void
Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
{
- this->handle_ = handle;
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
- if (this->ws_.open (*this, this->handle_) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open")));
- else if (this->rs_.open (*this, this->handle_) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
- else
- this->initiate_read_stream ();
+ this->handle_ = handle;
+
+ if (this->ws_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open")));
+ else if (this->rs_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
+ else
+ this->initiate_read_stream ();
- this->check_destroy ();
+ if (this->io_count_ > 0)
+ return;
+ }
+ delete this;
}
int
Receiver::initiate_read_stream (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
-
- if (this->flg_cancel_ != 0)
- return 0;
+ if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
+ return -1;
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
@@ -630,13 +643,11 @@ Receiver::initiate_read_stream (void)
int
Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
-
- if (this->flg_cancel_ != 0)
- {
+ if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
+ {
mb.release ();
return -1;
- }
+ }
if (nbytes <= 0)
{
@@ -662,146 +673,151 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
- ACE_Message_Block & mb = result.message_block ();
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_ );
- // Reset pointers.
- mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
+ ACE_Message_Block & mb = result.message_block ();
- if (loglevel == 0
- || result.bytes_transferred () == 0
- || result.error () != 0)
- {
- LogLocker log_lock;
+ // Reset pointers.
+ mb.rd_ptr ()[result.bytes_transferred ()] = '\0';
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"),
- this->index_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_to_read"),
- result.bytes_to_read ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("handle"),
- result.handle ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_transfered"),
- result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("act"),
- (u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("success"),
- result.success ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("completion_key"),
- (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("error"),
- result.error ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %s\n"),
- ACE_TEXT ("message_block"),
- mb.rd_ptr ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** end of message ****************\n")));
- }
+ if (loglevel == 0
+ || result.bytes_transferred () == 0
+ || result.error () != 0)
+ {
+ LogLocker log_lock;
- if (result.error () == 0 && result.bytes_transferred () != 0)
- {
- if(this->initiate_write_stream (mb,
- result.bytes_transferred ()) == 0)
- {
- if (duplex != 0)
- {
- // Initiate new read from the stream.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"),
+ this->index_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_to_read"),
+ result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("act"),
+ (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("completion_key"),
+ (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb.rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** end of message ****************\n")));
+ }
+
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_rcv_ += result.bytes_transferred ();
+
+ if (this->initiate_write_stream (mb,
+ result.bytes_transferred ()) == 0)
+ {
+ if (duplex != 0) // Initiate new read from the stream.
this->initiate_read_stream ();
- }
- }
- }
- else
- mb.release ();
+ }
+ }
+ else
+ mb.release ();
- {
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
+ if (this->io_count_ > 0)
+ return;
}
-
- this->check_destroy ();
+ delete this;
}
void
Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
- ACE_Message_Block & mb = result.message_block ();
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
- if (loglevel == 0 ||
- result.bytes_transferred () == 0 ||
- result.error () != 0)
- {
- LogLocker log_lock;
+ ACE_Message_Block & mb = result.message_block ();
- //mb.rd_ptr () [0] = '\0';
- mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
+ if (loglevel == 0 ||
+ result.bytes_transferred () == 0 ||
+ result.error () != 0)
+ {
+ LogLocker log_lock;
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"),
- this->index_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_to_write"),
- result.bytes_to_write ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("handle"),
- result.handle ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_transfered"),
- result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("act"),
- (u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("success"),
- result.success ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("completion_key"),
- (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("error"),
- result.error ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %s\n"),
- ACE_TEXT ("message_block"),
- mb.rd_ptr ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** end of message ****************\n")));
- }
+ //mb.rd_ptr () [0] = '\0';
+ mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
- mb.release ();
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"),
+ this->index_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_to_write"),
+ result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("act"),
+ (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("completion_key"),
+ (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb.rd_ptr ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** end of message ****************\n")));
+ }
- if (result.error () == 0 && result.bytes_transferred () != 0)
- {
- if (duplex == 0)
- this->initiate_read_stream ();
- }
+ mb.release ();
+
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_snd_ += result.bytes_transferred ();
+
+ if (duplex == 0)
+ this->initiate_read_stream ();
+ }
- {
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
+ if (this->io_count_ > 0)
+ return;
}
-
- this->check_destroy ();
+ delete this;
}
// *******************************************
@@ -822,6 +838,9 @@ public:
Sender (Connector *connector = 0, int index = -1);
~Sender (void);
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
+
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
// This is called when asynchronous reads from the socket complete
@@ -829,7 +848,6 @@ public:
// This is called when asynchronous writes from the socket complete
private:
- int check_destroy (void);
int initiate_read_stream (void);
int initiate_write_stream (void);
void cancel ();
@@ -841,10 +859,12 @@ private:
ACE_Asynch_Write_Stream ws_;
ACE_HANDLE handle_;
- ACE_SYNCH_RECURSIVE_MUTEX lock_;
+ ACE_SYNCH_MUTEX lock_;
long io_count_;
int flg_cancel_;
+ long total_snd_;
+ long total_rcv_;
};
class Connector : public ACE_Asynch_Connector<Sender>
@@ -852,6 +872,8 @@ class Connector : public ACE_Asynch_Connector<Sender>
friend class Sender;
public:
int get_number_sessions (void) { return this->sessions_; }
+ long get_total_snd (void) { return this->total_snd_; }
+ long get_total_rcv (void) { return this->total_rcv_; }
Connector (void);
virtual ~Connector (void);
@@ -870,12 +892,16 @@ private:
ACE_SYNCH_RECURSIVE_MUTEX lock_;
int sessions_;
Sender *list_senders_[MAX_SENDERS];
+ long total_snd_;
+ long total_rcv_;
};
// *************************************************************
Connector::Connector (void)
- : sessions_ (0)
+ : sessions_ (0),
+ total_snd_(0),
+ total_rcv_(0)
{
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
@@ -938,13 +964,25 @@ Connector::on_delete_sender (Sender &sndr)
ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->sessions_--;
+ this->total_snd_ += sndr.get_total_snd();
+ this->total_rcv_ += sndr.get_total_rcv();
+
if (sndr.index_ >= 0
&& sndr.index_ < MAX_SENDERS
&& this->list_senders_[sndr.index_] == &sndr)
this->list_senders_[sndr.index_] = 0;
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf ( bufs , ACE_TEXT ("%ld"), sndr.get_total_snd ());
+ ACE_OS::sprintf ( bufr , ACE_TEXT ("%ld"), sndr.get_total_rcv ());
+
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"),
+ ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ sndr.index_,
+ bufs,
+ bufr,
this->sessions_));
}
@@ -1016,7 +1054,9 @@ Sender::Sender (Connector * connector, int index)
connector_ (connector),
handle_ (ACE_INVALID_HANDLE),
io_count_ (0),
- flg_cancel_ (0)
+ flg_cancel_(0),
+ total_snd_ (0),
+ total_rcv_ (0)
{
if (this->connector_ != 0)
this->connector_->on_new_sender (*this);
@@ -1036,25 +1076,10 @@ Sender::~Sender (void)
this->handle_= ACE_INVALID_HANDLE;
}
-// return true if we alive, false we commited suicide
-int
-Sender::check_destroy (void)
-{
- {
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
-
- if (this->io_count_ > 0)
- return 1;
- }
-
- delete this;
- return 0;
-}
-
void
Sender::cancel ()
{
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
this->flg_cancel_ = 1;
this->ws_.cancel ();
@@ -1066,36 +1091,38 @@ Sender::cancel ()
void
Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
{
- this->handle_ = handle;
-
- // Open ACE_Asynch_Write_Stream
- if (this->ws_.open (*this, this->handle_) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open")));
-
- // Open ACE_Asynch_Read_Stream
- else if (this->rs_.open (*this, this->handle_) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
- ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open")));
-
- else if (this->initiate_write_stream () == 0)
- {
- if (duplex != 0)
- // Start an asynchronous read
- this->initiate_read_stream ();
- }
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
+ this->handle_ = handle;
+
+ // Open ACE_Asynch_Write_Stream
+ if (this->ws_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open")));
+
+ // Open ACE_Asynch_Read_Stream
+ else if (this->rs_.open (*this, this->handle_) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open")));
+
+ else if (this->initiate_write_stream () == 0)
+ {
+ if (duplex != 0) // Start an asynchronous read
+ this->initiate_read_stream ();
+ }
- this->check_destroy ();
+ if (this->io_count_ > 0)
+ return;
+ }
+ delete this;
}
int
Sender::initiate_write_stream (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
-
- if (this->flg_cancel_ != 0)
+ if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
return -1;
#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
@@ -1158,9 +1185,7 @@ Sender::initiate_write_stream (void)
int
Sender::initiate_read_stream (void)
{
- ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_, -1);
-
- if (this->flg_cancel_ != 0)
+ if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
return -1;
#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
@@ -1225,198 +1250,207 @@ Sender::initiate_read_stream (void)
void
Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
- ACE_Message_Block & mb = result.message_block ();
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
- if (loglevel == 0
- || result.bytes_transferred () == 0
- || result.error () != 0)
- {
- LogLocker log_lock;
+ ACE_Message_Block & mb = result.message_block ();
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"),
- index_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_to_write"),
- result.bytes_to_write ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("handle"),
- result.handle ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_transfered"),
- result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("act"),
+ if (loglevel == 0
+ || result.bytes_transferred () == 0
+ || result.error () != 0)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"),
+ index_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_to_write"),
+ result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("act"),
(u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("success"),
- result.success ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("completion_key"),
- (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("error"),
- result.error ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("completion_key"),
+ (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
- size_t bytes_transferred = result.bytes_transferred ();
- char index = 0;
- for (ACE_Message_Block* mb_i = &mb;
- (mb_i != 0) && (bytes_transferred > 0);
- mb_i = mb_i->cont ())
- {
+ size_t bytes_transferred = result.bytes_transferred ();
+ char index = 0;
+ for (ACE_Message_Block* mb_i = &mb;
+ (mb_i != 0) && (bytes_transferred > 0);
+ mb_i = mb_i->cont ())
+ {
+ // write 0 at string end for proper printout (if end of mb, it's 0 already)
+ mb_i->rd_ptr()[0] = '\0';
+
+ size_t len = mb_i->rd_ptr () - mb_i->base ();
+
+ // move rd_ptr backwards as required for printout
+ if (len >= bytes_transferred)
+ {
+ mb_i->rd_ptr (0 - bytes_transferred);
+ bytes_transferred = 0;
+ }
+ else
+ {
+ mb_i->rd_ptr (0 - len);
+ bytes_transferred -= len;
+ }
+
+ ++index;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s%d = %s\n"),
+ ACE_TEXT ("message_block, part "),
+ index,
+ mb_i->rd_ptr ()));
+ }
+#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
// write 0 at string end for proper printout (if end of mb, it's 0 already)
- mb_i->rd_ptr()[0] = '\0';
-
- size_t len = mb_i->rd_ptr () - mb_i->base ();
-
+ mb.rd_ptr()[0] = '\0';
// move rd_ptr backwards as required for printout
- if (len >= bytes_transferred)
- {
- mb_i->rd_ptr (0 - bytes_transferred);
- bytes_transferred = 0;
- }
- else
- {
- mb_i->rd_ptr (0 - len);
- bytes_transferred -= len;
- }
+ mb.rd_ptr (- result.bytes_transferred ());
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb.rd_ptr ()));
+#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
- ++index;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s%d = %s\n"),
- ACE_TEXT ("message_block, part "),
- index,
- mb_i->rd_ptr ()));
+ ACE_TEXT ("**** end of message ****************\n")));
}
-#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
- // write 0 at string end for proper printout (if end of mb, it's 0 already)
- mb.rd_ptr()[0] = '\0';
- // move rd_ptr backwards as required for printout
- mb.rd_ptr (- result.bytes_transferred ());
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %s\n"),
- ACE_TEXT ("message_block"),
- mb.rd_ptr ()));
-#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** end of message ****************\n")));
- }
+ mb.release ();
- mb.release ();
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_snd_ += result.bytes_transferred ();
- if (result.error () == 0 && result.bytes_transferred () != 0)
- {
- if (duplex != 0) // full duplex, continue write
- this->initiate_write_stream ();
- else // half-duplex read reply, after read we will start write
- this->initiate_read_stream ();
- }
+ if (duplex != 0 && // full duplex, continue write
+ (this->total_snd_- this->total_rcv_) < 1024 ) //flow control
+ this->initiate_write_stream ();
+ else // half-duplex read reply, after read we will start write
+ this->initiate_read_stream ();
+ }
- {
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
+ if (this->io_count_ > 0)
+ return;
}
-
- this->check_destroy ();
+ delete this;
}
void
Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
- ACE_Message_Block & mb = result.message_block ();
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
- if (loglevel == 0
- || result.bytes_transferred () == 0
- || result.error () != 0)
- {
- LogLocker log_lock;
+ ACE_Message_Block & mb = result.message_block ();
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"),
- index_));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_to_read"),
- result.bytes_to_read ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("handle"),
- result.handle ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("bytes_transfered"),
- result.bytes_transferred ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("act"),
- (u_long) result.act ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("success"),
- result.success ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %u\n"),
- ACE_TEXT ("completion_key"),
- (u_long) result.completion_key ()));
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %d\n"),
- ACE_TEXT ("error"),
- result.error ()));
+ if (loglevel == 0
+ || result.bytes_transferred () == 0
+ || result.error () != 0)
+ {
+ LogLocker log_lock;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"),
+ index_));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_to_read"),
+ result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("handle"),
+ result.handle ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("bytes_transfered"),
+ result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("act"),
+ (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("success"),
+ result.success ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %u\n"),
+ ACE_TEXT ("completion_key"),
+ (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %d\n"),
+ ACE_TEXT ("error"),
+ result.error ()));
#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
- char index = 0;
- for (ACE_Message_Block* mb_i = &mb;
- mb_i != 0;
- mb_i = mb_i->cont ())
- {
- ++index;
+ char index = 0;
+ for (ACE_Message_Block* mb_i = &mb;
+ mb_i != 0;
+ mb_i = mb_i->cont ())
+ {
+ ++index;
+ // write 0 at string end for proper printout
+ mb_i->wr_ptr()[0] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s%d = %s\n"),
+ ACE_TEXT ("message_block, part "),
+ index,
+ mb_i->rd_ptr ()));
+ }
+#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
// write 0 at string end for proper printout
- mb_i->wr_ptr()[0] = '\0';
+ mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%s = %s\n"),
+ ACE_TEXT ("message_block"),
+ mb.rd_ptr ()));
+#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s%d = %s\n"),
- ACE_TEXT ("message_block, part "),
- index,
- mb_i->rd_ptr ()));
+ ACE_TEXT ("**** end of message ****************\n")));
}
-#else /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
- // write 0 at string end for proper printout
- mb.rd_ptr()[result.bytes_transferred ()] = '\0'; // for proper printout
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("%s = %s\n"),
- ACE_TEXT ("message_block"),
- mb.rd_ptr ()));
-#endif /* (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) */
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** end of message ****************\n")));
- }
+ mb.release ();
- mb.release ();
+ if (result.error () == 0 && result.bytes_transferred () > 0)
+ {
+ this->total_rcv_ += result.bytes_transferred ();
- if (result.error () == 0 && result.bytes_transferred () != 0)
- {
- if (duplex != 0) // full duplex, continue read
- this->initiate_read_stream ();
- else // half-duplex write, after write we will start read
- this->initiate_write_stream ();
- }
+ if (duplex != 0) // full duplex, continue read
+ this->initiate_read_stream ();
+ else // half-duplex write, after write we will start read
+ this->initiate_write_stream ();
+ }
- {
- ACE_GUARD (ACE_SYNCH_RECURSIVE_MUTEX, monitor, this->lock_);
this->io_count_--;
+ if (this->io_count_ > 0)
+ return;
}
-
- this->check_destroy ();
+ delete this;
}
// *************************************************************
@@ -1476,9 +1510,11 @@ set_proactor_type (const ACE_TCHAR *ptype)
proactor_type = SUN;
return 1;
#endif /* sun */
- case 'C':
- proactor_type = CB;
- return 1;
+#if defined (__sgi)
+ case 'C':
+ proactor_type = CB;
+ return 1;
+#endif /* __sgi */
default:
break;
}
@@ -1506,7 +1542,7 @@ parse_args (int argc, ACE_TCHAR *argv[])
#endif
threads = 3; // size of Proactor thread pool
senders = 20; // number of senders
- loglevel = 0; // log level : 0 full/ 1 only errors
+ loglevel = 1; // log level : 0 full/ 1 only errors
seconds = 20; // time to run in seconds
return 0;
}
@@ -1608,20 +1644,63 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
ACE_OS::sleep (seconds);
}
+ //Cancel all pending AIO on Connector and Senders
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"),
+ connector.get_number_sessions ()
+ ));
+ connector.cancel_all ();
+
+ //Cancel all pending AIO on Acceptor And Receivers
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"),
+ acceptor.get_number_sessions ()
+ ));
+ acceptor.cancel_all ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Stop Thread Pool Task\n")
+ ));
task1.stop ();
-
+
+ // As Proactor event loop now is inactive it is safe to destroy all
+ // Senders
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("\nNumber of Receivers objects = %d\n")
- ACE_TEXT ("\nNumber of Sender objects = %d\n"),
- acceptor.get_number_sessions (),
- connector.get_number_sessions ()));
+ ACE_TEXT ("Stop Connector/Senders: sessions_=%d\n"),
+ connector.get_number_sessions ()
+ ));
+ connector.stop ();
// As Proactor event loop now is inactive it is safe to destroy all
- // senders
-
- connector.stop ();
+ // Receivers
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Stop Acceptor/Receivers:sessions_=%d\n"),
+ acceptor.get_number_sessions ()
+ ));
acceptor.stop ();
+ //Print statistic
+ ACE_TCHAR bufs [256];
+ ACE_TCHAR bufr [256];
+
+ ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), connector.get_total_snd());
+ ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), connector.get_total_rcv() );
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Connector/Senders total bytes: snd=%s rcv=%s\n"),
+ bufs,
+ bufr
+ ));
+
+ ACE_OS::sprintf (bufs , ACE_TEXT ("%ld"), acceptor.get_total_snd());
+ ACE_OS::sprintf (bufr , ACE_TEXT ("%ld"), acceptor.get_total_rcv() );
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Acceptor/Receivers total bytes: snd=%s rcv=%s\n"),
+ bufs,
+ bufr
+ ));
+
ACE_END_TEST;
return 0;