ACE Tutorial 015
Building a protocol stream


The Protocol_Task implementation takes care of the open(), close(), put() and svc() methods so that derivatives can concentrate on the send() and recv() methods. After a while you find that most ACE_Task<> derivatives look very similar in the four basic methods and only need one or two additional to do any real work.

// $Id$

#include "Protocol_Task.h"

// Construct the object and remember the thread count.
Protocol_Task::Protocol_Task( int _thr_count )
        : desired_thr_count_(_thr_count)
{
}

Protocol_Task::~Protocol_Task(void)
{
}

// Activate the object if necessary.
int Protocol_Task::open(void *arg) 
{
  ACE_UNUSED_ARG(arg);

  if( desired_thr_count_ )
  {
      return this->activate(THR_NEW_LWP, desired_thr_count_);
  }

  return(0);
}

/* When we're being closed by the ACE_Stream and we've got threads to
   worry about then we drop a hangup message onto the message queue so 
   that svc() will go away.  Except for the call to is_active(), this
   is lifted directly from Tutorial 14.
*/
int Protocol_Task::close(u_long flags)
{
    if (flags == 1 && is_active() )
    {
        ACE_Message_Block *hangupBlock = new ACE_Message_Block();

        hangupBlock->msg_type(ACE_Message_Block::MB_HANGUP);
    
        if (this->putq(hangupBlock->duplicate()) == -1) {
            ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Task::close() putq"), -1);
        }
    
        hangupBlock->release();

        return this->wait();
    }

    return 0;
}

/* The put() method has to make a decision.  If we've got threads then 
   put the unit of work onto the message queue for svc() to deal
   with.  If not then process() it directly.
*/
int Protocol_Task::put(ACE_Message_Block *message,ACE_Time_Value *timeout)
{
    if( is_active() )
    {
        return this->putq(message,timeout);
    }

    return this->process(message,timeout);
}

/* svc() is about what you would expect.  This is again lifted
   directly from Tutorial 14 but with a call to process() for handling 
   the logic instead of doing the work right here.
 */
int Protocol_Task::svc(void)
{
    ACE_Message_Block * message;
    
    while (1)
    {
            // Get a message
        if ( this->getq(message, 0) == -1) {
            ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Protocol_Task::svc() getq"), -1);
        }

        ACE_DEBUG ((LM_DEBUG, "(%P|%t) Protocol_Task::svc() got message\n"));

            // Check for hangup
        if (message->msg_type() == ACE_Message_Block::MB_HANGUP) {
 
            ACE_DEBUG ((LM_DEBUG, "(%P|%t) Protocol_Task::svc() -- HANGUP block received\n"));

                // Hangup our thread-pool peers (if any)
            if (this->putq(message->duplicate()) == -1) {
                ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "Protocol_Task::svc() putq"), -1);
            }

                // Leave svc()
            break;
        }

            // Do some work on the data.
        if( this->process(message->duplicate(),0) == -1 )
        {
            break;
        }

            // Give up the message block before we go get another.
        message->release();
    }

        // Give up the message block that caused us to exit the
        // while(1) loop.
    message->release();
    
    return(0);
}

/* There's nothing really magic about process().  We just decide if
   we're moving data upstream or downstream and invoke the appropriate 
   virtual function to handle it.
*/
int Protocol_Task::process(ACE_Message_Block * message, ACE_Time_Value *timeout)
{
    if( this->is_writer() )
    {
        return this->send(message,timeout);
    }
    
    return this->recv(message,timeout);
}

/* We must insist that derivatives provide a meaningful overload for
   these methods.  It's fairly common for ACE object methods to return 
   an error when an overload is expected but the method cannot be
   safely made pure virtual.
 */
 
int Protocol_Task::send(ACE_Message_Block *message,
                        ACE_Time_Value *timeout)
{
    return -1;
}

int Protocol_Task::recv(ACE_Message_Block * message,
                        ACE_Time_Value *timeout)
{
    return -1;
}


[Tutorial Index] [Continue This Tutorial]