diff options
author | Allan Sandfeld Jensen <allan.jensen@theqtcompany.com> | 2016-08-25 10:44:03 +0200 |
---|---|---|
committer | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2016-08-25 09:40:07 +0000 |
commit | e20ba3c57b50674f625b5088faa0fe9a076c0617 (patch) | |
tree | 3006142b83866a52a56d34ade8446d5044647305 /chromium/mojo | |
parent | 28b1110370900897ab652cb420c371fab8857ad4 (diff) | |
download | qtwebengine-chromium-e20ba3c57b50674f625b5088faa0fe9a076c0617.tar.gz |
BASELINE: Update Chromium to 53.0.2785.80
Also adds 3rdparty libraries under pdfium.
Change-Id: I29afb23f1642fa55765d056697d5d145afa22bb2
Reviewed-by: Michael BrĂ¼ning <michael.bruning@qt.io>
Diffstat (limited to 'chromium/mojo')
-rw-r--r-- | chromium/mojo/edk/system/node_controller.cc | 3 | ||||
-rw-r--r-- | chromium/mojo/edk/system/ports/node.cc | 17 | ||||
-rw-r--r-- | chromium/mojo/edk/system/ports/node.h | 13 | ||||
-rw-r--r-- | chromium/mojo/edk/system/ports/ports_unittest.cc | 1667 |
4 files changed, 828 insertions, 872 deletions
diff --git a/chromium/mojo/edk/system/node_controller.cc b/chromium/mojo/edk/system/node_controller.cc index 5fc55f557d7..f40a0cf37d6 100644 --- a/chromium/mojo/edk/system/node_controller.cc +++ b/chromium/mojo/edk/system/node_controller.cc @@ -1193,7 +1193,8 @@ void NodeController::AttemptShutdownIfRequested() { base::AutoLock lock(shutdown_lock_); if (shutdown_callback_.is_null()) return; - if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) { + if (!node_->CanShutdownCleanly( + ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) { DVLOG(2) << "Unable to cleanly shut down node " << name_; return; } diff --git a/chromium/mojo/edk/system/ports/node.cc b/chromium/mojo/edk/system/ports/node.cc index 128ecdf4ca7..c7f42f6f8b3 100644 --- a/chromium/mojo/edk/system/ports/node.cc +++ b/chromium/mojo/edk/system/ports/node.cc @@ -64,10 +64,10 @@ Node::~Node() { DLOG(WARNING) << "Unclean shutdown for node " << name_; } -bool Node::CanShutdownCleanly(bool allow_local_ports) { +bool Node::CanShutdownCleanly(ShutdownPolicy policy) { base::AutoLock ports_lock(ports_lock_); - if (!allow_local_ports) { + if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) { #if DCHECK_IS_ON() for (auto entry : ports_) { DVLOG(2) << "Port " << entry.first << " referencing node " @@ -78,6 +78,8 @@ bool Node::CanShutdownCleanly(bool allow_local_ports) { return ports_.empty(); } + DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS); + // NOTE: This is not efficient, though it probably doesn't need to be since // relatively few ports should be open during shutdown and shutdown doesn't // need to be blazingly fast. @@ -531,17 +533,6 @@ int Node::OnObserveProxy(const PortName& port_name, scoped_refptr<Port> port = GetPort(port_name); if (!port) { DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; - - if (port_name != event.proxy_port_name && - port_name != event.proxy_to_port_name) { - // The receiving port may have been removed while this message was in - // transit. In this case, we restart the ObserveProxy circulation from - // the referenced proxy port to avoid leaking the proxy. - delegate_->ForwardMessage( - event.proxy_node_name, - NewInternalMessage( - event.proxy_port_name, EventType::kObserveProxy, event)); - } return OK; } diff --git a/chromium/mojo/edk/system/ports/node.h b/chromium/mojo/edk/system/ports/node.h index 3aeadcaeb40..65252a3af71 100644 --- a/chromium/mojo/edk/system/ports/node.h +++ b/chromium/mojo/edk/system/ports/node.h @@ -48,6 +48,11 @@ class NodeDelegate; class Node { public: + enum class ShutdownPolicy { + DONT_ALLOW_LOCAL_PORTS, + ALLOW_LOCAL_PORTS, + }; + // Does not take ownership of the delegate. Node(const NodeName& name, NodeDelegate* delegate); ~Node(); @@ -59,9 +64,11 @@ class Node { // method may be called again after AcceptMessage to check if the Node is now // ready to be destroyed. // - // If |allow_local_ports| is |true|, this will only return |false| when there - // are transient ports referring to other nodes. - bool CanShutdownCleanly(bool allow_local_ports); + // If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return + // |true| even if some ports remain alive, as long as none of them are proxies + // to another node. + bool CanShutdownCleanly( + ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS); // Lookup the named port. int GetPort(const PortName& port_name, PortRef* port_ref); diff --git a/chromium/mojo/edk/system/ports/ports_unittest.cc b/chromium/mojo/edk/system/ports/ports_unittest.cc index 200e72bb1b6..b76fa10de5f 100644 --- a/chromium/mojo/edk/system/ports/ports_unittest.cc +++ b/chromium/mojo/edk/system/ports/ports_unittest.cc @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include <inttypes.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -9,9 +10,18 @@ #include <map> #include <queue> #include <sstream> +#include <utility> +#include "base/bind.h" +#include "base/callback.h" #include "base/logging.h" +#include "base/memory/ref_counted.h" #include "base/rand_util.h" +#include "base/strings/string_piece.h" +#include "base/strings/stringprintf.h" +#include "base/synchronization/lock.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/thread.h" #include "mojo/edk/system/ports/event.h" #include "mojo/edk/system/ports/node.h" #include "mojo/edk/system/ports/node_delegate.h" @@ -24,24 +34,8 @@ namespace test { namespace { -void LogMessage(const Message* message) { - std::stringstream ports; - for (size_t i = 0; i < message->num_ports(); ++i) { - if (i > 0) - ports << ","; - ports << message->ports()[i]; - } - DVLOG(1) << "message: \"" - << static_cast<const char*>(message->payload_bytes()) - << "\" ports=[" << ports.str() << "]"; -} - -void ClosePortsInMessage(Node* node, Message* message) { - for (size_t i = 0; i < message->num_ports(); ++i) { - PortRef port; - ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port)); - EXPECT_EQ(OK, node->ClosePort(port)); - } +bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) { + return !strcmp(static_cast<const char*>(message->payload_bytes()), s.data()); } class TestMessage : public Message { @@ -71,132 +65,140 @@ class TestMessage : public Message { } }; -struct Task { - Task(NodeName node_name, ScopedMessage message) - : node_name(node_name), - message(std::move(message)), - priority(base::RandUint64()) { - } +class TestNode; - NodeName node_name; - ScopedMessage message; - uint64_t priority; +class MessageRouter { + public: + virtual ~MessageRouter() {} + + virtual void GeneratePortName(PortName* name) = 0; + virtual void ForwardMessage(TestNode* from_node, + const NodeName& node_name, + ScopedMessage message) = 0; + virtual void BroadcastMessage(TestNode* from_node, ScopedMessage message) = 0; }; -struct TaskComparator { - bool operator()(const Task* a, const Task* b) { - return a->priority < b->priority; +class TestNode : public NodeDelegate { + public: + explicit TestNode(uint64_t id) + : node_name_(id, 1), + node_(node_name_, this), + node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)), + messages_available_event_( + base::WaitableEvent::ResetPolicy::AUTOMATIC, + base::WaitableEvent::InitialState::NOT_SIGNALED), + idle_event_( + base::WaitableEvent::ResetPolicy::MANUAL, + base::WaitableEvent::InitialState::SIGNALED) { } -}; -const size_t kMaxNodes = 3; + ~TestNode() override { + StopWhenIdle(); + node_thread_.Stop(); + } -std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue; -Node* node_map[kMaxNodes]; + const NodeName& name() const { return node_name_; } -Node* GetNode(const NodeName& name) { - return node_map[name.v1]; -} + // NOTE: Node is thread-safe. + Node& node() { return node_; } -void SetNode(const NodeName& name, Node* node) { - node_map[name.v1] = node; -} - -void PumpTasks() { - while (!task_queue.empty()) { - Task* task = task_queue.top(); - task_queue.pop(); + base::WaitableEvent& idle_event() { return idle_event_; } - Node* node = GetNode(task->node_name); - if (node) - node->AcceptMessage(std::move(task->message)); + bool IsIdle() { + base::AutoLock lock(lock_); + return started_ && !dispatching_ && + (incoming_messages_.empty() || (block_on_event_ && blocked_)); + } - delete task; + void BlockOnEvent(EventType type) { + base::AutoLock lock(lock_); + blocked_event_type_ = type; + block_on_event_ = true; } -} -void PumpUntilTask(EventType type) { - while (!task_queue.empty()) { - Task* task = task_queue.top(); + void Unblock() { + base::AutoLock lock(lock_); + block_on_event_ = false; + messages_available_event_.Signal(); + } - const EventHeader* header = GetEventHeader(*task->message); - if (header->type == type) - return; + void Start(MessageRouter* router) { + router_ = router; + node_thread_.Start(); + node_thread_.task_runner()->PostTask( + FROM_HERE, + base::Bind(&TestNode::ProcessMessages, base::Unretained(this))); + } - task_queue.pop(); + void StopWhenIdle() { + base::AutoLock lock(lock_); + should_quit_ = true; + messages_available_event_.Signal(); + } - Node* node = GetNode(task->node_name); - if (node) - node->AcceptMessage(std::move(task->message)); + void WakeUp() { messages_available_event_.Signal(); } - delete task; + int SendStringMessage(const PortRef& port, const std::string& s) { + size_t size = s.size() + 1; + ScopedMessage message = TestMessage::NewUserMessage(size, 0); + memcpy(message->mutable_payload_bytes(), s.data(), size); + return node_.SendMessage(port, std::move(message)); } -} -void DiscardPendingTasks() { - while (!task_queue.empty()) { - Task* task = task_queue.top(); - task_queue.pop(); - delete task; + int SendStringMessageWithPort(const PortRef& port, + const std::string& s, + const PortName& sent_port_name) { + size_t size = s.size() + 1; + ScopedMessage message = TestMessage::NewUserMessage(size, 1); + memcpy(message->mutable_payload_bytes(), s.data(), size); + message->mutable_ports()[0] = sent_port_name; + return node_.SendMessage(port, std::move(message)); } -} - -int SendStringMessage(Node* node, const PortRef& port, const std::string& s) { - size_t size = s.size() + 1; - ScopedMessage message = TestMessage::NewUserMessage(size, 0); - memcpy(message->mutable_payload_bytes(), s.data(), size); - return node->SendMessage(port, std::move(message)); -} - -int SendStringMessageWithPort(Node* node, - const PortRef& port, - const std::string& s, - const PortName& sent_port_name) { - size_t size = s.size() + 1; - ScopedMessage message = TestMessage::NewUserMessage(size, 1); - memcpy(message->mutable_payload_bytes(), s.data(), size); - message->mutable_ports()[0] = sent_port_name; - return node->SendMessage(port, std::move(message)); -} -int SendStringMessageWithPort(Node* node, - const PortRef& port, - const std::string& s, - const PortRef& sent_port) { - return SendStringMessageWithPort(node, port, s, sent_port.name()); -} + int SendStringMessageWithPort(const PortRef& port, + const std::string& s, + const PortRef& sent_port) { + return SendStringMessageWithPort(port, s, sent_port.name()); + } -const char* ToString(const ScopedMessage& message) { - return static_cast<const char*>(message->payload_bytes()); -} + void set_drop_messages(bool value) { + base::AutoLock lock(lock_); + drop_messages_ = value; + } -class TestNodeDelegate : public NodeDelegate { - public: - explicit TestNodeDelegate(const NodeName& node_name) - : node_name_(node_name), - drop_messages_(false), - read_messages_(true), - save_messages_(false) { + void set_save_messages(bool value) { + base::AutoLock lock(lock_); + save_messages_ = value; } - void set_drop_messages(bool value) { drop_messages_ = value; } - void set_read_messages(bool value) { read_messages_ = value; } - void set_save_messages(bool value) { save_messages_ = value; } + bool ReadMessage(const PortRef& port, ScopedMessage* message) { + return node_.GetMessage(port, message) == OK && *message; + } bool GetSavedMessage(ScopedMessage* message) { + base::AutoLock lock(lock_); if (saved_messages_.empty()) { message->reset(); return false; } - *message = std::move(saved_messages_.front()); + std::swap(*message, saved_messages_.front()); saved_messages_.pop(); return true; } + void EnqueueMessage(ScopedMessage message) { + idle_event_.Reset(); + + // NOTE: This may be called from ForwardMessage and thus must not reenter + // |node_|. + base::AutoLock lock(lock_); + incoming_messages_.emplace(std::move(message)); + messages_available_event_.Signal(); + } + void GenerateRandomPortName(PortName* port_name) override { - static uint64_t next_port_name = 1; - port_name->v1 = next_port_name++; - port_name->v2 = 0; + DCHECK(router_); + router_->GeneratePortName(port_name); } void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override { @@ -205,489 +207,568 @@ class TestNodeDelegate : public NodeDelegate { void ForwardMessage(const NodeName& node_name, ScopedMessage message) override { - if (drop_messages_) { - DVLOG(1) << "Dropping ForwardMessage from node " - << node_name_ << " to " << node_name; - ClosePortsInMessage(GetNode(node_name), message.get()); - return; + { + base::AutoLock lock(lock_); + if (drop_messages_) { + DVLOG(1) << "Dropping ForwardMessage from node " + << node_name_ << " to " << node_name; + + base::AutoUnlock unlock(lock_); + ClosePortsInMessage(message.get()); + return; + } } + + DCHECK(router_); DVLOG(1) << "ForwardMessage from node " << node_name_ << " to " << node_name; - task_queue.push(new Task(node_name, std::move(message))); + router_->ForwardMessage(this, node_name, std::move(message)); } void BroadcastMessage(ScopedMessage message) override { - for (size_t i = 0; i < kMaxNodes; ++i) { - Node* node = node_map[i]; - // Broadcast doesn't deliver to the local node. - if (node && node != GetNode(node_name_)) { - // NOTE: We only need to support broadcast of events, which have no - // payload or ports bytes. - ScopedMessage new_message( - new TestMessage(message->num_header_bytes(), 0, 0)); - memcpy(new_message->mutable_header_bytes(), message->header_bytes(), - message->num_header_bytes()); - node->AcceptMessage(std::move(new_message)); - } - } + router_->BroadcastMessage(this, std::move(message)); } void PortStatusChanged(const PortRef& port) override { - DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_; - if (!read_messages_) + // The port may be closed, in which case we ignore the notification. + base::AutoLock lock(lock_); + if (!save_messages_) return; - Node* node = GetNode(node_name_); + for (;;) { ScopedMessage message; - int rv = node->GetMessage(port, &message); - EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED); - if (rv == ERROR_PORT_PEER_CLOSED || !message) - break; - if (save_messages_) { - SaveMessage(std::move(message)); - } else { - LogMessage(message.get()); - for (size_t i = 0; i < message->num_ports(); ++i) { - std::stringstream buf; - buf << "got port: " << message->ports()[i]; - - PortRef received_port; - node->GetPort(message->ports()[i], &received_port); + { + base::AutoUnlock unlock(lock_); + if (!ReadMessage(port, &message)) + break; + } - SendStringMessage(node, received_port, buf.str()); + saved_messages_.emplace(std::move(message)); + } + } - // Avoid leaking these ports. - node->ClosePort(received_port); - } - } + void ClosePortsInMessage(Message* message) { + for (size_t i = 0; i < message->num_ports(); ++i) { + PortRef port; + ASSERT_EQ(OK, node_.GetPort(message->ports()[i], &port)); + EXPECT_EQ(OK, node_.ClosePort(port)); } } private: - void SaveMessage(ScopedMessage message) { - saved_messages_.emplace(std::move(message)); + void ProcessMessages() { + for (;;) { + messages_available_event_.Wait(); + + base::AutoLock lock(lock_); + + if (should_quit_) + return; + + dispatching_ = true; + while (!incoming_messages_.empty()) { + if (block_on_event_ && + GetEventHeader(*incoming_messages_.front())->type == + blocked_event_type_) { + blocked_ = true; + // Go idle if we hit a blocked event type. + break; + } else { + blocked_ = false; + } + ScopedMessage message = std::move(incoming_messages_.front()); + incoming_messages_.pop(); + + // NOTE: AcceptMessage() can re-enter this object to call any of the + // NodeDelegate interface methods. + base::AutoUnlock unlock(lock_); + node_.AcceptMessage(std::move(message)); + } + + dispatching_ = false; + started_ = true; + idle_event_.Signal(); + }; } + const NodeName node_name_; + Node node_; + MessageRouter* router_ = nullptr; + + base::Thread node_thread_; + base::WaitableEvent messages_available_event_; + base::WaitableEvent idle_event_; + + // Guards fields below. + base::Lock lock_; + bool started_ = false; + bool dispatching_ = false; + bool should_quit_ = false; + bool drop_messages_ = false; + bool save_messages_ = false; + bool blocked_ = false; + bool block_on_event_ = false; + EventType blocked_event_type_; + std::queue<ScopedMessage> incoming_messages_; std::queue<ScopedMessage> saved_messages_; - NodeName node_name_; - bool drop_messages_; - bool read_messages_; - bool save_messages_; }; -class PortsTest : public testing::Test { +class PortsTest : public testing::Test, public MessageRouter { public: - void SetUp() override { - DiscardPendingTasks(); - SetNode(NodeName(0, 1), nullptr); - SetNode(NodeName(1, 1), nullptr); - SetNode(NodeName(2, 1), nullptr); + void AddNode(TestNode* node) { + { + base::AutoLock lock(lock_); + nodes_[node->name()] = node; + } + node->Start(this); + } + + void RemoveNode(TestNode* node) { + { + base::AutoLock lock(lock_); + nodes_.erase(node->name()); + } + + for (const auto& entry : nodes_) + entry.second->node().LostConnectionToNode(node->name()); + } + + // Waits until all known Nodes are idle. Message forwarding and processing + // is handled in such a way that idleness is a stable state: once all nodes in + // the system are idle, they will remain idle until the test explicitly + // initiates some further event (e.g. sending a message, closing a port, or + // removing a Node). + void WaitForIdle() { + for (;;) { + base::AutoLock global_lock(global_lock_); + bool all_nodes_idle = true; + for (const auto& entry : nodes_) { + if (!entry.second->IsIdle()) + all_nodes_idle = false; + entry.second->WakeUp(); + } + if (all_nodes_idle) + return; + + // Wait for any Node to signal that it's idle. + base::AutoUnlock global_unlock(global_lock_); + std::vector<base::WaitableEvent*> events; + for (const auto& entry : nodes_) + events.push_back(&entry.second->idle_event()); + base::WaitableEvent::WaitMany(events.data(), events.size()); + } } + + void CreatePortPair(TestNode* node0, + PortRef* port0, + TestNode* node1, + PortRef* port1) { + if (node0 == node1) { + EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1)); + } else { + EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0)); + EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1)); + EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(), + port1->name())); + EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(), + port0->name())); + } + } + + private: + // MessageRouter: + void GeneratePortName(PortName* name) override { + base::AutoLock lock(lock_); + name->v1 = next_port_id_++; + name->v2 = 0; + } + + void ForwardMessage(TestNode* from_node, + const NodeName& node_name, + ScopedMessage message) override { + base::AutoLock global_lock(global_lock_); + base::AutoLock lock(lock_); + // Drop messages from nodes that have been removed. + if (nodes_.find(from_node->name()) == nodes_.end()) { + from_node->ClosePortsInMessage(message.get()); + return; + } + + auto it = nodes_.find(node_name); + if (it == nodes_.end()) { + DVLOG(1) << "Node not found: " << node_name; + return; + } + + it->second->EnqueueMessage(std::move(message)); + } + + void BroadcastMessage(TestNode* from_node, ScopedMessage message) override { + base::AutoLock global_lock(global_lock_); + base::AutoLock lock(lock_); + + // Drop messages from nodes that have been removed. + if (nodes_.find(from_node->name()) == nodes_.end()) + return; + + for (const auto& entry : nodes_) { + TestNode* node = entry.second; + // Broadcast doesn't deliver to the local node. + if (node == from_node) + continue; + + // NOTE: We only need to support broadcast of events. Events have no + // payload or ports bytes. + ScopedMessage new_message( + new TestMessage(message->num_header_bytes(), 0, 0)); + memcpy(new_message->mutable_header_bytes(), message->header_bytes(), + message->num_header_bytes()); + node->EnqueueMessage(std::move(new_message)); + } + } + + base::MessageLoop message_loop_; + + // Acquired before any operation which makes a Node busy, and before testing + // if all nodes are idle. + base::Lock global_lock_; + + base::Lock lock_; + uint64_t next_port_id_ = 1; + std::map<NodeName, TestNode*> nodes_; }; } // namespace TEST_F(PortsTest, Basic1) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - SetNode(node0_name, &node0); + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - SetNode(node1_name, &node1); + TestNode node1(1); + AddNode(&node1); - // Setup pipe between node0 and node1. PortRef x0, x1; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); + CreatePortPair(&node0, &x0, &node1, &x1); - // Transfer a port from node0 to node1. PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); - - EXPECT_EQ(OK, node0.ClosePort(a0)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); + EXPECT_EQ(OK, node0.node().ClosePort(a0)); - EXPECT_EQ(OK, node0.ClosePort(x0)); - EXPECT_EQ(OK, node1.ClosePort(x1)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Basic2) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - SetNode(node0_name, &node0); + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - SetNode(node1_name, &node1); + TestNode node1(1); + AddNode(&node1); - // Setup pipe between node0 and node1. PortRef x0, x1; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); + CreatePortPair(&node0, &x0, &node1, &x1); PortRef b0, b1; - EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1)); - EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again")); + EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1)); + EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again")); - // This may cause a SendMessage(b1) failure. - EXPECT_EQ(OK, node0.ClosePort(b0)); + EXPECT_EQ(OK, node0.node().ClosePort(b0)); - EXPECT_EQ(OK, node0.ClosePort(x0)); - EXPECT_EQ(OK, node1.ClosePort(x1)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Basic3) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - SetNode(node0_name, &node0); + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - SetNode(node1_name, &node1); + TestNode node1(1); + AddNode(&node1); - // Setup pipe between node0 and node1. PortRef x0, x1; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); + CreatePortPair(&node0, &x0, &node1, &x1); - // Transfer a port from node0 to node1. PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); - EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again")); + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); - // Transfer a0 as well. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1)); + EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again")); + + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0)); PortRef b0, b1; - EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1)); - EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz")); + EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1)); + EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz")); - // This may cause a SendMessage(b1) failure. - EXPECT_EQ(OK, node0.ClosePort(b0)); + EXPECT_EQ(OK, node0.node().ClosePort(b0)); - EXPECT_EQ(OK, node0.ClosePort(x0)); - EXPECT_EQ(OK, node1.ClosePort(x1)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNode1) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - SetNode(node0_name, &node0); + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - SetNode(node1_name, &node1); + TestNode node1(1); + AddNode(&node1); + node1.set_drop_messages(true); - // Setup pipe between node0 and node1. PortRef x0, x1; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); - - // Transfer port to node1 and simulate a lost connection to node1. Dropping - // events from node1 is how we simulate the lost connection. + CreatePortPair(&node0, &x0, &node1, &x1); - node1_delegate.set_drop_messages(true); + // Transfer a port to node1 and simulate a lost connection to node1. PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1)); - PumpTasks(); + WaitForIdle(); - EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); + RemoveNode(&node1); - PumpTasks(); + WaitForIdle(); - EXPECT_EQ(OK, node0.ClosePort(a0)); - EXPECT_EQ(OK, node0.ClosePort(x0)); - EXPECT_EQ(OK, node1.ClosePort(x1)); + EXPECT_EQ(OK, node0.node().ClosePort(a0)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNode2) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); - // Setup pipe between node0 and node1. PortRef x0, x1; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); - - node1_delegate.set_read_messages(false); + CreatePortPair(&node0, &x0, &node1, &x1); PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1)); - PumpTasks(); + WaitForIdle(); - node1_delegate.set_drop_messages(true); + node1.set_drop_messages(true); - EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); + RemoveNode(&node1); - PumpTasks(); + WaitForIdle(); + // a0 should have eventually detected peer closure after node loss. ScopedMessage message; - EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); + EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.node().GetMessage(a0, &message)); EXPECT_FALSE(message); - EXPECT_EQ(OK, node0.ClosePort(a0)); + EXPECT_EQ(OK, node0.node().ClosePort(a0)); - EXPECT_EQ(OK, node0.ClosePort(x0)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); - EXPECT_EQ(OK, node1.GetMessage(x1, &message)); + EXPECT_EQ(OK, node1.node().GetMessage(x1, &message)); EXPECT_TRUE(message); - ClosePortsInMessage(&node1, message.get()); + node1.ClosePortsInMessage(message.get()); - EXPECT_EQ(OK, node1.ClosePort(x1)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { // Tests that a proxy gets cleaned up when its indirect peer lives on a lost // node. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); - NodeName node2_name(2, 1); - TestNodeDelegate node2_delegate(node2_name); - Node node2(node2_name, &node2_delegate); - node_map[2] = &node2; - - node1_delegate.set_save_messages(true); + TestNode node2(2); + AddNode(&node2); // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); - EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); - EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&C)); - EXPECT_EQ(OK, node2.CreateUninitializedPort(&D)); - EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name())); - EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name())); + CreatePortPair(&node0, &A, &node1, &B); + CreatePortPair(&node1, &C, &node2, &D); // Create E-F and send F over A to node 1. PortRef E, F; - EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F)); - PumpTasks(); + WaitForIdle(); ScopedMessage message; - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); + ASSERT_TRUE(node1.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); - EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F)); + EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F)); // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 // will trivially become aware of the loss, and this test verifies that the // port A on node 0 will eventually also become aware of it. - EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F)); + // Make sure node2 stops processing events when it encounters an ObserveProxy. + node2.BlockOnEvent(EventType::kObserveProxy); + + EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F)); + WaitForIdle(); - node_map[2] = nullptr; - EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name)); + // Simulate node 1 and 2 disconnecting. + EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name())); - PumpTasks(); + // Let node2 continue processing events and wait for everyone to go idle. + node2.Unblock(); + WaitForIdle(); // Port F should be gone. - EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F)); + EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F)); // Port E should have detected peer closure despite the fact that there is // no longer a continuous route from F to E over which the event could travel. PortStatus status; - EXPECT_EQ(OK, node0.GetStatus(E, &status)); + EXPECT_EQ(OK, node0.node().GetStatus(E, &status)); EXPECT_TRUE(status.peer_closed); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(B)); - EXPECT_EQ(OK, node1.ClosePort(C)); - EXPECT_EQ(OK, node0.ClosePort(E)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(B)); + EXPECT_EQ(OK, node1.node().ClosePort(C)); + EXPECT_EQ(OK, node0.node().ClosePort(E)); + + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { // Tests that a proxy gets cleaned up when its direct peer lives on a lost // node and it's predecessor lives on the same node. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); - node1_delegate.set_save_messages(true); - - // Create A-B spanning nodes 0 and 1. PortRef A, B; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); - EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); - EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); + CreatePortPair(&node0, &A, &node1, &B); - // Create C-D and send D over A to node 1. PortRef C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); + + // Send D but block node0 on an ObserveProxy event. + node0.BlockOnEvent(EventType::kObserveProxy); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D)); - // Pump tasks until the start of port collapse for port D, which should become - // a proxy. - PumpUntilTask(EventType::kObserveProxy); + // node0 won't collapse the proxy but node1 will receive the message before + // going idle. + WaitForIdle(); ScopedMessage message; - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); + ASSERT_TRUE(node1.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); - PortRef E; - EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E)); + EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); - EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); - PumpTasks(); + RemoveNode(&node1); + + node0.Unblock(); + WaitForIdle(); // Port C should have detected peer closure. PortStatus status; - EXPECT_EQ(OK, node0.GetStatus(C, &status)); + EXPECT_EQ(OK, node0.node().GetStatus(C, &status)); EXPECT_TRUE(status.peer_closed); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(B)); - EXPECT_EQ(OK, node0.ClosePort(C)); - EXPECT_EQ(OK, node1.ClosePort(E)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(B)); + EXPECT_EQ(OK, node0.node().ClosePort(C)); + EXPECT_EQ(OK, node1.node().ClosePort(E)); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage1) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node(0); + AddNode(&node); PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); ScopedMessage message; - EXPECT_EQ(OK, node0.GetMessage(a0, &message)); + EXPECT_EQ(OK, node.node().GetMessage(a0, &message)); EXPECT_FALSE(message); - EXPECT_EQ(OK, node0.ClosePort(a1)); - - EXPECT_EQ(OK, node0.GetMessage(a0, &message)); - EXPECT_FALSE(message); + EXPECT_EQ(OK, node.node().ClosePort(a1)); - PumpTasks(); + WaitForIdle(); - EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); + EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node.node().GetMessage(a0, &message)); EXPECT_FALSE(message); - EXPECT_EQ(OK, node0.ClosePort(a0)); + EXPECT_EQ(OK, node.node().ClosePort(a0)); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + WaitForIdle(); + + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage2) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - node0_delegate.set_read_messages(false); + TestNode node(0); + AddNode(&node); PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); - EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1")); + EXPECT_EQ(OK, node.SendStringMessage(a1, "1")); ScopedMessage message; - EXPECT_EQ(OK, node0.GetMessage(a0, &message)); + EXPECT_EQ(OK, node.node().GetMessage(a0, &message)); ASSERT_TRUE(message); - EXPECT_EQ(0, strcmp("1", ToString(message))); + EXPECT_TRUE(MessageEquals(message, "1")); - EXPECT_EQ(OK, node0.ClosePort(a0)); - EXPECT_EQ(OK, node0.ClosePort(a1)); + EXPECT_EQ(OK, node.node().ClosePort(a0)); + EXPECT_EQ(OK, node.node().ClosePort(a1)); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, GetMessage3) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - node0_delegate.set_read_messages(false); + TestNode node(0); + AddNode(&node); PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1)); const char* kStrings[] = { "1", @@ -696,371 +777,305 @@ TEST_F(PortsTest, GetMessage3) { }; for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) - EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i])); + EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i])); ScopedMessage message; for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) { - EXPECT_EQ(OK, node0.GetMessage(a0, &message)); + EXPECT_EQ(OK, node.node().GetMessage(a0, &message)); ASSERT_TRUE(message); - EXPECT_EQ(0, strcmp(kStrings[i], ToString(message))); - DVLOG(1) << "got " << kStrings[i]; + EXPECT_TRUE(MessageEquals(message, kStrings[i])); } - EXPECT_EQ(OK, node0.ClosePort(a0)); - EXPECT_EQ(OK, node0.ClosePort(a1)); + EXPECT_EQ(OK, node.node().ClosePort(a0)); + EXPECT_EQ(OK, node.node().ClosePort(a1)); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, Delegation1) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - SetNode(node0_name, &node0); + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); - node0_delegate.set_save_messages(true); - node1_delegate.set_save_messages(true); - - // Setup pipe between node0 and node1. PortRef x0, x1; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); + CreatePortPair(&node0, &x0, &node1, &x1); // In this test, we send a message to a port that has been moved. PortRef a0, a1; - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); - - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1)); - - PumpTasks(); + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1)); + WaitForIdle(); ScopedMessage message; - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - + ASSERT_TRUE(node1.ReadMessage(x1, &message)); ASSERT_EQ(1u, message->num_ports()); + EXPECT_TRUE(MessageEquals(message, "a1")); // This is "a1" from the point of view of node1. PortName a2_name = message->ports()[0]; + EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name)); + EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello")); - EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name)); - - PumpTasks(); - - EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello")); - - PumpTasks(); - - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + WaitForIdle(); + ASSERT_TRUE(node0.ReadMessage(x0, &message)); ASSERT_EQ(1u, message->num_ports()); + EXPECT_TRUE(MessageEquals(message, "a2")); // This is "a2" from the point of view of node1. PortName a3_name = message->ports()[0]; PortRef a3; - EXPECT_EQ(OK, node0.GetPort(a3_name, &a3)); - - EXPECT_EQ(0, strcmp("a2", ToString(message))); - - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3)); + ASSERT_TRUE(node0.ReadMessage(a3, &message)); EXPECT_EQ(0u, message->num_ports()); - EXPECT_EQ(0, strcmp("hello", ToString(message))); + EXPECT_TRUE(MessageEquals(message, "hello")); - EXPECT_EQ(OK, node0.ClosePort(a0)); - EXPECT_EQ(OK, node0.ClosePort(a3)); + EXPECT_EQ(OK, node0.node().ClosePort(a0)); + EXPECT_EQ(OK, node0.node().ClosePort(a3)); - EXPECT_EQ(OK, node0.ClosePort(x0)); - EXPECT_EQ(OK, node1.ClosePort(x1)); + EXPECT_EQ(OK, node0.node().ClosePort(x0)); + EXPECT_EQ(OK, node1.node().ClosePort(x1)); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, Delegation2) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - SetNode(node0_name, &node0); - - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node0(0); + AddNode(&node0); - node0_delegate.set_save_messages(true); - node1_delegate.set_save_messages(true); + TestNode node1(1); + AddNode(&node1); - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 100; ++i) { // Setup pipe a<->b between node0 and node1. PortRef A, B; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); - EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); - EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); - EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); + CreatePortPair(&node0, &A, &node1, &B); PortRef C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D)); PortRef E, F; - EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F)); + + node1.set_save_messages(true); // Pass D over A to B. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D)); // Pass F over C to D. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F)); // This message should find its way to node1. - EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello")); + EXPECT_EQ(OK, node0.SendStringMessage(E, "hello")); - PumpTasks(); + WaitForIdle(); - EXPECT_EQ(OK, node0.ClosePort(C)); - EXPECT_EQ(OK, node0.ClosePort(E)); + EXPECT_EQ(OK, node0.node().ClosePort(C)); + EXPECT_EQ(OK, node0.node().ClosePort(E)); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(B)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(B)); - for (;;) { - ScopedMessage message; - if (node1_delegate.GetSavedMessage(&message)) { - ClosePortsInMessage(&node1, message.get()); - if (strcmp("hello", ToString(message)) == 0) - break; - } else { - ASSERT_TRUE(false); // "hello" message not delivered! + bool got_hello = false; + ScopedMessage message; + while (node1.GetSavedMessage(&message)) { + node1.ClosePortsInMessage(message.get()); + if (MessageEquals(message, "hello")) { + got_hello = true; break; } } - PumpTasks(); // Because ClosePort may have generated tasks. + EXPECT_TRUE(got_hello); + + WaitForIdle(); // Because closing ports may have generated tasks. } - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendUninitialized) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node(0); + AddNode(&node); PortRef x0; - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); - EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, - SendStringMessage(&node0, x0, "oops")); - EXPECT_EQ(OK, node0.ClosePort(x0)); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0)); + EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops")); + EXPECT_EQ(OK, node.node().ClosePort(x0)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendFailure) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node(0); + AddNode(&node); - node0_delegate.set_save_messages(true); + node.set_save_messages(true); PortRef A, B; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); // Try to send A over itself. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, - SendStringMessageWithPort(&node0, A, "oops", A)); + node.SendStringMessageWithPort(A, "oops", A)); // Try to send B over A. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, - SendStringMessageWithPort(&node0, A, "nope", B)); + node.SendStringMessageWithPort(A, "nope", B)); // B should be closed immediately. - EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); + EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B)); - PumpTasks(); + WaitForIdle(); // There should have been no messages accepted. ScopedMessage message; - EXPECT_FALSE(node0_delegate.GetSavedMessage(&message)); + EXPECT_FALSE(node.GetSavedMessage(&message)); - EXPECT_EQ(OK, node0.ClosePort(A)); + EXPECT_EQ(OK, node.node().ClosePort(A)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, DontLeakUnreceivedPorts) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - node0_delegate.set_read_messages(false); + TestNode node(0); + AddNode(&node); - PortRef A, B; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - - PortRef C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); - - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D)); - - PumpTasks(); + PortRef A, B, C, D; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); - EXPECT_EQ(OK, node0.ClosePort(C)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node0.ClosePort(B)); + EXPECT_EQ(OK, node.node().ClosePort(C)); + EXPECT_EQ(OK, node.node().ClosePort(A)); + EXPECT_EQ(OK, node.node().ClosePort(B)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node(0); + AddNode(&node); - node0_delegate.set_save_messages(true); - - PortRef A, B; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - - PortRef C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); + PortRef A, B, C, D; + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D)); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D)); ScopedMessage message; - EXPECT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_TRUE(node.ReadMessage(B, &message)); ASSERT_EQ(1u, message->num_ports()); - + EXPECT_TRUE(MessageEquals(message, "foo")); PortRef E; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); - EXPECT_TRUE(node0.CanShutdownCleanly(true)); + EXPECT_TRUE( + node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(true)); - EXPECT_FALSE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE( + node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + EXPECT_FALSE(node.node().CanShutdownCleanly()); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node0.ClosePort(B)); - EXPECT_EQ(OK, node0.ClosePort(C)); - EXPECT_EQ(OK, node0.ClosePort(E)); + EXPECT_EQ(OK, node.node().ClosePort(A)); + EXPECT_EQ(OK, node.node().ClosePort(B)); + EXPECT_EQ(OK, node.node().ClosePort(C)); + EXPECT_EQ(OK, node.node().ClosePort(E)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, ProxyCollapse1) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - node0_delegate.set_save_messages(true); + TestNode node(0); + AddNode(&node); PortRef A, B; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); PortRef X, Y; - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Send B and receive it as C. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); // Send C and receive it as D. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); // Send D and receive it as E. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); - EXPECT_EQ(OK, node0.ClosePort(X)); - EXPECT_EQ(OK, node0.ClosePort(Y)); + EXPECT_EQ(OK, node.node().ClosePort(X)); + EXPECT_EQ(OK, node.node().ClosePort(Y)); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node0.ClosePort(E)); + EXPECT_EQ(OK, node.node().ClosePort(A)); + EXPECT_EQ(OK, node.node().ClosePort(E)); - PumpTasks(); + // The node should not idle until all proxies are collapsed. + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, ProxyCollapse2) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - node0_delegate.set_save_messages(true); + TestNode node(0); + AddNode(&node); PortRef A, B; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); PortRef X, Y; - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); ScopedMessage message; - // Send B and receive it as C. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - ASSERT_EQ(1u, message->num_ports()); - PortRef C; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); + // Send B and A to create proxies in each direction. + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); - // Send A and receive it as D. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - ASSERT_EQ(1u, message->num_ports()); - PortRef D; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); + EXPECT_EQ(OK, node.node().ClosePort(X)); + EXPECT_EQ(OK, node.node().ClosePort(Y)); // At this point we have a scenario with: // // D -> [B] -> C -> [A] // - // Ensure that the proxies can collapse. - - EXPECT_EQ(OK, node0.ClosePort(X)); - EXPECT_EQ(OK, node0.ClosePort(Y)); + // Ensure that the proxies can collapse. The sent ports will be closed + // eventually as a result of Y's closure. - EXPECT_EQ(OK, node0.ClosePort(C)); - EXPECT_EQ(OK, node0.ClosePort(D)); + WaitForIdle(); - PumpTasks(); - - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendWithClosedPeer) { @@ -1068,56 +1083,47 @@ TEST_F(PortsTest, SendWithClosedPeer) { // closed, the newly created port will be aware of that peer closure, and the // proxy will eventually collapse. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - node0_delegate.set_read_messages(false); + TestNode node(0); + AddNode(&node); // Send a message from A to B, then close A. PortRef A, B; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); - EXPECT_EQ(OK, node0.ClosePort(A)); - - PumpTasks(); + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.SendStringMessage(A, "hey")); + EXPECT_EQ(OK, node.node().ClosePort(A)); // Now send B over X-Y as new port C. PortRef X, Y; - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); - - node0_delegate.set_read_messages(true); - node0_delegate.set_save_messages(true); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); - - EXPECT_EQ(OK, node0.ClosePort(X)); - EXPECT_EQ(OK, node0.ClosePort(Y)); - + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); ScopedMessage message; - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); - PortRef C; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); + + EXPECT_EQ(OK, node.node().ClosePort(X)); + EXPECT_EQ(OK, node.node().ClosePort(Y)); - PumpTasks(); + WaitForIdle(); - // C should receive the message originally sent to B, and it should also be - // aware of A's closure. + // C should have received the message originally sent to B, and it should also + // be aware of A's closure. - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); + ASSERT_TRUE(node.ReadMessage(C, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); PortStatus status; - EXPECT_EQ(OK, node0.GetStatus(C, &status)); + EXPECT_EQ(OK, node.node().GetStatus(C, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); - node0.ClosePort(C); + node.node().ClosePort(C); + + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, SendWithClosedPeerSent) { @@ -1126,391 +1132,342 @@ TEST_F(PortsTest, SendWithClosedPeerSent) { // eventually notified of the closure, and the dead-end proxies will // eventually be removed. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; - - node0_delegate.set_save_messages(true); + TestNode node(0); + AddNode(&node); PortRef X, Y; - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y)); PortRef A, B; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B)); ScopedMessage message; // Send A as new port C. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A)); + + ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C)); // Send C as new port D. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C)); + + ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); - - node0_delegate.set_read_messages(false); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D)); // Send a message to B through D, then close D. - EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); - EXPECT_EQ(OK, node0.ClosePort(D)); - - PumpTasks(); + EXPECT_EQ(OK, node.SendStringMessage(D, "hey")); + EXPECT_EQ(OK, node.node().ClosePort(D)); // Now send B as new port E. - node0_delegate.set_read_messages(true); - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B)); + EXPECT_EQ(OK, node.node().ClosePort(X)); - EXPECT_EQ(OK, node0.ClosePort(X)); - EXPECT_EQ(OK, node0.ClosePort(Y)); - - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + ASSERT_TRUE(node.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); - PortRef E; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E)); + + EXPECT_EQ(OK, node.node().ClosePort(Y)); - PumpTasks(); + WaitForIdle(); // E should receive the message originally sent to B, and it should also be // aware of D's closure. - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); + ASSERT_TRUE(node.ReadMessage(E, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); PortStatus status; - EXPECT_EQ(OK, node0.GetStatus(E, &status)); + EXPECT_EQ(OK, node.node().GetStatus(E, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); - node0.ClosePort(E); + EXPECT_EQ(OK, node.node().ClosePort(E)); - PumpTasks(); + WaitForIdle(); - EXPECT_TRUE(node0.CanShutdownCleanly(false)); + EXPECT_TRUE(node.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePorts) { - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_read_messages(false); - node1_delegate.set_save_messages(true); + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on A. - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); - - PumpTasks(); + EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); - PumpTasks(); + WaitForIdle(); - // Expect only two receiving ports to be left after pumping tasks. - EXPECT_TRUE(node0.CanShutdownCleanly(true)); - EXPECT_TRUE(node1.CanShutdownCleanly(true)); + // Expect all proxies to be gone once idle. + EXPECT_TRUE( + node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + EXPECT_TRUE( + node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); // Expect D to have received the message sent on A. ScopedMessage message; - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); + ASSERT_TRUE(node1.ReadMessage(D, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(D)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); // No more ports should be open. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortWithClosedPeer1) { // This tests that the right thing happens when initiating a merge on a port // whose peer has already been closed. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_read_messages(false); - node1_delegate.set_save_messages(true); + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Write a message on A. - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); - - PumpTasks(); + EXPECT_EQ(OK, node0.SendStringMessage(A, "hey")); // Close A. - EXPECT_EQ(OK, node0.ClosePort(A)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); - PumpTasks(); + WaitForIdle(); - // Expect only one receiving port to be left after pumping tasks. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(true)); + // Expect all proxies to be gone once idle. node0 should have no ports since + // A was explicitly closed. + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE( + node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); // Expect D to have received the message sent on A. ScopedMessage message; - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); + ASSERT_TRUE(node1.ReadMessage(D, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); - EXPECT_EQ(OK, node1.ClosePort(D)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); // No more ports should be open. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortWithClosedPeer2) { // This tests that the right thing happens when merging into a port whose peer // has already been closed. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_save_messages(true); - node1_delegate.set_read_messages(false); - - // Write a message on D. - EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); - PumpTasks(); - - // Close D. - EXPECT_EQ(OK, node1.ClosePort(D)); + // Write a message on D and close it. + EXPECT_EQ(OK, node0.SendStringMessage(D, "hey")); + EXPECT_EQ(OK, node1.node().ClosePort(D)); // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); - PumpTasks(); + WaitForIdle(); - // Expect only one receiving port to be left after pumping tasks. - EXPECT_TRUE(node0.CanShutdownCleanly(true)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + // Expect all proxies to be gone once idle. node1 should have no ports since + // D was explicitly closed. + EXPECT_TRUE( + node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); // Expect A to have received the message sent on D. ScopedMessage message; - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); + ASSERT_TRUE(node0.ReadMessage(A, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); - EXPECT_EQ(OK, node0.ClosePort(A)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); // No more ports should be open. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsWithClosedPeers) { // This tests that no residual ports are left behind if two ports are merged // when both of their peers have been closed. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - node0_delegate.set_save_messages(true); - node1_delegate.set_read_messages(false); + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Close A and D. - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(D)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); - PumpTasks(); + WaitForIdle(); // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); - PumpTasks(); + WaitForIdle(); // Expect everything to have gone away. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsWithMovedPeers) { - // This tests that no ports can be merged successfully even if their peers - // are moved around. + // This tests that ports can be merged successfully even if their peers are + // moved around. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; - - node0_delegate.set_save_messages(true); - node1_delegate.set_read_messages(false); + TestNode node1(1); + AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); // Set up another pair X-Y for moving ports on node0. PortRef X, Y; - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); + EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y)); ScopedMessage message; // Move A to new port E. - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A)); + ASSERT_TRUE(node0.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); - - EXPECT_EQ(OK, node0.ClosePort(X)); - EXPECT_EQ(OK, node0.ClosePort(Y)); + ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E)); - node0_delegate.set_read_messages(false); + EXPECT_EQ(OK, node0.node().ClosePort(X)); + EXPECT_EQ(OK, node0.node().ClosePort(Y)); // Write messages on E and D. - EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey")); - EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi")); + EXPECT_EQ(OK, node0.SendStringMessage(E, "hey")); + EXPECT_EQ(OK, node1.SendStringMessage(D, "hi")); // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); - - node0_delegate.set_read_messages(true); - node1_delegate.set_read_messages(true); - node1_delegate.set_save_messages(true); + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); - PumpTasks(); + WaitForIdle(); // Expect to receive D's message on E and E's message on D. - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hi", ToString(message))); - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); + ASSERT_TRUE(node0.ReadMessage(E, &message)); + EXPECT_TRUE(MessageEquals(message, "hi")); + ASSERT_TRUE(node1.ReadMessage(D, &message)); + EXPECT_TRUE(MessageEquals(message, "hey")); // Close E and D. - EXPECT_EQ(OK, node0.ClosePort(E)); - EXPECT_EQ(OK, node1.ClosePort(D)); + EXPECT_EQ(OK, node0.node().ClosePort(E)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); - PumpTasks(); + WaitForIdle(); // Expect everything to have gone away. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } TEST_F(PortsTest, MergePortsFailsGracefully) { // This tests that the system remains in a well-defined state if something // goes wrong during port merge. - NodeName node0_name(0, 1); - TestNodeDelegate node0_delegate(node0_name); - Node node0(node0_name, &node0_delegate); - node_map[0] = &node0; + TestNode node0(0); + AddNode(&node0); - NodeName node1_name(1, 1); - TestNodeDelegate node1_delegate(node1_name); - Node node1(node1_name, &node1_delegate); - node_map[1] = &node1; + TestNode node1(1); + AddNode(&node1); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); - - PumpTasks(); - - // Initiate a merge between B and C. - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D)); - // Move C to a new port E. This is dumb and nobody should do it, but it's - // possible. MergePorts will fail as a result because C won't be in a - // receiving state when the event arrives at node1, so B should be closed. ScopedMessage message; PortRef X, Y; - EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y)); - node1_delegate.set_save_messages(true); - EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C)); - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); + EXPECT_EQ(OK, node1.node().CreatePortPair(&X, &Y)); + + // Block the merge from proceeding until we can do something stupid with port + // C. This avoids the test logic racing with async merge logic. + node1.BlockOnEvent(EventType::kMergePort); + + // Initiate the merge between B and C. + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name())); + + // Move C to a new port E. This is not a sane use of Node's public API but + // is still hypothetically possible. It allows us to force a merge failure + // because C will be in an invalid state by the term the merge is processed. + // As a result, B should be closed. + EXPECT_EQ(OK, node1.SendStringMessageWithPort(X, "foo", C)); + + node1.Unblock(); + + ASSERT_TRUE(node1.ReadMessage(Y, &message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; - ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E)); - EXPECT_EQ(OK, node1.ClosePort(X)); - EXPECT_EQ(OK, node1.ClosePort(Y)); + ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &E)); - // C goes away as a result of normal proxy removal. - PumpTasks(); + EXPECT_EQ(OK, node1.node().ClosePort(X)); + EXPECT_EQ(OK, node1.node().ClosePort(Y)); - EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C)); + WaitForIdle(); - // B should have been closed cleanly. - EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); + // C goes away as a result of normal proxy removal. B should have been closed + // cleanly by the failed MergePorts. + EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C)); + EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B)); // Close A, D, and E. - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node1.ClosePort(D)); - EXPECT_EQ(OK, node1.ClosePort(E)); + EXPECT_EQ(OK, node0.node().ClosePort(A)); + EXPECT_EQ(OK, node1.node().ClosePort(D)); + EXPECT_EQ(OK, node1.node().ClosePort(E)); - PumpTasks(); + WaitForIdle(); // Expect everything to have gone away. - EXPECT_TRUE(node0.CanShutdownCleanly(false)); - EXPECT_TRUE(node1.CanShutdownCleanly(false)); + EXPECT_TRUE(node0.node().CanShutdownCleanly()); + EXPECT_TRUE(node1.node().CanShutdownCleanly()); } } // namespace test |