diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2000-04-15 23:28:56 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2000-04-15 23:28:56 +0000 |
commit | 12a91dc625226b9a6c384df76c35cea4d3c8f67c (patch) | |
tree | 2416601912cdb3d96b8a3a2f18f2f9d8596d1ca9 /apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp | |
parent | 54a8460f2742750dbdb993c37aefb13c2739eca2 (diff) | |
download | ATCD-12a91dc625226b9a6c384df76c35cea4d3c8f67c.tar.gz |
ChangeLogTag:Sat Apr 15 18:17:02 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
Diffstat (limited to 'apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp')
-rw-r--r-- | apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp | 101 |
1 files changed, 93 insertions, 8 deletions
diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp index 7a50650d842..ca17494a402 100644 --- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp +++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp @@ -134,11 +134,89 @@ Consumer_Handler::handle_output (ACE_HANDLE) ACE_Message_Block *event = 0; ACE_DEBUG ((LM_DEBUG, - "(%t) in handle_output on handle %d\n", + ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"), this->get_handle ())); + // WIN32 Notes: When the receiver blocked, we started adding to the + // consumer handler's message Q. At this time, we registered a + // callback with the reactor to tell us when the TCP layer signalled + // that we could continue to send messages to the consumer. However, + // Winsock only sends this notification ONCE, so we have to assume + // at the application level, that we can continue to send until we + // get any subsequent blocking signals from the receiver's buffer. + +#if defined (ACE_WIN32) + // Win32 Winsock doesn't trigger multiple "You can write now" + // signals, so we have to assume that we can continue to write until + // we get another EWOULDBLOCK. + + // We cancel the wakeup callback we set earlier. + if (ACE_Reactor::instance ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")), + -1); + // The list had better not be empty, otherwise there's a bug! + while (this->msg_queue ()->dequeue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (event)) + { + case -1: // Error sending message to consumer. + { + // We are responsible for releasing an ACE_Message_Block if + // failures occur. + event->release (); + + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("transmission failure"))); + break; + } + case 0: // Partial Send - we got flow controlled by the receiver + { + ACE_ASSERT (errno == EWOULDBLOCK); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Partial Send due to flow control") + ACE_TEXT ("- scheduling new wakeup with reactor\n"))); + + // Re-schedule a wakeup call from the reactor when the + // flow control conditions abate. + if (ACE_Reactor::instance ()->schedule_wakeup + (this, + ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")), + -1); + + // Didn't write everything this time, come back later... + return 0; + } + default: // Sent the whole thing + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Sent message from message Q, Q size = %d\n"), + this->msg_queue()->message_count ())); + break; + } + } + } + + // If we drop out of the while loop, then the message Q should be + // empty...or there's a problem in the dequeue_head() call...but + // thats another story. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Sent all messages from consumers message Q\n"))); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), + this->get_handle (), + this->connection_id ())); +#else /* !defined (ACE_WIN32) */ + // The list had better not be empty, otherwise there's a bug! if (this->msg_queue ()->dequeue_head (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) { @@ -146,6 +224,9 @@ Consumer_Handler::handle_output (ACE_HANDLE) { case 0: // Partial send. ACE_ASSERT (errno == EWOULDBLOCK); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("%D Partial Send\n"))); + // Didn't write everything this time, come back later... break; @@ -154,8 +235,8 @@ Consumer_Handler::handle_output (ACE_HANDLE) // failures occur. event->release (); ACE_ERROR ((LM_ERROR, - "(%t) %p\n", - "transmission failure")); + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("transmission failure"))); /* FALLTHROUGH */ default: // Sent the whole thing. @@ -166,25 +247,29 @@ Consumer_Handler::handle_output (ACE_HANDLE) // ACE_Reactor not to notify us anymore (at least until // there are new events queued up). + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("QQQ::Sent Message from consumer's Q\n"))); + if (this->msg_queue ()->is_empty ()) { ACE_DEBUG ((LM_DEBUG, - "(%t) queueing deactivated on handle %d to routing id %d\n", + ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"), this->get_handle (), this->connection_id ())); if (ACE_Reactor::instance ()->cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) ACE_ERROR ((LM_ERROR, - "(%t) %p\n", - "cancel_wakeup")); + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("cancel_wakeup"))); } } } else ACE_ERROR ((LM_ERROR, - "(%t) %p\n", - "dequeue_head")); + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q"))); +#endif /* ACE_WIN32 */ return 0; } |