summaryrefslogtreecommitdiff
path: root/ACE/ace/Stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/ace/Stream.h')
-rw-r--r--ACE/ace/Stream.h241
1 files changed, 241 insertions, 0 deletions
diff --git a/ACE/ace/Stream.h b/ACE/ace/Stream.h
new file mode 100644
index 00000000000..1ab1cacf3ea
--- /dev/null
+++ b/ACE/ace/Stream.h
@@ -0,0 +1,241 @@
+// -*- C++ -*-
+
+//==========================================================================
+/**
+ * @file Stream.h
+ *
+ * $Id$
+ *
+ * @author Douglas C. Schmidt <schmidt@uci.edu>
+ */
+//==========================================================================
+
+#ifndef ACE_STREAM_H
+#define ACE_STREAM_H
+
+#include /**/ "ace/pre.h"
+
+#include "ace/config-all.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/IO_Cntl_Msg.h"
+#include "ace/Message_Block.h"
+#include "ace/Module.h"
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+// Forward decls.
+template<ACE_SYNCH_DECL> class ACE_Stream_Iterator;
+class ACE_Time_Value;
+
+/**
+ * @class ACE_Stream
+ *
+ * @brief This class is the primary abstraction for the ASX framework.
+ * It is moduled after System V Stream.
+ *
+ * A Stream consists of a stack of <ACE_Modules>, each of which
+ * contains two <ACE_Tasks>. Even though the methods in this
+ * class are virtual, this class isn't really intended for
+ * subclassing unless you know what you are doing. In
+ * particular, the <ACE_Stream> destructor calls <close>, which
+ * won't be overridden properly unless you call it in a subclass
+ * destructor.
+ */
+template <ACE_SYNCH_DECL>
+class ACE_Stream
+{
+public:
+ friend class ACE_Stream_Iterator<ACE_SYNCH_USE>;
+
+ enum
+ {
+ /// Indicates that <close> deletes the Tasks. Don't change this
+ /// value without updating the same enum in class ACE_Module...
+ M_DELETE = 3
+ };
+
+ // = Initializatation and termination methods.
+ /**
+ * Create a Stream consisting of <head> and <tail> as the Stream
+ * head and Stream tail, respectively. If these are 0 then the
+ * <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively.
+ * <arg> is the value past in to the <open> methods of the tasks.
+ */
+ ACE_Stream (void *arg = 0,
+ ACE_Module<ACE_SYNCH_USE> *head = 0,
+ ACE_Module<ACE_SYNCH_USE> *tail = 0);
+
+ /**
+ * Create a Stream consisting of <head> and <tail> as the Stream
+ * head and Stream tail, respectively. If these are 0 then the
+ * <ACE_Stream_Head> and <ACE_Stream_Tail> are used, respectively.
+ * <arg> is the value past in to the <open> methods of the tasks.
+ */
+ virtual int open (void *arg,
+ ACE_Module<ACE_SYNCH_USE> *head = 0,
+ ACE_Module<ACE_SYNCH_USE> *tail = 0);
+
+ /// Close down the stream and release all the resources.
+ virtual int close (int flags = M_DELETE);
+
+ /// Close down the stream and release all the resources.
+ virtual ~ACE_Stream (void);
+
+ // = ACE_Stream plumbing operations
+
+ /// Add a new module <mod> right below the Stream head. The
+ /// <open()> hook methods of the <ACE_Tasks> in this <ACE_Module>
+ /// are invoked to initialize the tasks.
+ virtual int push (ACE_Module<ACE_SYNCH_USE> *mod);
+
+ /// Remove the <mod> right below the Stream head and close it down.
+ // The <close()> hook methods of the <ACE_Tasks> in this <ACE_Module>
+ /// are invoked to cleanup the tasks.
+ virtual int pop (int flags = M_DELETE);
+
+ /// Return the top module on the stream (right below the stream
+ /// head).
+ virtual int top (ACE_Module<ACE_SYNCH_USE> *&mod);
+
+ /// Insert a new module <mod> below the named module <prev_name>.
+ virtual int insert (const ACE_TCHAR *prev_name,
+ ACE_Module<ACE_SYNCH_USE> *mod);
+
+ /// Replace the named module <replace_name> with a new module <mod>.
+ virtual int replace (const ACE_TCHAR *replace_name,
+ ACE_Module<ACE_SYNCH_USE> *mod,
+ int flags = M_DELETE);
+
+ /// Remove the named module <mod> from the stream. This bypasses the
+ /// strict LIFO ordering of <push> and <pop>.
+ virtual int remove (const ACE_TCHAR *mod,
+ int flags = M_DELETE);
+
+ /// Return current stream head.
+ virtual ACE_Module<ACE_SYNCH_USE> *head (void);
+
+ /// Return current stream tail.
+ virtual ACE_Module<ACE_SYNCH_USE> *tail (void);
+
+ /// Find a particular ACE_Module.
+ virtual ACE_Module<ACE_SYNCH_USE> *find (const ACE_TCHAR *mod);
+
+ /// Create a pipe between two Streams.
+ virtual int link (ACE_Stream<ACE_SYNCH_USE> &);
+
+ /// Remove a pipe formed between two Streams.
+ virtual int unlink (void);
+
+ // = Blocking data transfer operations
+ /**
+ * Send the message <mb> down the stream, starting at the Module
+ * below the Stream head. Wait for upto <timeout> amount of
+ * absolute time for the operation to complete (or block forever if
+ * <timeout> == 0).
+ */
+ virtual int put (ACE_Message_Block *mb,
+ ACE_Time_Value *timeout = 0);
+
+ /**
+ * Read the message <mb> that is stored in the stream head.
+ * Wait for upto <timeout> amount of absolute time for the operation
+ * to complete (or block forever if <timeout> == 0).
+ */
+ virtual int get (ACE_Message_Block *&mb,
+ ACE_Time_Value *timeout = 0);
+
+ /// Send control message down the stream.
+ virtual int control (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds cmd,
+ void *args);
+
+ /// Synchronize with the final close of the stream.
+ virtual int wait (void);
+
+ /// Dump the state of an object.
+ virtual void dump (void) const;
+
+ /// Declare the dynamic allocation hooks.
+ ACE_ALLOC_HOOK_DECLARE;
+
+private:
+ /// Actually perform the unlinking of two Streams (must be called
+ /// with locks held).
+ int unlink_i (void);
+
+ /// Actually perform the linking of two Streams (must be called with
+ /// locks held).
+ int link_i (ACE_Stream<ACE_SYNCH_USE> &);
+
+ /// Must a new module onto the Stream.
+ int push_module (ACE_Module<ACE_SYNCH_USE> *,
+ ACE_Module<ACE_SYNCH_USE> * = 0,
+ ACE_Module<ACE_SYNCH_USE> * = 0);
+
+ /// Pointer to the head of the stream.
+ ACE_Module<ACE_SYNCH_USE> *stream_head_;
+
+ /// Pointer to the tail of the stream.
+ ACE_Module<ACE_SYNCH_USE> *stream_tail_;
+
+ /// Pointer to an adjoining linked stream.
+ ACE_Stream<ACE_SYNCH_USE> *linked_us_;
+
+ // = Synchronization objects used for thread-safe streams.
+ /// Protect the stream against race conditions.
+ ACE_SYNCH_MUTEX_T lock_;
+
+ /// Use to tell all threads waiting on the close that we are done.
+ ACE_SYNCH_CONDITION_T final_close_;
+};
+
+/**
+ * @class ACE_Stream_Iterator
+ *
+ * @brief Iterate through an <ACE_Stream>.
+ */
+template <ACE_SYNCH_DECL>
+class ACE_Stream_Iterator
+{
+public:
+ // = Initialization method.
+ ACE_Stream_Iterator (const ACE_Stream<ACE_SYNCH_USE> &sr);
+
+ // = Iteration methods.
+
+ /// Pass back the <next_item> that hasn't been seen in the set.
+ /// Returns 0 when all items have been seen, else 1.
+ int next (const ACE_Module<ACE_SYNCH_USE> *&next_item);
+
+ /// Returns 1 when all items have been seen, else 0.
+ int done (void) const;
+
+ /// Move forward by one element in the set. Returns 0 when all the
+ /// items in the set have been seen, else 1.
+ int advance (void);
+
+private:
+ /// Next <Module> that we haven't yet seen.
+ ACE_Module<ACE_SYNCH_USE> *next_;
+};
+
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+#include "ace/Stream.inl"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ace/Stream.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Stream.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#include /**/ "ace/post.h"
+
+#endif /* ACE_STREAM_H */