summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>2000-04-15 23:28:56 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>2000-04-15 23:28:56 +0000
commit12a91dc625226b9a6c384df76c35cea4d3c8f67c (patch)
tree2416601912cdb3d96b8a3a2f18f2f9d8596d1ca9 /apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
parent54a8460f2742750dbdb993c37aefb13c2739eca2 (diff)
downloadATCD-12a91dc625226b9a6c384df76c35cea4d3c8f67c.tar.gz
ChangeLogTag:Sat Apr 15 18:17:02 2000 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
Diffstat (limited to 'apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp')
-rw-r--r--apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp101
1 files changed, 93 insertions, 8 deletions
diff --git a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
index 7a50650d842..ca17494a402 100644
--- a/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
+++ b/apps/Gateway/Gateway/Concrete_Connection_Handlers.cpp
@@ -134,11 +134,89 @@ Consumer_Handler::handle_output (ACE_HANDLE)
ACE_Message_Block *event = 0;
ACE_DEBUG ((LM_DEBUG,
- "(%t) in handle_output on handle %d\n",
+ ACE_TEXT("(%t) Receiver signalled 'resume transmission' %d\n"),
this->get_handle ()));
+ // WIN32 Notes: When the receiver blocked, we started adding to the
+ // consumer handler's message Q. At this time, we registered a
+ // callback with the reactor to tell us when the TCP layer signalled
+ // that we could continue to send messages to the consumer. However,
+ // Winsock only sends this notification ONCE, so we have to assume
+ // at the application level, that we can continue to send until we
+ // get any subsequent blocking signals from the receiver's buffer.
+
+#if defined (ACE_WIN32)
+ // Win32 Winsock doesn't trigger multiple "You can write now"
+ // signals, so we have to assume that we can continue to write until
+ // we get another EWOULDBLOCK.
+
+ // We cancel the wakeup callback we set earlier.
+ if (ACE_Reactor::instance ()->cancel_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Error in ACE_Reactor::cancel_wakeup()")),
+ -1);
+
// The list had better not be empty, otherwise there's a bug!
+ while (this->msg_queue ()->dequeue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (event))
+ {
+ case -1: // Error sending message to consumer.
+ {
+ // We are responsible for releasing an ACE_Message_Block if
+ // failures occur.
+ event->release ();
+
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("transmission failure")));
+ break;
+ }
+ case 0: // Partial Send - we got flow controlled by the receiver
+ {
+ ACE_ASSERT (errno == EWOULDBLOCK);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%D Partial Send due to flow control")
+ ACE_TEXT ("- scheduling new wakeup with reactor\n")));
+
+ // Re-schedule a wakeup call from the reactor when the
+ // flow control conditions abate.
+ if (ACE_Reactor::instance ()->schedule_wakeup
+ (this,
+ ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Error in ACE_Reactor::schedule_wakeup()")),
+ -1);
+
+ // Didn't write everything this time, come back later...
+ return 0;
+ }
+ default: // Sent the whole thing
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Sent message from message Q, Q size = %d\n"),
+ this->msg_queue()->message_count ()));
+ break;
+ }
+ }
+ }
+
+ // If we drop out of the while loop, then the message Q should be
+ // empty...or there's a problem in the dequeue_head() call...but
+ // thats another story.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%D Sent all messages from consumers message Q\n")));
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
+ this->get_handle (),
+ this->connection_id ()));
+#else /* !defined (ACE_WIN32) */
+ // The list had better not be empty, otherwise there's a bug!
if (this->msg_queue ()->dequeue_head
(event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
{
@@ -146,6 +224,9 @@ Consumer_Handler::handle_output (ACE_HANDLE)
{
case 0: // Partial send.
ACE_ASSERT (errno == EWOULDBLOCK);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("%D Partial Send\n")));
+
// Didn't write everything this time, come back later...
break;
@@ -154,8 +235,8 @@ Consumer_Handler::handle_output (ACE_HANDLE)
// failures occur.
event->release ();
ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "transmission failure"));
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("transmission failure")));
/* FALLTHROUGH */
default: // Sent the whole thing.
@@ -166,25 +247,29 @@ Consumer_Handler::handle_output (ACE_HANDLE)
// ACE_Reactor not to notify us anymore (at least until
// there are new events queued up).
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("QQQ::Sent Message from consumer's Q\n")));
+
if (this->msg_queue ()->is_empty ())
{
ACE_DEBUG ((LM_DEBUG,
- "(%t) queueing deactivated on handle %d to routing id %d\n",
+ ACE_TEXT ("(%t) queueing deactivated on handle %d to routing id %d\n"),
this->get_handle (),
this->connection_id ()));
if (ACE_Reactor::instance ()->cancel_wakeup
(this, ACE_Event_Handler::WRITE_MASK) == -1)
ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "cancel_wakeup"));
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("cancel_wakeup")));
}
}
}
else
ACE_ERROR ((LM_ERROR,
- "(%t) %p\n",
- "dequeue_head"));
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("dequeue_head - handle_output called by reactor but nothing in Q")));
+#endif /* ACE_WIN32 */
return 0;
}