summaryrefslogtreecommitdiff
path: root/chromium/mojo
diff options
context:
space:
mode:
authorAllan Sandfeld Jensen <allan.jensen@theqtcompany.com>2016-08-25 10:44:03 +0200
committerAllan Sandfeld Jensen <allan.jensen@qt.io>2016-08-25 09:40:07 +0000
commite20ba3c57b50674f625b5088faa0fe9a076c0617 (patch)
tree3006142b83866a52a56d34ade8446d5044647305 /chromium/mojo
parent28b1110370900897ab652cb420c371fab8857ad4 (diff)
downloadqtwebengine-chromium-e20ba3c57b50674f625b5088faa0fe9a076c0617.tar.gz
BASELINE: Update Chromium to 53.0.2785.80
Also adds 3rdparty libraries under pdfium. Change-Id: I29afb23f1642fa55765d056697d5d145afa22bb2 Reviewed-by: Michael BrĂ¼ning <michael.bruning@qt.io>
Diffstat (limited to 'chromium/mojo')
-rw-r--r--chromium/mojo/edk/system/node_controller.cc3
-rw-r--r--chromium/mojo/edk/system/ports/node.cc17
-rw-r--r--chromium/mojo/edk/system/ports/node.h13
-rw-r--r--chromium/mojo/edk/system/ports/ports_unittest.cc1667
4 files changed, 828 insertions, 872 deletions
diff --git a/chromium/mojo/edk/system/node_controller.cc b/chromium/mojo/edk/system/node_controller.cc
index 5fc55f557d7..f40a0cf37d6 100644
--- a/chromium/mojo/edk/system/node_controller.cc
+++ b/chromium/mojo/edk/system/node_controller.cc
@@ -1193,7 +1193,8 @@ void NodeController::AttemptShutdownIfRequested() {
base::AutoLock lock(shutdown_lock_);
if (shutdown_callback_.is_null())
return;
- if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
+ if (!node_->CanShutdownCleanly(
+ ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
DVLOG(2) << "Unable to cleanly shut down node " << name_;
return;
}
diff --git a/chromium/mojo/edk/system/ports/node.cc b/chromium/mojo/edk/system/ports/node.cc
index 128ecdf4ca7..c7f42f6f8b3 100644
--- a/chromium/mojo/edk/system/ports/node.cc
+++ b/chromium/mojo/edk/system/ports/node.cc
@@ -64,10 +64,10 @@ Node::~Node() {
DLOG(WARNING) << "Unclean shutdown for node " << name_;
}
-bool Node::CanShutdownCleanly(bool allow_local_ports) {
+bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
base::AutoLock ports_lock(ports_lock_);
- if (!allow_local_ports) {
+ if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
#if DCHECK_IS_ON()
for (auto entry : ports_) {
DVLOG(2) << "Port " << entry.first << " referencing node "
@@ -78,6 +78,8 @@ bool Node::CanShutdownCleanly(bool allow_local_ports) {
return ports_.empty();
}
+ DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
+
// NOTE: This is not efficient, though it probably doesn't need to be since
// relatively few ports should be open during shutdown and shutdown doesn't
// need to be blazingly fast.
@@ -531,17 +533,6 @@ int Node::OnObserveProxy(const PortName& port_name,
scoped_refptr<Port> port = GetPort(port_name);
if (!port) {
DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
-
- if (port_name != event.proxy_port_name &&
- port_name != event.proxy_to_port_name) {
- // The receiving port may have been removed while this message was in
- // transit. In this case, we restart the ObserveProxy circulation from
- // the referenced proxy port to avoid leaking the proxy.
- delegate_->ForwardMessage(
- event.proxy_node_name,
- NewInternalMessage(
- event.proxy_port_name, EventType::kObserveProxy, event));
- }
return OK;
}
diff --git a/chromium/mojo/edk/system/ports/node.h b/chromium/mojo/edk/system/ports/node.h
index 3aeadcaeb40..65252a3af71 100644
--- a/chromium/mojo/edk/system/ports/node.h
+++ b/chromium/mojo/edk/system/ports/node.h
@@ -48,6 +48,11 @@ class NodeDelegate;
class Node {
public:
+ enum class ShutdownPolicy {
+ DONT_ALLOW_LOCAL_PORTS,
+ ALLOW_LOCAL_PORTS,
+ };
+
// Does not take ownership of the delegate.
Node(const NodeName& name, NodeDelegate* delegate);
~Node();
@@ -59,9 +64,11 @@ class Node {
// method may be called again after AcceptMessage to check if the Node is now
// ready to be destroyed.
//
- // If |allow_local_ports| is |true|, this will only return |false| when there
- // are transient ports referring to other nodes.
- bool CanShutdownCleanly(bool allow_local_ports);
+ // If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return
+ // |true| even if some ports remain alive, as long as none of them are proxies
+ // to another node.
+ bool CanShutdownCleanly(
+ ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS);
// Lookup the named port.
int GetPort(const PortName& port_name, PortRef* port_ref);
diff --git a/chromium/mojo/edk/system/ports/ports_unittest.cc b/chromium/mojo/edk/system/ports/ports_unittest.cc
index 200e72bb1b6..b76fa10de5f 100644
--- a/chromium/mojo/edk/system/ports/ports_unittest.cc
+++ b/chromium/mojo/edk/system/ports/ports_unittest.cc
@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -9,9 +10,18 @@
#include <map>
#include <queue>
#include <sstream>
+#include <utility>
+#include "base/bind.h"
+#include "base/callback.h"
#include "base/logging.h"
+#include "base/memory/ref_counted.h"
#include "base/rand_util.h"
+#include "base/strings/string_piece.h"
+#include "base/strings/stringprintf.h"
+#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/threading/thread.h"
#include "mojo/edk/system/ports/event.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"
@@ -24,24 +34,8 @@ namespace test {
namespace {
-void LogMessage(const Message* message) {
- std::stringstream ports;
- for (size_t i = 0; i < message->num_ports(); ++i) {
- if (i > 0)
- ports << ",";
- ports << message->ports()[i];
- }
- DVLOG(1) << "message: \""
- << static_cast<const char*>(message->payload_bytes())
- << "\" ports=[" << ports.str() << "]";
-}
-
-void ClosePortsInMessage(Node* node, Message* message) {
- for (size_t i = 0; i < message->num_ports(); ++i) {
- PortRef port;
- ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port));
- EXPECT_EQ(OK, node->ClosePort(port));
- }
+bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
+ return !strcmp(static_cast<const char*>(message->payload_bytes()), s.data());
}
class TestMessage : public Message {
@@ -71,132 +65,140 @@ class TestMessage : public Message {
}
};
-struct Task {
- Task(NodeName node_name, ScopedMessage message)
- : node_name(node_name),
- message(std::move(message)),
- priority(base::RandUint64()) {
- }
+class TestNode;
- NodeName node_name;
- ScopedMessage message;
- uint64_t priority;
+class MessageRouter {
+ public:
+ virtual ~MessageRouter() {}
+
+ virtual void GeneratePortName(PortName* name) = 0;
+ virtual void ForwardMessage(TestNode* from_node,
+ const NodeName& node_name,
+ ScopedMessage message) = 0;
+ virtual void BroadcastMessage(TestNode* from_node, ScopedMessage message) = 0;
};
-struct TaskComparator {
- bool operator()(const Task* a, const Task* b) {
- return a->priority < b->priority;
+class TestNode : public NodeDelegate {
+ public:
+ explicit TestNode(uint64_t id)
+ : node_name_(id, 1),
+ node_(node_name_, this),
+ node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
+ messages_available_event_(
+ base::WaitableEvent::ResetPolicy::AUTOMATIC,
+ base::WaitableEvent::InitialState::NOT_SIGNALED),
+ idle_event_(
+ base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::SIGNALED) {
}
-};
-const size_t kMaxNodes = 3;
+ ~TestNode() override {
+ StopWhenIdle();
+ node_thread_.Stop();
+ }
-std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
-Node* node_map[kMaxNodes];
+ const NodeName& name() const { return node_name_; }
-Node* GetNode(const NodeName& name) {
- return node_map[name.v1];
-}
+ // NOTE: Node is thread-safe.
+ Node& node() { return node_; }
-void SetNode(const NodeName& name, Node* node) {
- node_map[name.v1] = node;
-}
-
-void PumpTasks() {
- while (!task_queue.empty()) {
- Task* task = task_queue.top();
- task_queue.pop();
+ base::WaitableEvent& idle_event() { return idle_event_; }
- Node* node = GetNode(task->node_name);
- if (node)
- node->AcceptMessage(std::move(task->message));
+ bool IsIdle() {
+ base::AutoLock lock(lock_);
+ return started_ && !dispatching_ &&
+ (incoming_messages_.empty() || (block_on_event_ && blocked_));
+ }
- delete task;
+ void BlockOnEvent(EventType type) {
+ base::AutoLock lock(lock_);
+ blocked_event_type_ = type;
+ block_on_event_ = true;
}
-}
-void PumpUntilTask(EventType type) {
- while (!task_queue.empty()) {
- Task* task = task_queue.top();
+ void Unblock() {
+ base::AutoLock lock(lock_);
+ block_on_event_ = false;
+ messages_available_event_.Signal();
+ }
- const EventHeader* header = GetEventHeader(*task->message);
- if (header->type == type)
- return;
+ void Start(MessageRouter* router) {
+ router_ = router;
+ node_thread_.Start();
+ node_thread_.task_runner()->PostTask(
+ FROM_HERE,
+ base::Bind(&TestNode::ProcessMessages, base::Unretained(this)));
+ }
- task_queue.pop();
+ void StopWhenIdle() {
+ base::AutoLock lock(lock_);
+ should_quit_ = true;
+ messages_available_event_.Signal();
+ }
- Node* node = GetNode(task->node_name);
- if (node)
- node->AcceptMessage(std::move(task->message));
+ void WakeUp() { messages_available_event_.Signal(); }
- delete task;
+ int SendStringMessage(const PortRef& port, const std::string& s) {
+ size_t size = s.size() + 1;
+ ScopedMessage message = TestMessage::NewUserMessage(size, 0);
+ memcpy(message->mutable_payload_bytes(), s.data(), size);
+ return node_.SendMessage(port, std::move(message));
}
-}
-void DiscardPendingTasks() {
- while (!task_queue.empty()) {
- Task* task = task_queue.top();
- task_queue.pop();
- delete task;
+ int SendStringMessageWithPort(const PortRef& port,
+ const std::string& s,
+ const PortName& sent_port_name) {
+ size_t size = s.size() + 1;
+ ScopedMessage message = TestMessage::NewUserMessage(size, 1);
+ memcpy(message->mutable_payload_bytes(), s.data(), size);
+ message->mutable_ports()[0] = sent_port_name;
+ return node_.SendMessage(port, std::move(message));
}
-}
-
-int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
- size_t size = s.size() + 1;
- ScopedMessage message = TestMessage::NewUserMessage(size, 0);
- memcpy(message->mutable_payload_bytes(), s.data(), size);
- return node->SendMessage(port, std::move(message));
-}
-
-int SendStringMessageWithPort(Node* node,
- const PortRef& port,
- const std::string& s,
- const PortName& sent_port_name) {
- size_t size = s.size() + 1;
- ScopedMessage message = TestMessage::NewUserMessage(size, 1);
- memcpy(message->mutable_payload_bytes(), s.data(), size);
- message->mutable_ports()[0] = sent_port_name;
- return node->SendMessage(port, std::move(message));
-}
-int SendStringMessageWithPort(Node* node,
- const PortRef& port,
- const std::string& s,
- const PortRef& sent_port) {
- return SendStringMessageWithPort(node, port, s, sent_port.name());
-}
+ int SendStringMessageWithPort(const PortRef& port,
+ const std::string& s,
+ const PortRef& sent_port) {
+ return SendStringMessageWithPort(port, s, sent_port.name());
+ }
-const char* ToString(const ScopedMessage& message) {
- return static_cast<const char*>(message->payload_bytes());
-}
+ void set_drop_messages(bool value) {
+ base::AutoLock lock(lock_);
+ drop_messages_ = value;
+ }
-class TestNodeDelegate : public NodeDelegate {
- public:
- explicit TestNodeDelegate(const NodeName& node_name)
- : node_name_(node_name),
- drop_messages_(false),
- read_messages_(true),
- save_messages_(false) {
+ void set_save_messages(bool value) {
+ base::AutoLock lock(lock_);
+ save_messages_ = value;
}
- void set_drop_messages(bool value) { drop_messages_ = value; }
- void set_read_messages(bool value) { read_messages_ = value; }
- void set_save_messages(bool value) { save_messages_ = value; }
+ bool ReadMessage(const PortRef& port, ScopedMessage* message) {
+ return node_.GetMessage(port, message) == OK && *message;
+ }
bool GetSavedMessage(ScopedMessage* message) {
+ base::AutoLock lock(lock_);
if (saved_messages_.empty()) {
message->reset();
return false;
}
- *message = std::move(saved_messages_.front());
+ std::swap(*message, saved_messages_.front());
saved_messages_.pop();
return true;
}
+ void EnqueueMessage(ScopedMessage message) {
+ idle_event_.Reset();
+
+ // NOTE: This may be called from ForwardMessage and thus must not reenter
+ // |node_|.
+ base::AutoLock lock(lock_);
+ incoming_messages_.emplace(std::move(message));
+ messages_available_event_.Signal();
+ }
+
void GenerateRandomPortName(PortName* port_name) override {
- static uint64_t next_port_name = 1;
- port_name->v1 = next_port_name++;
- port_name->v2 = 0;
+ DCHECK(router_);
+ router_->GeneratePortName(port_name);
}
void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
@@ -205,489 +207,568 @@ class TestNodeDelegate : public NodeDelegate {
void ForwardMessage(const NodeName& node_name,
ScopedMessage message) override {
- if (drop_messages_) {
- DVLOG(1) << "Dropping ForwardMessage from node "
- << node_name_ << " to " << node_name;
- ClosePortsInMessage(GetNode(node_name), message.get());
- return;
+ {
+ base::AutoLock lock(lock_);
+ if (drop_messages_) {
+ DVLOG(1) << "Dropping ForwardMessage from node "
+ << node_name_ << " to " << node_name;
+
+ base::AutoUnlock unlock(lock_);
+ ClosePortsInMessage(message.get());
+ return;
+ }
}
+
+ DCHECK(router_);
DVLOG(1) << "ForwardMessage from node "
<< node_name_ << " to " << node_name;
- task_queue.push(new Task(node_name, std::move(message)));
+ router_->ForwardMessage(this, node_name, std::move(message));
}
void BroadcastMessage(ScopedMessage message) override {
- for (size_t i = 0; i < kMaxNodes; ++i) {
- Node* node = node_map[i];
- // Broadcast doesn't deliver to the local node.
- if (node && node != GetNode(node_name_)) {
- // NOTE: We only need to support broadcast of events, which have no
- // payload or ports bytes.
- ScopedMessage new_message(
- new TestMessage(message->num_header_bytes(), 0, 0));
- memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
- message->num_header_bytes());
- node->AcceptMessage(std::move(new_message));
- }
- }
+ router_->BroadcastMessage(this, std::move(message));
}
void PortStatusChanged(const PortRef& port) override {
- DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_;
- if (!read_messages_)
+ // The port may be closed, in which case we ignore the notification.
+ base::AutoLock lock(lock_);
+ if (!save_messages_)
return;
- Node* node = GetNode(node_name_);
+
for (;;) {
ScopedMessage message;
- int rv = node->GetMessage(port, &message);
- EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED);
- if (rv == ERROR_PORT_PEER_CLOSED || !message)
- break;
- if (save_messages_) {
- SaveMessage(std::move(message));
- } else {
- LogMessage(message.get());
- for (size_t i = 0; i < message->num_ports(); ++i) {
- std::stringstream buf;
- buf << "got port: " << message->ports()[i];
-
- PortRef received_port;
- node->GetPort(message->ports()[i], &received_port);
+ {
+ base::AutoUnlock unlock(lock_);
+ if (!ReadMessage(port, &message))
+ break;
+ }
- SendStringMessage(node, received_port, buf.str());
+ saved_messages_.emplace(std::move(message));
+ }
+ }
- // Avoid leaking these ports.
- node->ClosePort(received_port);
- }
- }
+ void ClosePortsInMessage(Message* message) {
+ for (size_t i = 0; i < message->num_ports(); ++i) {
+ PortRef port;
+ ASSERT_EQ(OK, node_.GetPort(message->ports()[i], &port));
+ EXPECT_EQ(OK, node_.ClosePort(port));
}
}
private:
- void SaveMessage(ScopedMessage message) {
- saved_messages_.emplace(std::move(message));
+ void ProcessMessages() {
+ for (;;) {
+ messages_available_event_.Wait();
+
+ base::AutoLock lock(lock_);
+
+ if (should_quit_)
+ return;
+
+ dispatching_ = true;
+ while (!incoming_messages_.empty()) {
+ if (block_on_event_ &&
+ GetEventHeader(*incoming_messages_.front())->type ==
+ blocked_event_type_) {
+ blocked_ = true;
+ // Go idle if we hit a blocked event type.
+ break;
+ } else {
+ blocked_ = false;
+ }
+ ScopedMessage message = std::move(incoming_messages_.front());
+ incoming_messages_.pop();
+
+ // NOTE: AcceptMessage() can re-enter this object to call any of the
+ // NodeDelegate interface methods.
+ base::AutoUnlock unlock(lock_);
+ node_.AcceptMessage(std::move(message));
+ }
+
+ dispatching_ = false;
+ started_ = true;
+ idle_event_.Signal();
+ };
}
+ const NodeName node_name_;
+ Node node_;
+ MessageRouter* router_ = nullptr;
+
+ base::Thread node_thread_;
+ base::WaitableEvent messages_available_event_;
+ base::WaitableEvent idle_event_;
+
+ // Guards fields below.
+ base::Lock lock_;
+ bool started_ = false;
+ bool dispatching_ = false;
+ bool should_quit_ = false;
+ bool drop_messages_ = false;
+ bool save_messages_ = false;
+ bool blocked_ = false;
+ bool block_on_event_ = false;
+ EventType blocked_event_type_;
+ std::queue<ScopedMessage> incoming_messages_;
std::queue<ScopedMessage> saved_messages_;
- NodeName node_name_;
- bool drop_messages_;
- bool read_messages_;
- bool save_messages_;
};
-class PortsTest : public testing::Test {
+class PortsTest : public testing::Test, public MessageRouter {
public:
- void SetUp() override {
- DiscardPendingTasks();
- SetNode(NodeName(0, 1), nullptr);
- SetNode(NodeName(1, 1), nullptr);
- SetNode(NodeName(2, 1), nullptr);
+ void AddNode(TestNode* node) {
+ {
+ base::AutoLock lock(lock_);
+ nodes_[node->name()] = node;
+ }
+ node->Start(this);
+ }
+
+ void RemoveNode(TestNode* node) {
+ {
+ base::AutoLock lock(lock_);
+ nodes_.erase(node->name());
+ }
+
+ for (const auto& entry : nodes_)
+ entry.second->node().LostConnectionToNode(node->name());
+ }
+
+ // Waits until all known Nodes are idle. Message forwarding and processing
+ // is handled in such a way that idleness is a stable state: once all nodes in
+ // the system are idle, they will remain idle until the test explicitly
+ // initiates some further event (e.g. sending a message, closing a port, or
+ // removing a Node).
+ void WaitForIdle() {
+ for (;;) {
+ base::AutoLock global_lock(global_lock_);
+ bool all_nodes_idle = true;
+ for (const auto& entry : nodes_) {
+ if (!entry.second->IsIdle())
+ all_nodes_idle = false;
+ entry.second->WakeUp();
+ }
+ if (all_nodes_idle)
+ return;
+
+ // Wait for any Node to signal that it's idle.
+ base::AutoUnlock global_unlock(global_lock_);
+ std::vector<base::WaitableEvent*> events;
+ for (const auto& entry : nodes_)
+ events.push_back(&entry.second->idle_event());
+ base::WaitableEvent::WaitMany(events.data(), events.size());
+ }
}
+
+ void CreatePortPair(TestNode* node0,
+ PortRef* port0,
+ TestNode* node1,
+ PortRef* port1) {
+ if (node0 == node1) {
+ EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
+ } else {
+ EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
+ EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
+ EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
+ port1->name()));
+ EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
+ port0->name()));
+ }
+ }
+
+ private:
+ // MessageRouter:
+ void GeneratePortName(PortName* name) override {
+ base::AutoLock lock(lock_);
+ name->v1 = next_port_id_++;
+ name->v2 = 0;
+ }
+
+ void ForwardMessage(TestNode* from_node,
+ const NodeName& node_name,
+ ScopedMessage message) override {
+ base::AutoLock global_lock(global_lock_);
+ base::AutoLock lock(lock_);
+ // Drop messages from nodes that have been removed.
+ if (nodes_.find(from_node->name()) == nodes_.end()) {
+ from_node->ClosePortsInMessage(message.get());
+ return;
+ }
+
+ auto it = nodes_.find(node_name);
+ if (it == nodes_.end()) {
+ DVLOG(1) << "Node not found: " << node_name;
+ return;
+ }
+
+ it->second->EnqueueMessage(std::move(message));
+ }
+
+ void BroadcastMessage(TestNode* from_node, ScopedMessage message) override {
+ base::AutoLock global_lock(global_lock_);
+ base::AutoLock lock(lock_);
+
+ // Drop messages from nodes that have been removed.
+ if (nodes_.find(from_node->name()) == nodes_.end())
+ return;
+
+ for (const auto& entry : nodes_) {
+ TestNode* node = entry.second;
+ // Broadcast doesn't deliver to the local node.
+ if (node == from_node)
+ continue;
+
+ // NOTE: We only need to support broadcast of events. Events have no
+ // payload or ports bytes.
+ ScopedMessage new_message(
+ new TestMessage(message->num_header_bytes(), 0, 0));
+ memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
+ message->num_header_bytes());
+ node->EnqueueMessage(std::move(new_message));
+ }
+ }
+
+ base::MessageLoop message_loop_;
+
+ // Acquired before any operation which makes a Node busy, and before testing
+ // if all nodes are idle.
+ base::Lock global_lock_;
+
+ base::Lock lock_;
+ uint64_t next_port_id_ = 1;
+ std::map<NodeName, TestNode*> nodes_;
};
} // namespace
TEST_F(PortsTest, Basic1) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- SetNode(node0_name, &node0);
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- SetNode(node1_name, &node1);
+ TestNode node1(1);
+ AddNode(&node1);
- // Setup pipe between node0 and node1.
PortRef x0, x1;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
- EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
- EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
+ CreatePortPair(&node0, &x0, &node1, &x1);
- // Transfer a port from node0 to node1.
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
-
- EXPECT_EQ(OK, node0.ClosePort(a0));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
- EXPECT_EQ(OK, node0.ClosePort(x0));
- EXPECT_EQ(OK, node1.ClosePort(x1));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, Basic2) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- SetNode(node0_name, &node0);
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- SetNode(node1_name, &node1);
+ TestNode node1(1);
+ AddNode(&node1);
- // Setup pipe between node0 and node1.
PortRef x0, x1;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
- EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
- EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
+ CreatePortPair(&node0, &x0, &node1, &x1);
PortRef b0, b1;
- EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1));
- EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again"));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
+ EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
- // This may cause a SendMessage(b1) failure.
- EXPECT_EQ(OK, node0.ClosePort(b0));
+ EXPECT_EQ(OK, node0.node().ClosePort(b0));
- EXPECT_EQ(OK, node0.ClosePort(x0));
- EXPECT_EQ(OK, node1.ClosePort(x1));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, Basic3) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- SetNode(node0_name, &node0);
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- SetNode(node1_name, &node1);
+ TestNode node1(1);
+ AddNode(&node1);
- // Setup pipe between node0 and node1.
PortRef x0, x1;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
- EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
- EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
+ CreatePortPair(&node0, &x0, &node1, &x1);
- // Transfer a port from node0 to node1.
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
- EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
- // Transfer a0 as well.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
+ EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
+
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
PortRef b0, b1;
- EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1));
- EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz"));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
+ EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
- // This may cause a SendMessage(b1) failure.
- EXPECT_EQ(OK, node0.ClosePort(b0));
+ EXPECT_EQ(OK, node0.node().ClosePort(b0));
- EXPECT_EQ(OK, node0.ClosePort(x0));
- EXPECT_EQ(OK, node1.ClosePort(x1));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, LostConnectionToNode1) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- SetNode(node0_name, &node0);
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- SetNode(node1_name, &node1);
+ TestNode node1(1);
+ AddNode(&node1);
+ node1.set_drop_messages(true);
- // Setup pipe between node0 and node1.
PortRef x0, x1;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
- EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
- EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
-
- // Transfer port to node1 and simulate a lost connection to node1. Dropping
- // events from node1 is how we simulate the lost connection.
+ CreatePortPair(&node0, &x0, &node1, &x1);
- node1_delegate.set_drop_messages(true);
+ // Transfer a port to node1 and simulate a lost connection to node1.
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
- PumpTasks();
+ WaitForIdle();
- EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
+ RemoveNode(&node1);
- PumpTasks();
+ WaitForIdle();
- EXPECT_EQ(OK, node0.ClosePort(a0));
- EXPECT_EQ(OK, node0.ClosePort(x0));
- EXPECT_EQ(OK, node1.ClosePort(x1));
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, LostConnectionToNode2) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
- // Setup pipe between node0 and node1.
PortRef x0, x1;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
- EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
- EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
-
- node1_delegate.set_read_messages(false);
+ CreatePortPair(&node0, &x0, &node1, &x1);
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
- PumpTasks();
+ WaitForIdle();
- node1_delegate.set_drop_messages(true);
+ node1.set_drop_messages(true);
- EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
+ RemoveNode(&node1);
- PumpTasks();
+ WaitForIdle();
+ // a0 should have eventually detected peer closure after node loss.
ScopedMessage message;
- EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
+ EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.node().GetMessage(a0, &message));
EXPECT_FALSE(message);
- EXPECT_EQ(OK, node0.ClosePort(a0));
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
- EXPECT_EQ(OK, node0.ClosePort(x0));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
- EXPECT_EQ(OK, node1.GetMessage(x1, &message));
+ EXPECT_EQ(OK, node1.node().GetMessage(x1, &message));
EXPECT_TRUE(message);
- ClosePortsInMessage(&node1, message.get());
+ node1.ClosePortsInMessage(message.get());
- EXPECT_EQ(OK, node1.ClosePort(x1));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
// Tests that a proxy gets cleaned up when its indirect peer lives on a lost
// node.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
- NodeName node2_name(2, 1);
- TestNodeDelegate node2_delegate(node2_name);
- Node node2(node2_name, &node2_delegate);
- node_map[2] = &node2;
-
- node1_delegate.set_save_messages(true);
+ TestNode node2(2);
+ AddNode(&node2);
// Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
- EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
- EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&C));
- EXPECT_EQ(OK, node2.CreateUninitializedPort(&D));
- EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name()));
- EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name()));
+ CreatePortPair(&node0, &A, &node1, &B);
+ CreatePortPair(&node1, &C, &node2, &D);
// Create E-F and send F over A to node 1.
PortRef E, F;
- EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
- PumpTasks();
+ WaitForIdle();
ScopedMessage message;
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
+ ASSERT_TRUE(node1.ReadMessage(B, &message));
ASSERT_EQ(1u, message->num_ports());
- EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F));
+ EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
// Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
// will trivially become aware of the loss, and this test verifies that the
// port A on node 0 will eventually also become aware of it.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F));
+ // Make sure node2 stops processing events when it encounters an ObserveProxy.
+ node2.BlockOnEvent(EventType::kObserveProxy);
+
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
+ WaitForIdle();
- node_map[2] = nullptr;
- EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name));
+ // Simulate node 1 and 2 disconnecting.
+ EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
- PumpTasks();
+ // Let node2 continue processing events and wait for everyone to go idle.
+ node2.Unblock();
+ WaitForIdle();
// Port F should be gone.
- EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F));
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
// Port E should have detected peer closure despite the fact that there is
// no longer a continuous route from F to E over which the event could travel.
PortStatus status;
- EXPECT_EQ(OK, node0.GetStatus(E, &status));
+ EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
EXPECT_TRUE(status.peer_closed);
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(B));
- EXPECT_EQ(OK, node1.ClosePort(C));
- EXPECT_EQ(OK, node0.ClosePort(E));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(B));
+ EXPECT_EQ(OK, node1.node().ClosePort(C));
+ EXPECT_EQ(OK, node0.node().ClosePort(E));
+
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
// Tests that a proxy gets cleaned up when its direct peer lives on a lost
// node and it's predecessor lives on the same node.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
- node1_delegate.set_save_messages(true);
-
- // Create A-B spanning nodes 0 and 1.
PortRef A, B;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
- EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
- EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
+ CreatePortPair(&node0, &A, &node1, &B);
- // Create C-D and send D over A to node 1.
PortRef C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
+
+ // Send D but block node0 on an ObserveProxy event.
+ node0.BlockOnEvent(EventType::kObserveProxy);
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
- // Pump tasks until the start of port collapse for port D, which should become
- // a proxy.
- PumpUntilTask(EventType::kObserveProxy);
+ // node0 won't collapse the proxy but node1 will receive the message before
+ // going idle.
+ WaitForIdle();
ScopedMessage message;
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
+ ASSERT_TRUE(node1.ReadMessage(B, &message));
ASSERT_EQ(1u, message->num_ports());
-
PortRef E;
- EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E));
+ EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
- EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
- PumpTasks();
+ RemoveNode(&node1);
+
+ node0.Unblock();
+ WaitForIdle();
// Port C should have detected peer closure.
PortStatus status;
- EXPECT_EQ(OK, node0.GetStatus(C, &status));
+ EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
EXPECT_TRUE(status.peer_closed);
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(B));
- EXPECT_EQ(OK, node0.ClosePort(C));
- EXPECT_EQ(OK, node1.ClosePort(E));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(B));
+ EXPECT_EQ(OK, node0.node().ClosePort(C));
+ EXPECT_EQ(OK, node1.node().ClosePort(E));
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, GetMessage1) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node(0);
+ AddNode(&node);
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
ScopedMessage message;
- EXPECT_EQ(OK, node0.GetMessage(a0, &message));
+ EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
EXPECT_FALSE(message);
- EXPECT_EQ(OK, node0.ClosePort(a1));
-
- EXPECT_EQ(OK, node0.GetMessage(a0, &message));
- EXPECT_FALSE(message);
+ EXPECT_EQ(OK, node.node().ClosePort(a1));
- PumpTasks();
+ WaitForIdle();
- EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
+ EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node.node().GetMessage(a0, &message));
EXPECT_FALSE(message);
- EXPECT_EQ(OK, node0.ClosePort(a0));
+ EXPECT_EQ(OK, node.node().ClosePort(a0));
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ WaitForIdle();
+
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, GetMessage2) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- node0_delegate.set_read_messages(false);
+ TestNode node(0);
+ AddNode(&node);
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
- EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));
+ EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
ScopedMessage message;
- EXPECT_EQ(OK, node0.GetMessage(a0, &message));
+ EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
ASSERT_TRUE(message);
- EXPECT_EQ(0, strcmp("1", ToString(message)));
+ EXPECT_TRUE(MessageEquals(message, "1"));
- EXPECT_EQ(OK, node0.ClosePort(a0));
- EXPECT_EQ(OK, node0.ClosePort(a1));
+ EXPECT_EQ(OK, node.node().ClosePort(a0));
+ EXPECT_EQ(OK, node.node().ClosePort(a1));
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, GetMessage3) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- node0_delegate.set_read_messages(false);
+ TestNode node(0);
+ AddNode(&node);
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
const char* kStrings[] = {
"1",
@@ -696,371 +777,305 @@ TEST_F(PortsTest, GetMessage3) {
};
for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i)
- EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i]));
+ EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));
ScopedMessage message;
for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) {
- EXPECT_EQ(OK, node0.GetMessage(a0, &message));
+ EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
ASSERT_TRUE(message);
- EXPECT_EQ(0, strcmp(kStrings[i], ToString(message)));
- DVLOG(1) << "got " << kStrings[i];
+ EXPECT_TRUE(MessageEquals(message, kStrings[i]));
}
- EXPECT_EQ(OK, node0.ClosePort(a0));
- EXPECT_EQ(OK, node0.ClosePort(a1));
+ EXPECT_EQ(OK, node.node().ClosePort(a0));
+ EXPECT_EQ(OK, node.node().ClosePort(a1));
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, Delegation1) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- SetNode(node0_name, &node0);
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
- node0_delegate.set_save_messages(true);
- node1_delegate.set_save_messages(true);
-
- // Setup pipe between node0 and node1.
PortRef x0, x1;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
- EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
- EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
+ CreatePortPair(&node0, &x0, &node1, &x1);
// In this test, we send a message to a port that has been moved.
PortRef a0, a1;
- EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
-
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1));
-
- PumpTasks();
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
+ WaitForIdle();
ScopedMessage message;
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
-
+ ASSERT_TRUE(node1.ReadMessage(x1, &message));
ASSERT_EQ(1u, message->num_ports());
+ EXPECT_TRUE(MessageEquals(message, "a1"));
// This is "a1" from the point of view of node1.
PortName a2_name = message->ports()[0];
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
+ EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name));
-
- PumpTasks();
-
- EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello"));
-
- PumpTasks();
-
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ WaitForIdle();
+ ASSERT_TRUE(node0.ReadMessage(x0, &message));
ASSERT_EQ(1u, message->num_ports());
+ EXPECT_TRUE(MessageEquals(message, "a2"));
// This is "a2" from the point of view of node1.
PortName a3_name = message->ports()[0];
PortRef a3;
- EXPECT_EQ(OK, node0.GetPort(a3_name, &a3));
-
- EXPECT_EQ(0, strcmp("a2", ToString(message)));
-
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
+ ASSERT_TRUE(node0.ReadMessage(a3, &message));
EXPECT_EQ(0u, message->num_ports());
- EXPECT_EQ(0, strcmp("hello", ToString(message)));
+ EXPECT_TRUE(MessageEquals(message, "hello"));
- EXPECT_EQ(OK, node0.ClosePort(a0));
- EXPECT_EQ(OK, node0.ClosePort(a3));
+ EXPECT_EQ(OK, node0.node().ClosePort(a0));
+ EXPECT_EQ(OK, node0.node().ClosePort(a3));
- EXPECT_EQ(OK, node0.ClosePort(x0));
- EXPECT_EQ(OK, node1.ClosePort(x1));
+ EXPECT_EQ(OK, node0.node().ClosePort(x0));
+ EXPECT_EQ(OK, node1.node().ClosePort(x1));
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, Delegation2) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- SetNode(node0_name, &node0);
-
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node0(0);
+ AddNode(&node0);
- node0_delegate.set_save_messages(true);
- node1_delegate.set_save_messages(true);
+ TestNode node1(1);
+ AddNode(&node1);
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 100; ++i) {
// Setup pipe a<->b between node0 and node1.
PortRef A, B;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
- EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
- EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
- EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
+ CreatePortPair(&node0, &A, &node1, &B);
PortRef C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
PortRef E, F;
- EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
+
+ node1.set_save_messages(true);
// Pass D over A to B.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
// Pass F over C to D.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
// This message should find its way to node1.
- EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello"));
+ EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
- PumpTasks();
+ WaitForIdle();
- EXPECT_EQ(OK, node0.ClosePort(C));
- EXPECT_EQ(OK, node0.ClosePort(E));
+ EXPECT_EQ(OK, node0.node().ClosePort(C));
+ EXPECT_EQ(OK, node0.node().ClosePort(E));
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(B));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(B));
- for (;;) {
- ScopedMessage message;
- if (node1_delegate.GetSavedMessage(&message)) {
- ClosePortsInMessage(&node1, message.get());
- if (strcmp("hello", ToString(message)) == 0)
- break;
- } else {
- ASSERT_TRUE(false); // "hello" message not delivered!
+ bool got_hello = false;
+ ScopedMessage message;
+ while (node1.GetSavedMessage(&message)) {
+ node1.ClosePortsInMessage(message.get());
+ if (MessageEquals(message, "hello")) {
+ got_hello = true;
break;
}
}
- PumpTasks(); // Because ClosePort may have generated tasks.
+ EXPECT_TRUE(got_hello);
+
+ WaitForIdle(); // Because closing ports may have generated tasks.
}
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, SendUninitialized) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node(0);
+ AddNode(&node);
PortRef x0;
- EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
- EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
- SendStringMessage(&node0, x0, "oops"));
- EXPECT_EQ(OK, node0.ClosePort(x0));
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
+ EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
+ EXPECT_EQ(OK, node.node().ClosePort(x0));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, SendFailure) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node(0);
+ AddNode(&node);
- node0_delegate.set_save_messages(true);
+ node.set_save_messages(true);
PortRef A, B;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
// Try to send A over itself.
EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
- SendStringMessageWithPort(&node0, A, "oops", A));
+ node.SendStringMessageWithPort(A, "oops", A));
// Try to send B over A.
EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
- SendStringMessageWithPort(&node0, A, "nope", B));
+ node.SendStringMessageWithPort(A, "nope", B));
// B should be closed immediately.
- EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
- PumpTasks();
+ WaitForIdle();
// There should have been no messages accepted.
ScopedMessage message;
- EXPECT_FALSE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_FALSE(node.GetSavedMessage(&message));
- EXPECT_EQ(OK, node0.ClosePort(A));
+ EXPECT_EQ(OK, node.node().ClosePort(A));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- node0_delegate.set_read_messages(false);
+ TestNode node(0);
+ AddNode(&node);
- PortRef A, B;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
-
- PortRef C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
-
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
-
- PumpTasks();
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
- EXPECT_EQ(OK, node0.ClosePort(C));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node0.ClosePort(B));
+ EXPECT_EQ(OK, node.node().ClosePort(C));
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+ EXPECT_EQ(OK, node.node().ClosePort(B));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node(0);
+ AddNode(&node);
- node0_delegate.set_save_messages(true);
-
- PortRef A, B;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
-
- PortRef C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
+ PortRef A, B, C, D;
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
ScopedMessage message;
- EXPECT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_TRUE(node.ReadMessage(B, &message));
ASSERT_EQ(1u, message->num_ports());
-
+ EXPECT_TRUE(MessageEquals(message, "foo"));
PortRef E;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
- EXPECT_TRUE(node0.CanShutdownCleanly(true));
+ EXPECT_TRUE(
+ node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(true));
- EXPECT_FALSE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(
+ node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+ EXPECT_FALSE(node.node().CanShutdownCleanly());
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node0.ClosePort(B));
- EXPECT_EQ(OK, node0.ClosePort(C));
- EXPECT_EQ(OK, node0.ClosePort(E));
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+ EXPECT_EQ(OK, node.node().ClosePort(B));
+ EXPECT_EQ(OK, node.node().ClosePort(C));
+ EXPECT_EQ(OK, node.node().ClosePort(E));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, ProxyCollapse1) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- node0_delegate.set_save_messages(true);
+ TestNode node(0);
+ AddNode(&node);
PortRef A, B;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
PortRef X, Y;
- EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
ScopedMessage message;
// Send B and receive it as C.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef C;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
// Send C and receive it as D.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef D;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
// Send D and receive it as E.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
- EXPECT_EQ(OK, node0.ClosePort(X));
- EXPECT_EQ(OK, node0.ClosePort(Y));
+ EXPECT_EQ(OK, node.node().ClosePort(X));
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node0.ClosePort(E));
+ EXPECT_EQ(OK, node.node().ClosePort(A));
+ EXPECT_EQ(OK, node.node().ClosePort(E));
- PumpTasks();
+ // The node should not idle until all proxies are collapsed.
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, ProxyCollapse2) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- node0_delegate.set_save_messages(true);
+ TestNode node(0);
+ AddNode(&node);
PortRef A, B;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
PortRef X, Y;
- EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
ScopedMessage message;
- // Send B and receive it as C.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- ASSERT_EQ(1u, message->num_ports());
- PortRef C;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
+ // Send B and A to create proxies in each direction.
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
- // Send A and receive it as D.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- ASSERT_EQ(1u, message->num_ports());
- PortRef D;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
+ EXPECT_EQ(OK, node.node().ClosePort(X));
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
// At this point we have a scenario with:
//
// D -> [B] -> C -> [A]
//
- // Ensure that the proxies can collapse.
-
- EXPECT_EQ(OK, node0.ClosePort(X));
- EXPECT_EQ(OK, node0.ClosePort(Y));
+ // Ensure that the proxies can collapse. The sent ports will be closed
+ // eventually as a result of Y's closure.
- EXPECT_EQ(OK, node0.ClosePort(C));
- EXPECT_EQ(OK, node0.ClosePort(D));
+ WaitForIdle();
- PumpTasks();
-
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, SendWithClosedPeer) {
@@ -1068,56 +1083,47 @@ TEST_F(PortsTest, SendWithClosedPeer) {
// closed, the newly created port will be aware of that peer closure, and the
// proxy will eventually collapse.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- node0_delegate.set_read_messages(false);
+ TestNode node(0);
+ AddNode(&node);
// Send a message from A to B, then close A.
PortRef A, B;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
- EXPECT_EQ(OK, node0.ClosePort(A));
-
- PumpTasks();
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
+ EXPECT_EQ(OK, node.node().ClosePort(A));
// Now send B over X-Y as new port C.
PortRef X, Y;
- EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
-
- node0_delegate.set_read_messages(true);
- node0_delegate.set_save_messages(true);
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
-
- EXPECT_EQ(OK, node0.ClosePort(X));
- EXPECT_EQ(OK, node0.ClosePort(Y));
-
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
ScopedMessage message;
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
-
PortRef C;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
+
+ EXPECT_EQ(OK, node.node().ClosePort(X));
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
- PumpTasks();
+ WaitForIdle();
- // C should receive the message originally sent to B, and it should also be
- // aware of A's closure.
+ // C should have received the message originally sent to B, and it should also
+ // be aware of A's closure.
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
+ ASSERT_TRUE(node.ReadMessage(C, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
PortStatus status;
- EXPECT_EQ(OK, node0.GetStatus(C, &status));
+ EXPECT_EQ(OK, node.node().GetStatus(C, &status));
EXPECT_FALSE(status.receiving_messages);
EXPECT_FALSE(status.has_messages);
EXPECT_TRUE(status.peer_closed);
- node0.ClosePort(C);
+ node.node().ClosePort(C);
+
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, SendWithClosedPeerSent) {
@@ -1126,391 +1132,342 @@ TEST_F(PortsTest, SendWithClosedPeerSent) {
// eventually notified of the closure, and the dead-end proxies will
// eventually be removed.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
-
- node0_delegate.set_save_messages(true);
+ TestNode node(0);
+ AddNode(&node);
PortRef X, Y;
- EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
PortRef A, B;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
ScopedMessage message;
// Send A as new port C.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
+
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef C;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
// Send C as new port D.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
+
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef D;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
-
- node0_delegate.set_read_messages(false);
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
// Send a message to B through D, then close D.
- EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
- EXPECT_EQ(OK, node0.ClosePort(D));
-
- PumpTasks();
+ EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
+ EXPECT_EQ(OK, node.node().ClosePort(D));
// Now send B as new port E.
- node0_delegate.set_read_messages(true);
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
+ EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
+ EXPECT_EQ(OK, node.node().ClosePort(X));
- EXPECT_EQ(OK, node0.ClosePort(X));
- EXPECT_EQ(OK, node0.ClosePort(Y));
-
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ ASSERT_TRUE(node.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
-
PortRef E;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
+ ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
+
+ EXPECT_EQ(OK, node.node().ClosePort(Y));
- PumpTasks();
+ WaitForIdle();
// E should receive the message originally sent to B, and it should also be
// aware of D's closure.
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
+ ASSERT_TRUE(node.ReadMessage(E, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
PortStatus status;
- EXPECT_EQ(OK, node0.GetStatus(E, &status));
+ EXPECT_EQ(OK, node.node().GetStatus(E, &status));
EXPECT_FALSE(status.receiving_messages);
EXPECT_FALSE(status.has_messages);
EXPECT_TRUE(status.peer_closed);
- node0.ClosePort(E);
+ EXPECT_EQ(OK, node.node().ClosePort(E));
- PumpTasks();
+ WaitForIdle();
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
+ EXPECT_TRUE(node.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePorts) {
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_read_messages(false);
- node1_delegate.set_save_messages(true);
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Write a message on A.
- EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
-
- PumpTasks();
+ EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
// Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
- PumpTasks();
+ WaitForIdle();
- // Expect only two receiving ports to be left after pumping tasks.
- EXPECT_TRUE(node0.CanShutdownCleanly(true));
- EXPECT_TRUE(node1.CanShutdownCleanly(true));
+ // Expect all proxies to be gone once idle.
+ EXPECT_TRUE(
+ node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+ EXPECT_TRUE(
+ node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
// Expect D to have received the message sent on A.
ScopedMessage message;
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
+ ASSERT_TRUE(node1.ReadMessage(D, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(D));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
// No more ports should be open.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortWithClosedPeer1) {
// This tests that the right thing happens when initiating a merge on a port
// whose peer has already been closed.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_read_messages(false);
- node1_delegate.set_save_messages(true);
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Write a message on A.
- EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
-
- PumpTasks();
+ EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
// Close A.
- EXPECT_EQ(OK, node0.ClosePort(A));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
// Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
- PumpTasks();
+ WaitForIdle();
- // Expect only one receiving port to be left after pumping tasks.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(true));
+ // Expect all proxies to be gone once idle. node0 should have no ports since
+ // A was explicitly closed.
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(
+ node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
// Expect D to have received the message sent on A.
ScopedMessage message;
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
+ ASSERT_TRUE(node1.ReadMessage(D, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
- EXPECT_EQ(OK, node1.ClosePort(D));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
// No more ports should be open.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortWithClosedPeer2) {
// This tests that the right thing happens when merging into a port whose peer
// has already been closed.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_save_messages(true);
- node1_delegate.set_read_messages(false);
-
- // Write a message on D.
- EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
- PumpTasks();
-
- // Close D.
- EXPECT_EQ(OK, node1.ClosePort(D));
+ // Write a message on D and close it.
+ EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
// Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
- PumpTasks();
+ WaitForIdle();
- // Expect only one receiving port to be left after pumping tasks.
- EXPECT_TRUE(node0.CanShutdownCleanly(true));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ // Expect all proxies to be gone once idle. node1 should have no ports since
+ // D was explicitly closed.
+ EXPECT_TRUE(
+ node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
// Expect A to have received the message sent on D.
ScopedMessage message;
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
+ ASSERT_TRUE(node0.ReadMessage(A, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
- EXPECT_EQ(OK, node0.ClosePort(A));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
// No more ports should be open.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsWithClosedPeers) {
// This tests that no residual ports are left behind if two ports are merged
// when both of their peers have been closed.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- node0_delegate.set_save_messages(true);
- node1_delegate.set_read_messages(false);
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Close A and D.
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(D));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
- PumpTasks();
+ WaitForIdle();
// Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
- PumpTasks();
+ WaitForIdle();
// Expect everything to have gone away.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsWithMovedPeers) {
- // This tests that no ports can be merged successfully even if their peers
- // are moved around.
+ // This tests that ports can be merged successfully even if their peers are
+ // moved around.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
-
- node0_delegate.set_save_messages(true);
- node1_delegate.set_read_messages(false);
+ TestNode node1(1);
+ AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
// Set up another pair X-Y for moving ports on node0.
PortRef X, Y;
- EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
ScopedMessage message;
// Move A to new port E.
- EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
+ ASSERT_TRUE(node0.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
- ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
-
- EXPECT_EQ(OK, node0.ClosePort(X));
- EXPECT_EQ(OK, node0.ClosePort(Y));
+ ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
- node0_delegate.set_read_messages(false);
+ EXPECT_EQ(OK, node0.node().ClosePort(X));
+ EXPECT_EQ(OK, node0.node().ClosePort(Y));
// Write messages on E and D.
- EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
- EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));
+ EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
+ EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
// Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
-
- node0_delegate.set_read_messages(true);
- node1_delegate.set_read_messages(true);
- node1_delegate.set_save_messages(true);
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
- PumpTasks();
+ WaitForIdle();
// Expect to receive D's message on E and E's message on D.
- ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hi", ToString(message)));
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
- EXPECT_EQ(0, strcmp("hey", ToString(message)));
+ ASSERT_TRUE(node0.ReadMessage(E, &message));
+ EXPECT_TRUE(MessageEquals(message, "hi"));
+ ASSERT_TRUE(node1.ReadMessage(D, &message));
+ EXPECT_TRUE(MessageEquals(message, "hey"));
// Close E and D.
- EXPECT_EQ(OK, node0.ClosePort(E));
- EXPECT_EQ(OK, node1.ClosePort(D));
+ EXPECT_EQ(OK, node0.node().ClosePort(E));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
- PumpTasks();
+ WaitForIdle();
// Expect everything to have gone away.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
TEST_F(PortsTest, MergePortsFailsGracefully) {
// This tests that the system remains in a well-defined state if something
// goes wrong during port merge.
- NodeName node0_name(0, 1);
- TestNodeDelegate node0_delegate(node0_name);
- Node node0(node0_name, &node0_delegate);
- node_map[0] = &node0;
+ TestNode node0(0);
+ AddNode(&node0);
- NodeName node1_name(1, 1);
- TestNodeDelegate node1_delegate(node1_name);
- Node node1(node1_name, &node1_delegate);
- node_map[1] = &node1;
+ TestNode node1(1);
+ AddNode(&node1);
// Setup two independent port pairs, A-B on node0 and C-D on node1.
PortRef A, B, C, D;
- EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
- EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
-
- PumpTasks();
-
- // Initiate a merge between B and C.
- EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
+ EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
- // Move C to a new port E. This is dumb and nobody should do it, but it's
- // possible. MergePorts will fail as a result because C won't be in a
- // receiving state when the event arrives at node1, so B should be closed.
ScopedMessage message;
PortRef X, Y;
- EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y));
- node1_delegate.set_save_messages(true);
- EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C));
- ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
+ EXPECT_EQ(OK, node1.node().CreatePortPair(&X, &Y));
+
+ // Block the merge from proceeding until we can do something stupid with port
+ // C. This avoids the test logic racing with async merge logic.
+ node1.BlockOnEvent(EventType::kMergePort);
+
+ // Initiate the merge between B and C.
+ EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
+
+ // Move C to a new port E. This is not a sane use of Node's public API but
+ // is still hypothetically possible. It allows us to force a merge failure
+ // because C will be in an invalid state by the term the merge is processed.
+ // As a result, B should be closed.
+ EXPECT_EQ(OK, node1.SendStringMessageWithPort(X, "foo", C));
+
+ node1.Unblock();
+
+ ASSERT_TRUE(node1.ReadMessage(Y, &message));
ASSERT_EQ(1u, message->num_ports());
PortRef E;
- ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E));
- EXPECT_EQ(OK, node1.ClosePort(X));
- EXPECT_EQ(OK, node1.ClosePort(Y));
+ ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
- // C goes away as a result of normal proxy removal.
- PumpTasks();
+ EXPECT_EQ(OK, node1.node().ClosePort(X));
+ EXPECT_EQ(OK, node1.node().ClosePort(Y));
- EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C));
+ WaitForIdle();
- // B should have been closed cleanly.
- EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
+ // C goes away as a result of normal proxy removal. B should have been closed
+ // cleanly by the failed MergePorts.
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
+ EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
// Close A, D, and E.
- EXPECT_EQ(OK, node0.ClosePort(A));
- EXPECT_EQ(OK, node1.ClosePort(D));
- EXPECT_EQ(OK, node1.ClosePort(E));
+ EXPECT_EQ(OK, node0.node().ClosePort(A));
+ EXPECT_EQ(OK, node1.node().ClosePort(D));
+ EXPECT_EQ(OK, node1.node().ClosePort(E));
- PumpTasks();
+ WaitForIdle();
// Expect everything to have gone away.
- EXPECT_TRUE(node0.CanShutdownCleanly(false));
- EXPECT_TRUE(node1.CanShutdownCleanly(false));
+ EXPECT_TRUE(node0.node().CanShutdownCleanly());
+ EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
} // namespace test