summaryrefslogtreecommitdiff
path: root/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'apps/Gateway/Gateway/Thr_Proxy_Handler.cpp')
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.cpp157
1 files changed, 82 insertions, 75 deletions
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
index 98722a96295..f316e4e82bf 100644
--- a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
+++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
@@ -1,30 +1,33 @@
-#include "Thr_Proxy_Handler.h"
// $Id$
-#include "Proxy_Handler_Connector.h"
+#include "Event_Channel.h"
+#include "Thr_Proxy_Handler.h"
#if defined (ACE_HAS_THREADS)
-Thr_Consumer_Proxy::Thr_Consumer_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Consumer_Proxy (efd, ioc, thr_mgr, socket_queue_size)
+Thr_Consumer_Proxy::Thr_Consumer_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : Consumer_Proxy (ec, remote_addr, local_addr, conn_id)
{
}
-// This method should be called only when the peer shuts down
-// unexpectedly. This method marks the Proxy_Handler as having failed and
-// deactivates the ACE_Message_Queue (to wake up the thread blocked on
-// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
-// eventually try to reconnect...
+// This method should be called only when the Consumer shuts down
+// unexpectedly. This method marks the Proxy_Handler as having failed
+// and deactivates the ACE_Message_Queue (to wake up the thread
+// blocked on <dequeue_head> in svc()).
+// Thr_Output_Handler::handle_close () will eventually try to
+// reconnect...
int
Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
{
+ // Call down to the <Consumer_Proxy> to handle this first.
this->Consumer_Proxy::handle_input (h);
- ACE_Service_Config::reactor ()->remove_handler (h,
- ACE_Event_Handler::RWE_MASK
- | ACE_Event_Handler::DONT_CALL);
+
+ ACE_Service_Config::reactor ()->remove_handler
+ (h, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL);
+
// Deactivate the queue while we try to get reconnected.
this->msg_queue ()->deactivate ();
return 0;
@@ -36,31 +39,28 @@ Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
int
Thr_Consumer_Proxy::open (void *)
{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
+
// Register ourselves to receive input events (which indicate that
- // the Peer has shut down unexpectedly).
- if (ACE_Service_Config::reactor ()->register_handler (this,
- ACE_Event_Handler::READ_MASK) == -1)
+ // the Consumer has shut down unexpectedly).
+ else if (ACE_Service_Config::reactor ()->register_handler
+ (this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
-
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
{
ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
- // messages to peers.
+ // events to Consumers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
}
else
@@ -70,87 +70,93 @@ Thr_Consumer_Proxy::open (void *)
}
}
-// ACE_Queue up a message for transmission (must not block since all
-// Supplier_Proxys are single-threaded).
+// Queue up an event for transmission (must not block since
+// Supplier_Proxys may be single-threaded).
int
Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
{
// Perform non-blocking enqueue.
- return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+ return this->msg_queue ()->enqueue_tail
+ (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
}
-// Transmit messages to the peer (note simplification resulting from
+// Transmit events to the peer (note simplification resulting from
// threads...)
int
Thr_Consumer_Proxy::svc (void)
{
+
for (;;)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Proxy's fd = %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connected! Thr_Consumer_Proxy's handle = %d\n",
this->peer ().get_handle ()));
// Since this method runs in its own thread it is OK to block on
// output.
for (ACE_Message_Block *mb = 0;
- this->msg_queue ()->dequeue_head (mb) != -1; )
- if (this->send (mb) == -1)
- ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
-
- ACE_ASSERT (errno == ESHUTDOWN);
-
- ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
- this->id (), this->get_handle ()));
-
- this->peer ().close ();
-
- for (this->timeout (1);
- // Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
- {
- ACE_Time_Value tv (this->timeout ());
- ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n",
- tv.sec ()));
- ACE_OS::sleep (tv);
- }
+ this->msg_queue ()->dequeue_head (mb) != -1;
+ )
+ {
+ if (this->send (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
+ }
+
+ ACE_ASSERT (errno == ESHUTDOWN);
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ this->peer ().close ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->event_channel_.initiate_proxy_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+
+ ACE_OS::sleep (tv);
+ }
}
return 0;
}
-Thr_Supplier_Proxy::Thr_Supplier_Proxy (Event_Forwarding_Discriminator *efd,
- Proxy_Handler_Connector *ioc,
- ACE_Thread_Manager *thr_mgr,
- int socket_queue_size)
- : Supplier_Proxy (efd, ioc, thr_mgr, socket_queue_size)
+Thr_Supplier_Proxy::Thr_Supplier_Proxy (ACE_Event_Channel &ec,
+ const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 conn_id)
+ : Supplier_Proxy (ec, remote_addr, local_addr, conn_id)
{
}
int
Thr_Supplier_Proxy::open (void *)
{
- // Set the size of the socket queue.
- this->socket_queue_size ();
-
// Turn off non-blocking I/O.
if (this->peer ().disable (ACE_NONBLOCK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
- if (this->initialize_connection ())
- ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
- "initialize_connection"), -1);
+ // Call back to the <Event_Channel> to complete our initialization.
+ else if (this->event_channel_.complete_proxy_connection (this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "complete_proxy_connection"), -1);
// Reactivate message queue. If it was active then this is the
// first time in and we need to spawn a thread, otherwise the queue
// was inactive due to some problem and we've already got a thread.
- if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ else if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
{
ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
// Become an active object by spawning a new thread to transmit
- // messages to peers.
+ // events to peers.
return this->activate (THR_NEW_LWP | THR_DETACHED);
}
else
@@ -160,7 +166,7 @@ Thr_Supplier_Proxy::open (void *)
}
}
-// Receive messages from a Peer in a separate thread (note reuse of
+// Receive events from a Peer in a separate thread (note reuse of
// existing code!).
int
@@ -168,20 +174,20 @@ Thr_Supplier_Proxy::svc (void)
{
for (;;)
{
- ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Proxy's fd = %d\n",
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) connected! Thr_Supplier_Proxy's handle = %d\n",
this->peer ().get_handle ()));
- // Since this method runs in its own thread and processes
- // messages for one connection it is OK to block on input and
- // output.
+ // Since this method runs in its own thread and processes events
+ // for one connection it is OK to call down to the
+ // <Supplier_Proxy::handle_input> method, which blocks on input.
while (this->handle_input () != -1)
continue;
ACE_DEBUG ((LM_DEBUG,
"(%t) shutting down threaded Supplier_Proxy %d on handle %d\n",
- this->id (),
- this->get_handle ()));
+ this->id (), this->get_handle ()));
this->peer ().close ();
@@ -190,11 +196,12 @@ Thr_Supplier_Proxy::svc (void)
for (this->timeout (1);
// Default is to reconnect synchronously.
- this->connector_->initiate_connection (this) == -1; )
+ this->event_channel_.initiate_proxy_connection (this) == -1; )
{
ACE_Time_Value tv (this->timeout ());
ACE_ERROR ((LM_ERROR,
- "(%t) reattempting connection, sec = %d\n", tv.sec ()));
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
ACE_OS::sleep (tv);
}
}