summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast/Socket.cpp
diff options
context:
space:
mode:
authorOssama Othman <ossama-othman@users.noreply.github.com>2005-02-08 08:33:44 +0000
committerOssama Othman <ossama-othman@users.noreply.github.com>2005-02-08 08:33:44 +0000
commitab82f89dc8fde2725888b2577b6c44f113d3040f (patch)
tree8a9759f3be81fe4eba3a3d42e82609af907c60a4 /protocols/ace/RMCast/Socket.cpp
parent1b01c3c7544d1454ced15a8eae026f96758102a8 (diff)
downloadATCD-unlabeled-1.1.2.tar.gz
ChangeLogTag:Tue Feb 8 00:22:48 2005 Ossama Othman <ossama@dre.vanderbilt.edu>unlabeled-1.1.2
Diffstat (limited to 'protocols/ace/RMCast/Socket.cpp')
-rw-r--r--protocols/ace/RMCast/Socket.cpp112
1 files changed, 112 insertions, 0 deletions
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
new file mode 100644
index 00000000000..640a1a7d694
--- /dev/null
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -0,0 +1,112 @@
+// file : ace/RMCast/Socket.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include <ace/OS.h>
+
+#include <ace/RMCast/Socket.h>
+
+namespace ACE_RMCast
+{
+ Socket::
+ Socket (Address const& a, bool loop)
+ : loop_ (loop), sn_ (1), cond_ (mutex_)
+ {
+ acknowledge_ = auto_ptr<Acknowledge> (new Acknowledge ());
+ retransmit_ = auto_ptr<Retransmit> (new Retransmit ());
+ simulator_ = auto_ptr<Simulator> (new Simulator ());
+ link_ = auto_ptr<Link> (new Link (a));
+
+ // Start IN stack from top to bottom.
+ //
+ in_start (0);
+ acknowledge_->in_start (this);
+ retransmit_->in_start (acknowledge_.get ());
+ simulator_->in_start (retransmit_.get ());
+ link_->in_start (simulator_.get ());
+
+ // Start OUT stack from bottom up.
+ //
+ link_->out_start (0);
+ simulator_->out_start (link_.get ());
+ retransmit_->out_start (simulator_.get ());
+ acknowledge_->out_start (retransmit_.get ());
+ out_start (acknowledge_.get ());
+ }
+
+ Socket::
+ ~Socket ()
+ {
+ // Stop OUT stack from top to bottom.
+ //
+ out_stop ();
+ acknowledge_->out_stop ();
+ retransmit_->out_stop ();
+ simulator_->out_stop ();
+ link_->out_stop ();
+
+ // Stop IN stack from bottom up.
+ //
+ link_->in_stop ();
+ simulator_->in_stop ();
+ retransmit_->in_stop ();
+ acknowledge_->in_stop ();
+ in_stop ();
+ }
+
+
+ void Socket::
+ send (void const* buf, size_t s)
+ {
+ Message_ptr m (new Message);
+
+ m->add (Profile_ptr (new SN (sn_++)));
+ m->add (Profile_ptr (new Data (buf, s)));
+
+ send (m);
+ }
+
+ size_t Socket::
+ recv (void* buf, size_t s)
+ {
+ Lock l (mutex_);
+
+ while (queue_.is_empty ()) cond_.wait ();
+
+ Message_ptr m;
+ if (queue_.dequeue_head (m) == -1) abort ();
+
+ Data const* d (static_cast<Data const*>(m->find (Data::id)));
+
+ size_t r (d->size () < s ? d->size () : s);
+
+ ACE_OS::memcpy (buf, d->buf (), r);
+
+ return r;
+ }
+
+ void Socket::
+ recv (Message_ptr m)
+ {
+ if (m->find (Data::id) != 0)
+ {
+ if (!loop_)
+ {
+ Address to (static_cast<To const*> (m->find (To::id))->address ());
+
+ Address from (
+ static_cast<From const*> (m->find (From::id))->address ());
+
+ if (to == from) return;
+ }
+
+ Lock l (mutex_);
+
+ bool signal (queue_.is_empty ());
+
+ queue_.enqueue_tail (m);
+
+ if (signal) cond_.signal ();
+ }
+ }
+}