diff options
Diffstat (limited to 'chromium/mojo/edk')
73 files changed, 1504 insertions, 1264 deletions
diff --git a/chromium/mojo/edk/embedder/BUILD.gn b/chromium/mojo/edk/embedder/BUILD.gn index cca7230c9c1..02fc32cab3a 100644 --- a/chromium/mojo/edk/embedder/BUILD.gn +++ b/chromium/mojo/edk/embedder/BUILD.gn @@ -145,7 +145,6 @@ source_set("embedder_unittests") { "//mojo/edk/system", "//mojo/edk/system:test_utils", "//mojo/edk/test:test_support", - "//mojo/message_pump", "//testing/gtest", ] } diff --git a/chromium/mojo/edk/embedder/embedder.cc b/chromium/mojo/edk/embedder/embedder.cc index 793b7083f08..38c789cd7c4 100644 --- a/chromium/mojo/edk/embedder/embedder.cc +++ b/chromium/mojo/edk/embedder/embedder.cc @@ -12,7 +12,7 @@ #include "base/memory/ref_counted.h" #include "base/strings/string_number_conversions.h" #include "base/task_runner.h" -#include "base/thread_task_runner_handle.h" +#include "base/threading/thread_task_runner_handle.h" #include "crypto/random.h" #include "mojo/edk/embedder/embedder_internal.h" #include "mojo/edk/embedder/platform_channel_pair.h" diff --git a/chromium/mojo/edk/embedder/embedder.h b/chromium/mojo/edk/embedder/embedder.h index 6a9b30525c3..edb6c3212de 100644 --- a/chromium/mojo/edk/embedder/embedder.h +++ b/chromium/mojo/edk/embedder/embedder.h @@ -7,12 +7,12 @@ #include <stddef.h> +#include <memory> #include <string> #include "base/callback.h" #include "base/command_line.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/memory/shared_memory_handle.h" #include "base/process/process_handle.h" #include "base/task_runner.h" diff --git a/chromium/mojo/edk/embedder/embedder_unittest.cc b/chromium/mojo/edk/embedder/embedder_unittest.cc index c781f42ed1a..0904799ac44 100644 --- a/chromium/mojo/edk/embedder/embedder_unittest.cc +++ b/chromium/mojo/edk/embedder/embedder_unittest.cc @@ -12,6 +12,7 @@ #include "base/bind.h" #include "base/command_line.h" +#include "base/files/file.h" #include "base/logging.h" #include "base/macros.h" #include "base/memory/shared_memory.h" @@ -22,7 +23,6 @@ #include "mojo/edk/embedder/test_embedder.h" #include "mojo/edk/system/test_utils.h" #include "mojo/edk/test/mojo_test_base.h" -#include "mojo/message_pump/message_pump_mojo.h" #include "mojo/public/c/system/core.h" #include "mojo/public/cpp/system/handle.h" #include "mojo/public/cpp/system/message_pipe.h" @@ -198,13 +198,7 @@ TEST_F(EmbedderTest, ChannelsHandlePassing) { #if !defined(OS_IOS) -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_MultiprocessChannels DISABLED_MultiprocessChannels -#else -#define MAYBE_MultiprocessChannels MultiprocessChannels -#endif // defined(OS_ANDROID) -TEST_F(EmbedderTest, MAYBE_MultiprocessChannels) { +TEST_F(EmbedderTest, MultiprocessChannels) { RUN_CHILD_ON_PIPE(MultiprocessChannelsClient, server_mp) // 1. Write a message to |server_mp| (attaching nothing). WriteMessage(server_mp, "hello"); @@ -290,21 +284,12 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessChannelsClient, EmbedderTest, ASSERT_EQ(MOJO_RESULT_OK, MojoClose(mp1)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_MultiprocessBaseSharedMemory DISABLED_MultiprocessBaseSharedMemory -#else -#define MAYBE_MultiprocessBaseSharedMemory MultiprocessBaseSharedMemory -#endif // defined(OS_ANDROID) -TEST_F(EmbedderTest, MAYBE_MultiprocessBaseSharedMemory) { +TEST_F(EmbedderTest, MultiprocessBaseSharedMemory) { RUN_CHILD_ON_PIPE(MultiprocessSharedMemoryClient, server_mp) // 1. Create a base::SharedMemory object and create a mojo shared buffer // from it. base::SharedMemoryCreateOptions options; options.size = 123; -#if defined(OS_MACOSX) && !defined(OS_IOS) - options.type = base::SharedMemoryHandle::POSIX; -#endif base::SharedMemory shared_memory; ASSERT_TRUE(shared_memory.Create(options)); base::SharedMemoryHandle shm_handle = base::SharedMemory::DuplicateHandle( @@ -384,7 +369,6 @@ TEST_F(EmbedderTest, MultiprocessMachSharedMemory) { // buffer from it. base::SharedMemoryCreateOptions options; options.size = 123; - options.type = base::SharedMemoryHandle::MACH; base::SharedMemory shared_memory; ASSERT_TRUE(shared_memory.Create(options)); base::SharedMemoryHandle shm_handle = base::SharedMemory::DuplicateHandle( @@ -422,41 +406,52 @@ TEST_F(EmbedderTest, MultiprocessMachSharedMemory) { END_CHILD() } -const base::SharedMemoryHandle::Type kTestHandleTypes[] = { - base::SharedMemoryHandle::MACH, - base::SharedMemoryHandle::POSIX, - base::SharedMemoryHandle::POSIX, - base::SharedMemoryHandle::MACH, +enum class HandleType { + POSIX, + MACH, + MACH_NULL, +}; + +const HandleType kTestHandleTypes[] = { + HandleType::MACH, + HandleType::MACH_NULL, + HandleType::POSIX, + HandleType::POSIX, + HandleType::MACH, }; -// Test that we can mix file descriptor and mach port handles. +// Test that we can mix file descriptors and mach port handles. TEST_F(EmbedderTest, MultiprocessMixMachAndFds) { const size_t kShmSize = 1234; RUN_CHILD_ON_PIPE(MultiprocessMixMachAndFdsClient, server_mp) - // 1. Create the base::SharedMemory objects and mojo handles from them. + // 1. Create fds or Mach objects and mojo handles from them. MojoHandle platform_handles[arraysize(kTestHandleTypes)]; for (size_t i = 0; i < arraysize(kTestHandleTypes); i++) { const auto type = kTestHandleTypes[i]; - base::SharedMemoryCreateOptions options; - options.size = kShmSize; - options.type = type; - base::SharedMemory shared_memory; - ASSERT_TRUE(shared_memory.Create(options)); - base::SharedMemoryHandle shm_handle = base::SharedMemory::DuplicateHandle( - shared_memory.handle()); ScopedPlatformHandle scoped_handle; - if (type == base::SharedMemoryHandle::POSIX) - scoped_handle.reset(PlatformHandle(shm_handle.GetFileDescriptor().fd)); - else + if (type == HandleType::POSIX) { + // The easiest source of fds is opening /dev/null. + base::File file(base::FilePath("/dev/null"), + base::File::FLAG_OPEN | base::File::FLAG_WRITE); + ASSERT_TRUE(file.IsValid()); + scoped_handle.reset(PlatformHandle(file.TakePlatformFile())); + EXPECT_EQ(PlatformHandle::Type::POSIX, scoped_handle.get().type); + } else if (type == HandleType::MACH_NULL) { + scoped_handle.reset(PlatformHandle( + static_cast<mach_port_t>(MACH_PORT_NULL))); + EXPECT_EQ(PlatformHandle::Type::MACH, scoped_handle.get().type); + } else { + base::SharedMemoryCreateOptions options; + options.size = kShmSize; + base::SharedMemory shared_memory; + ASSERT_TRUE(shared_memory.Create(options)); + base::SharedMemoryHandle shm_handle = + base::SharedMemory::DuplicateHandle(shared_memory.handle()); scoped_handle.reset(PlatformHandle(shm_handle.GetMemoryObject())); + EXPECT_EQ(PlatformHandle::Type::MACH, scoped_handle.get().type); + } ASSERT_EQ(MOJO_RESULT_OK, CreatePlatformHandleWrapper( std::move(scoped_handle), platform_handles + i)); - - // Map the shared memory object and write the type into it. 'P' for POSIX, - // and 'M' for Mach. - ASSERT_TRUE(shared_memory.Map(kShmSize)); - static_cast<char*>(shared_memory.memory())[0] = - type == base::SharedMemoryHandle::POSIX ? 'P' : 'M'; } // 2. Send all the handles to the child. @@ -470,8 +465,7 @@ TEST_F(EmbedderTest, MultiprocessMixMachAndFds) { DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessMixMachAndFdsClient, EmbedderTest, client_mp) { - const int kNumHandles = 4; - const size_t kShmSize = 1234; + const int kNumHandles = arraysize(kTestHandleTypes); MojoHandle platform_handles[kNumHandles]; // 1. Read from |client_mp|, which should have a message containing @@ -479,29 +473,24 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessMixMachAndFdsClient, EmbedderTest, EXPECT_EQ("hello", ReadMessageWithHandles(client_mp, platform_handles, kNumHandles)); - // 2. Extract each handle, map it, and verify the type. + // 2. Extract each handle, and verify the type. for (int i = 0; i < kNumHandles; i++) { + const auto type = kTestHandleTypes[i]; ScopedPlatformHandle scoped_handle; ASSERT_EQ(MOJO_RESULT_OK, PassWrappedPlatformHandle(platform_handles[i], &scoped_handle)); - base::SharedMemoryHandle shm_handle; - char type = 0; - if (scoped_handle.get().type == PlatformHandle::Type::POSIX) { - shm_handle = base::SharedMemoryHandle(scoped_handle.release().handle, - false); - type = 'P'; + if (type == HandleType::POSIX) { + EXPECT_NE(0, scoped_handle.get().handle); + EXPECT_EQ(PlatformHandle::Type::POSIX, scoped_handle.get().type); + } else if (type == HandleType::MACH_NULL) { + EXPECT_EQ(static_cast<mach_port_t>(MACH_PORT_NULL), + scoped_handle.get().port); + EXPECT_EQ(PlatformHandle::Type::MACH, scoped_handle.get().type); } else { - shm_handle = base::SharedMemoryHandle(scoped_handle.release().port, - kShmSize, base::GetCurrentProcId()); - type = 'M'; + EXPECT_NE(static_cast<mach_port_t>(MACH_PORT_NULL), + scoped_handle.get().port); + EXPECT_EQ(PlatformHandle::Type::MACH, scoped_handle.get().type); } - - // Verify the type order. - EXPECT_EQ(kTestHandleTypes[i], shm_handle.GetType()); - - base::SharedMemory shared_memory(shm_handle, false); - ASSERT_TRUE(shared_memory.Map(kShmSize)); - EXPECT_EQ(type, static_cast<char*>(shared_memory.memory())[0]); } // 3. Say bye! diff --git a/chromium/mojo/edk/embedder/entrypoints.cc b/chromium/mojo/edk/embedder/entrypoints.cc index 6acd98571dd..fdd881a5998 100644 --- a/chromium/mojo/edk/embedder/entrypoints.cc +++ b/chromium/mojo/edk/embedder/entrypoints.cc @@ -53,6 +53,22 @@ MojoResult MojoCancelWatch(MojoHandle handle, uintptr_t context) { return g_core->CancelWatch(handle, context); } +MojoResult MojoAllocMessage(uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoAllocMessageFlags flags, + MojoMessageHandle* message) { + return g_core->AllocMessage(num_bytes, handles, num_handles, flags, message); +} + +MojoResult MojoFreeMessage(MojoMessageHandle message) { + return g_core->FreeMessage(message); +} + +MojoResult MojoGetMessageBuffer(MojoMessageHandle message, void** buffer) { + return g_core->GetMessageBuffer(message, buffer); +} + MojoResult MojoCreateWaitSet(MojoHandle* wait_set_handle) { return g_core->CreateWaitSet(wait_set_handle); } @@ -93,6 +109,12 @@ MojoResult MojoWriteMessage(MojoHandle message_pipe_handle, num_handles, flags); } +MojoResult MojoWriteMessageNew(MojoHandle message_pipe_handle, + MojoMessageHandle message, + MojoWriteMessageFlags flags) { + return g_core->WriteMessageNew(message_pipe_handle, message, flags); +} + MojoResult MojoReadMessage(MojoHandle message_pipe_handle, void* bytes, uint32_t* num_bytes, @@ -103,6 +125,16 @@ MojoResult MojoReadMessage(MojoHandle message_pipe_handle, message_pipe_handle, bytes, num_bytes, handles, num_handles, flags); } +MojoResult MojoReadMessageNew(MojoHandle message_pipe_handle, + MojoMessageHandle* message, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags) { + return g_core->ReadMessageNew( + message_pipe_handle, message, num_bytes, handles, num_handles, flags); +} + MojoResult MojoFuseMessagePipes(MojoHandle handle0, MojoHandle handle1) { return g_core->FuseMessagePipes(handle0, handle1); } diff --git a/chromium/mojo/edk/embedder/platform_channel_pair.h b/chromium/mojo/edk/embedder/platform_channel_pair.h index 591592bdb84..f80de89cedd 100644 --- a/chromium/mojo/edk/embedder/platform_channel_pair.h +++ b/chromium/mojo/edk/embedder/platform_channel_pair.h @@ -5,8 +5,9 @@ #ifndef MOJO_EDK_EMBEDDER_PLATFORM_CHANNEL_PAIR_H_ #define MOJO_EDK_EMBEDDER_PLATFORM_CHANNEL_PAIR_H_ +#include <memory> + #include "base/macros.h" -#include "base/memory/scoped_ptr.h" #include "base/process/launch.h" #include "build/build_config.h" #include "mojo/edk/embedder/scoped_platform_handle.h" diff --git a/chromium/mojo/edk/embedder/platform_channel_utils_posix.h b/chromium/mojo/edk/embedder/platform_channel_utils_posix.h index e08f2a16b1a..8b24bd028f8 100644 --- a/chromium/mojo/edk/embedder/platform_channel_utils_posix.h +++ b/chromium/mojo/edk/embedder/platform_channel_utils_posix.h @@ -9,8 +9,8 @@ #include <sys/types.h> // For |ssize_t|. #include <deque> +#include <memory> -#include "base/memory/scoped_ptr.h" #include "mojo/edk/embedder/platform_handle.h" #include "mojo/edk/system/system_impl_export.h" diff --git a/chromium/mojo/edk/embedder/platform_handle.cc b/chromium/mojo/edk/embedder/platform_handle.cc index 5709b1e4685..b6b2cd22d1f 100644 --- a/chromium/mojo/edk/embedder/platform_handle.cc +++ b/chromium/mojo/edk/embedder/platform_handle.cc @@ -36,6 +36,32 @@ void PlatformHandle::CloseIfNecessary() { } #endif // defined(OS_MACOSX) && !defined(OS_IOS) #elif defined(OS_WIN) + if (owning_process != base::GetCurrentProcessHandle()) { + // This handle may have been duplicated to a new target process but not yet + // sent there. In this case CloseHandle should NOT be called. From MSDN + // documentation for DuplicateHandle[1]: + // + // Normally the target process closes a duplicated handle when that + // process is finished using the handle. To close a duplicated handle + // from the source process, call DuplicateHandle with the following + // parameters: + // + // * Set hSourceProcessHandle to the target process from the + // call that created the handle. + // * Set hSourceHandle to the duplicated handle to close. + // * Set lpTargetHandle to NULL. + // * Set dwOptions to DUPLICATE_CLOSE_SOURCE. + // + // [1] https://msdn.microsoft.com/en-us/library/windows/desktop/ms724251 + // + // NOTE: It's possible for this operation to fail if the owning process + // was terminated or is in the process of being terminated. Either way, + // there is nothing we can reasonably do about failure, so we ignore it. + DuplicateHandle(owning_process, handle, NULL, &handle, 0, FALSE, + DUPLICATE_CLOSE_SOURCE); + return; + } + bool success = !!CloseHandle(handle); DPCHECK(success); handle = INVALID_HANDLE_VALUE; diff --git a/chromium/mojo/edk/embedder/platform_handle.h b/chromium/mojo/edk/embedder/platform_handle.h index 3c945c69dff..675dd1fe9c8 100644 --- a/chromium/mojo/edk/embedder/platform_handle.h +++ b/chromium/mojo/edk/embedder/platform_handle.h @@ -10,6 +10,8 @@ #if defined(OS_WIN) #include <windows.h> + +#include "base/process/process_handle.h" #elif defined(OS_MACOSX) && !defined(OS_IOS) #include <mach/mach.h> #endif @@ -58,13 +60,18 @@ struct MOJO_SYSTEM_IMPL_EXPORT PlatformHandle { #elif defined(OS_WIN) struct MOJO_SYSTEM_IMPL_EXPORT PlatformHandle { PlatformHandle() : handle(INVALID_HANDLE_VALUE) {} - explicit PlatformHandle(HANDLE handle) : handle(handle) {} + explicit PlatformHandle(HANDLE handle) + : handle(handle), owning_process(base::GetCurrentProcessHandle()) {} void CloseIfNecessary(); bool is_valid() const { return handle != INVALID_HANDLE_VALUE; } HANDLE handle; + + // A Windows HANDLE may be duplicated to another process but not yet sent to + // that process. This tracks the handle's owning process. + base::ProcessHandle owning_process; }; #else #error "Platform not yet supported." diff --git a/chromium/mojo/edk/embedder/platform_handle_utils_win.cc b/chromium/mojo/edk/embedder/platform_handle_utils_win.cc index eebfdeb7abd..32ed49afc18 100644 --- a/chromium/mojo/edk/embedder/platform_handle_utils_win.cc +++ b/chromium/mojo/edk/embedder/platform_handle_utils_win.cc @@ -15,6 +15,7 @@ ScopedPlatformHandle DuplicatePlatformHandle(PlatformHandle platform_handle) { DCHECK(platform_handle.is_valid()); HANDLE new_handle; + CHECK_NE(platform_handle.handle, INVALID_HANDLE_VALUE); if (!DuplicateHandle(GetCurrentProcess(), platform_handle.handle, GetCurrentProcess(), &new_handle, 0, TRUE, DUPLICATE_SAME_ACCESS)) diff --git a/chromium/mojo/edk/embedder/platform_handle_vector.h b/chromium/mojo/edk/embedder/platform_handle_vector.h index 2bea729ab24..9892b23cac0 100644 --- a/chromium/mojo/edk/embedder/platform_handle_vector.h +++ b/chromium/mojo/edk/embedder/platform_handle_vector.h @@ -5,9 +5,9 @@ #ifndef MOJO_EDK_EMBEDDER_PLATFORM_HANDLE_VECTOR_H_ #define MOJO_EDK_EMBEDDER_PLATFORM_HANDLE_VECTOR_H_ +#include <memory> #include <vector> -#include "base/memory/scoped_ptr.h" #include "mojo/edk/embedder/platform_handle.h" #include "mojo/edk/embedder/platform_handle_utils.h" #include "mojo/edk/system/system_impl_export.h" @@ -27,7 +27,7 @@ struct MOJO_SYSTEM_IMPL_EXPORT PlatformHandleVectorDeleter { }; using ScopedPlatformHandleVectorPtr = - scoped_ptr<PlatformHandleVector, PlatformHandleVectorDeleter>; + std::unique_ptr<PlatformHandleVector, PlatformHandleVectorDeleter>; } // namespace edk } // namespace mojo diff --git a/chromium/mojo/edk/embedder/platform_shared_buffer.cc b/chromium/mojo/edk/embedder/platform_shared_buffer.cc index a29a7abd761..467e5e6a8a1 100644 --- a/chromium/mojo/edk/embedder/platform_shared_buffer.cc +++ b/chromium/mojo/edk/embedder/platform_shared_buffer.cc @@ -9,6 +9,7 @@ #include <utility> #include "base/logging.h" +#include "base/memory/ptr_util.h" #include "base/memory/shared_memory.h" #include "base/process/process_handle.h" #include "base/sys_info.h" @@ -27,14 +28,7 @@ ScopedPlatformHandle SharedMemoryToPlatformHandle( #elif defined(OS_WIN) return ScopedPlatformHandle(PlatformHandle(memory_handle.GetHandle())); #else - if (memory_handle.GetType() == base::SharedMemoryHandle::MACH) { - return ScopedPlatformHandle(PlatformHandle( - memory_handle.GetMemoryObject())); - } else { - DCHECK(memory_handle.GetType() == base::SharedMemoryHandle::POSIX); - return ScopedPlatformHandle(PlatformHandle( - memory_handle.GetFileDescriptor().fd)); - } + return ScopedPlatformHandle(PlatformHandle(memory_handle.GetMemoryObject())); #endif } @@ -74,6 +68,27 @@ PlatformSharedBuffer* PlatformSharedBuffer::CreateFromPlatformHandle( } // static +PlatformSharedBuffer* PlatformSharedBuffer::CreateFromPlatformHandlePair( + size_t num_bytes, + ScopedPlatformHandle rw_platform_handle, + ScopedPlatformHandle ro_platform_handle) { + DCHECK_GT(num_bytes, 0u); + DCHECK(rw_platform_handle.is_valid()); + DCHECK(ro_platform_handle.is_valid()); + + PlatformSharedBuffer* rv = new PlatformSharedBuffer(num_bytes, false); + if (!rv->InitFromPlatformHandlePair(std::move(rw_platform_handle), + std::move(ro_platform_handle))) { + // We can't just delete it directly, due to the "in destructor" (debug) + // check. + scoped_refptr<PlatformSharedBuffer> deleter(rv); + return nullptr; + } + + return rv; +} + +// static PlatformSharedBuffer* PlatformSharedBuffer::CreateFromSharedMemoryHandle( size_t num_bytes, bool read_only, @@ -91,10 +106,10 @@ size_t PlatformSharedBuffer::GetNumBytes() const { } bool PlatformSharedBuffer::IsReadOnly() const { - return read_only_; + return read_only_; } -scoped_ptr<PlatformSharedBufferMapping> PlatformSharedBuffer::Map( +std::unique_ptr<PlatformSharedBufferMapping> PlatformSharedBuffer::Map( size_t offset, size_t length) { if (!IsValidMap(offset, length)) @@ -115,7 +130,7 @@ bool PlatformSharedBuffer::IsValidMap(size_t offset, size_t length) { return true; } -scoped_ptr<PlatformSharedBufferMapping> PlatformSharedBuffer::MapNoCheck( +std::unique_ptr<PlatformSharedBufferMapping> PlatformSharedBuffer::MapNoCheck( size_t offset, size_t length) { DCHECK(IsValidMap(offset, length)); @@ -128,10 +143,10 @@ scoped_ptr<PlatformSharedBufferMapping> PlatformSharedBuffer::MapNoCheck( if (handle == base::SharedMemory::NULLHandle()) return nullptr; - scoped_ptr<PlatformSharedBufferMapping> mapping( + std::unique_ptr<PlatformSharedBufferMapping> mapping( new PlatformSharedBufferMapping(handle, read_only_, offset, length)); if (mapping->Map()) - return make_scoped_ptr(mapping.release()); + return base::WrapUnique(mapping.release()); return nullptr; } @@ -170,6 +185,16 @@ base::SharedMemoryHandle PlatformSharedBuffer::DuplicateSharedMemoryHandle() { PlatformSharedBuffer* PlatformSharedBuffer::CreateReadOnlyDuplicate() { DCHECK(shared_memory_); + + if (ro_shared_memory_) { + base::AutoLock locker(lock_); + base::SharedMemoryHandle handle; + handle = base::SharedMemory::DuplicateHandle(ro_shared_memory_->handle()); + if (handle == base::SharedMemory::NULLHandle()) + return nullptr; + return CreateFromSharedMemoryHandle(num_bytes_, true, handle); + } + base::SharedMemoryHandle handle; bool success; { @@ -196,9 +221,6 @@ bool PlatformSharedBuffer::Init() { options.size = num_bytes_; // By default, we can share as read-only. options.share_read_only = true; -#if defined(OS_MACOSX) && !defined(OS_IOS) - options.type = base::SharedMemoryHandle::MACH; -#endif shared_memory_.reset(new base::SharedMemory); return shared_memory_->Create(options); @@ -213,12 +235,8 @@ bool PlatformSharedBuffer::InitFromPlatformHandle( base::GetCurrentProcId()); #elif defined(OS_MACOSX) && !defined(OS_IOS) base::SharedMemoryHandle handle; - if (platform_handle.get().type == PlatformHandle::Type::MACH) { - handle = base::SharedMemoryHandle( - platform_handle.release().port, num_bytes_, base::GetCurrentProcId()); - } else { - handle = base::SharedMemoryHandle(platform_handle.release().handle, false); - } + handle = base::SharedMemoryHandle(platform_handle.release().port, num_bytes_, + base::GetCurrentProcId()); #else base::SharedMemoryHandle handle(platform_handle.release().handle, false); #endif @@ -227,6 +245,26 @@ bool PlatformSharedBuffer::InitFromPlatformHandle( return true; } +bool PlatformSharedBuffer::InitFromPlatformHandlePair( + ScopedPlatformHandle rw_platform_handle, + ScopedPlatformHandle ro_platform_handle) { +#if defined(OS_WIN) || defined(OS_MACOSX) + NOTREACHED(); + return false; +#else + DCHECK(!shared_memory_); + + base::SharedMemoryHandle handle(rw_platform_handle.release().handle, false); + shared_memory_.reset(new base::SharedMemory(handle, false)); + + base::SharedMemoryHandle ro_handle(ro_platform_handle.release().handle, + false); + ro_shared_memory_.reset(new base::SharedMemory(ro_handle, true)); + + return true; +#endif +} + void PlatformSharedBuffer::InitFromSharedMemoryHandle( base::SharedMemoryHandle handle) { DCHECK(!shared_memory_); diff --git a/chromium/mojo/edk/embedder/platform_shared_buffer.h b/chromium/mojo/edk/embedder/platform_shared_buffer.h index 1cfcd3a9aca..45be7233c4f 100644 --- a/chromium/mojo/edk/embedder/platform_shared_buffer.h +++ b/chromium/mojo/edk/embedder/platform_shared_buffer.h @@ -7,9 +7,10 @@ #include <stddef.h> +#include <memory> + #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/memory/shared_memory.h" #include "base/memory/shared_memory_handle.h" #include "base/synchronization/lock.h" @@ -45,6 +46,14 @@ class MOJO_SYSTEM_IMPL_EXPORT PlatformSharedBuffer bool read_only, ScopedPlatformHandle platform_handle); + // Creates a shared buffer of size |num_bytes| from the existing pair of + // read/write and read-only handles |rw_platform_handle| and + // |ro_platform_handle|. Returns null on failure. + static PlatformSharedBuffer* CreateFromPlatformHandlePair( + size_t num_bytes, + ScopedPlatformHandle rw_platform_handle, + ScopedPlatformHandle ro_platform_handle); + // Creates a shared buffer of size |num_bytes| from the existing shared memory // handle |handle|. static PlatformSharedBuffer* CreateFromSharedMemoryHandle( @@ -61,15 +70,16 @@ class MOJO_SYSTEM_IMPL_EXPORT PlatformSharedBuffer // Maps (some) of the shared buffer into memory; [|offset|, |offset + length|] // must be contained in [0, |num_bytes|], and |length| must be at least 1. // Returns null on failure. - scoped_ptr<PlatformSharedBufferMapping> Map(size_t offset, size_t length); + std::unique_ptr<PlatformSharedBufferMapping> Map(size_t offset, + size_t length); // Checks if |offset| and |length| are valid arguments. bool IsValidMap(size_t offset, size_t length); // Like |Map()|, but doesn't check its arguments (which should have been // preflighted using |IsValidMap()|). - scoped_ptr<PlatformSharedBufferMapping> MapNoCheck(size_t offset, - size_t length); + std::unique_ptr<PlatformSharedBufferMapping> MapNoCheck(size_t offset, + size_t length); // Duplicates the underlying platform handle and passes it to the caller. ScopedPlatformHandle DuplicatePlatformHandle(); @@ -102,13 +112,20 @@ class MOJO_SYSTEM_IMPL_EXPORT PlatformSharedBuffer // claimed |num_bytes_|.) bool InitFromPlatformHandle(ScopedPlatformHandle platform_handle); + bool InitFromPlatformHandlePair(ScopedPlatformHandle rw_platform_handle, + ScopedPlatformHandle ro_platform_handle); + void InitFromSharedMemoryHandle(base::SharedMemoryHandle handle); const size_t num_bytes_; const bool read_only_; base::Lock lock_; - scoped_ptr<base::SharedMemory> shared_memory_; + std::unique_ptr<base::SharedMemory> shared_memory_; + + // A separate read-only shared memory for platforms that need it (i.e. Linux + // with sync broker). + std::unique_ptr<base::SharedMemory> ro_shared_memory_; DISALLOW_COPY_AND_ASSIGN(PlatformSharedBuffer); }; diff --git a/chromium/mojo/edk/embedder/platform_shared_buffer_unittest.cc b/chromium/mojo/edk/embedder/platform_shared_buffer_unittest.cc index 098ff26a816..f1593f06c44 100644 --- a/chromium/mojo/edk/embedder/platform_shared_buffer_unittest.cc +++ b/chromium/mojo/edk/embedder/platform_shared_buffer_unittest.cc @@ -7,9 +7,9 @@ #include <stddef.h> #include <limits> +#include <memory> #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/memory/shared_memory.h" #include "base/sys_info.h" #include "testing/gtest/include/gtest/gtest.h" @@ -36,7 +36,8 @@ TEST(PlatformSharedBufferTest, Basic) { // Map it all, scribble some stuff, and then unmap it. { EXPECT_TRUE(buffer->IsValidMap(0, kNumBytes)); - scoped_ptr<PlatformSharedBufferMapping> mapping(buffer->Map(0, kNumBytes)); + std::unique_ptr<PlatformSharedBufferMapping> mapping( + buffer->Map(0, kNumBytes)); ASSERT_TRUE(mapping); ASSERT_TRUE(mapping->GetBase()); int* stuff = static_cast<int*>(mapping->GetBase()); @@ -51,7 +52,7 @@ TEST(PlatformSharedBufferTest, Basic) { { ASSERT_TRUE(buffer->IsValidMap(0, kNumBytes)); // Use |MapNoCheck()| this time. - scoped_ptr<PlatformSharedBufferMapping> mapping1( + std::unique_ptr<PlatformSharedBufferMapping> mapping1( buffer->MapNoCheck(0, kNumBytes)); ASSERT_TRUE(mapping1); ASSERT_TRUE(mapping1->GetBase()); @@ -59,7 +60,7 @@ TEST(PlatformSharedBufferTest, Basic) { for (size_t i = 0; i < kNumInts; i++) EXPECT_EQ(static_cast<int>(i) + kFudge, stuff1[i]) << i; - scoped_ptr<PlatformSharedBufferMapping> mapping2( + std::unique_ptr<PlatformSharedBufferMapping> mapping2( buffer->Map((kNumInts / 2) * sizeof(int), 2 * sizeof(int))); ASSERT_TRUE(mapping2); ASSERT_TRUE(mapping2->GetBase()); @@ -83,7 +84,7 @@ TEST(PlatformSharedBufferTest, Basic) { // it to be. { EXPECT_TRUE(buffer->IsValidMap(sizeof(int), kNumBytes - sizeof(int))); - scoped_ptr<PlatformSharedBufferMapping> mapping( + std::unique_ptr<PlatformSharedBufferMapping> mapping( buffer->Map(sizeof(int), kNumBytes - sizeof(int))); ASSERT_TRUE(mapping); ASSERT_TRUE(mapping->GetBase()); @@ -152,8 +153,8 @@ TEST(PlatformSharedBufferTest, TooBig) { // using the address as the key for unmapping. TEST(PlatformSharedBufferTest, MappingsDistinct) { scoped_refptr<PlatformSharedBuffer> buffer(PlatformSharedBuffer::Create(100)); - scoped_ptr<PlatformSharedBufferMapping> mapping1(buffer->Map(0, 100)); - scoped_ptr<PlatformSharedBufferMapping> mapping2(buffer->Map(0, 100)); + std::unique_ptr<PlatformSharedBufferMapping> mapping1(buffer->Map(0, 100)); + std::unique_ptr<PlatformSharedBufferMapping> mapping2(buffer->Map(0, 100)); EXPECT_NE(mapping1->GetBase(), mapping2->GetBase()); } @@ -162,7 +163,8 @@ TEST(PlatformSharedBufferTest, BufferZeroInitialized) { for (size_t i = 0; i < arraysize(kSizes); i++) { scoped_refptr<PlatformSharedBuffer> buffer( PlatformSharedBuffer::Create(kSizes[i])); - scoped_ptr<PlatformSharedBufferMapping> mapping(buffer->Map(0, kSizes[i])); + std::unique_ptr<PlatformSharedBufferMapping> mapping( + buffer->Map(0, kSizes[i])); for (size_t j = 0; j < kSizes[i]; j++) { // "Assert" instead of "expect" so we don't spam the output with thousands // of failures if we fail. @@ -173,8 +175,8 @@ TEST(PlatformSharedBufferTest, BufferZeroInitialized) { } TEST(PlatformSharedBufferTest, MappingsOutliveBuffer) { - scoped_ptr<PlatformSharedBufferMapping> mapping1; - scoped_ptr<PlatformSharedBufferMapping> mapping2; + std::unique_ptr<PlatformSharedBufferMapping> mapping1; + std::unique_ptr<PlatformSharedBufferMapping> mapping2; { scoped_refptr<PlatformSharedBuffer> buffer( @@ -194,9 +196,6 @@ TEST(PlatformSharedBufferTest, FromSharedMemoryHandle) { const size_t kBufferSize = 1234; base::SharedMemoryCreateOptions options; options.size = kBufferSize; -#if defined(OS_MACOSX) && !defined(OS_IOS) - options.type = base::SharedMemoryHandle::POSIX; -#endif base::SharedMemory shared_memory; ASSERT_TRUE(shared_memory.Create(options)); ASSERT_TRUE(shared_memory.Map(kBufferSize)); @@ -208,7 +207,7 @@ TEST(PlatformSharedBufferTest, FromSharedMemoryHandle) { kBufferSize, false /* read_only */, shm_handle)); ASSERT_TRUE(simple_buffer); - scoped_ptr<PlatformSharedBufferMapping> mapping = + std::unique_ptr<PlatformSharedBufferMapping> mapping = simple_buffer->Map(0, kBufferSize); ASSERT_TRUE(mapping); diff --git a/chromium/mojo/edk/embedder/test_embedder.cc b/chromium/mojo/edk/embedder/test_embedder.cc index bb4430c8907..9658010586f 100644 --- a/chromium/mojo/edk/embedder/test_embedder.cc +++ b/chromium/mojo/edk/embedder/test_embedder.cc @@ -4,8 +4,9 @@ #include "mojo/edk/embedder/test_embedder.h" +#include <memory> + #include "base/logging.h" -#include "base/memory/scoped_ptr.h" #include "mojo/edk/embedder/embedder.h" #include "mojo/edk/embedder/embedder_internal.h" #include "mojo/edk/system/core.h" diff --git a/chromium/mojo/edk/js/BUILD.gn b/chromium/mojo/edk/js/BUILD.gn index 005416d778b..429a8c09355 100644 --- a/chromium/mojo/edk/js/BUILD.gn +++ b/chromium/mojo/edk/js/BUILD.gn @@ -42,7 +42,6 @@ source_set("js") { ] deps = [ - "//mojo/message_pump", "//mojo/public/cpp/system", ] } diff --git a/chromium/mojo/edk/js/core.cc b/chromium/mojo/edk/js/core.cc index f1fc9167e89..c8ccc37c536 100644 --- a/chromium/mojo/edk/js/core.cc +++ b/chromium/mojo/edk/js/core.cc @@ -145,12 +145,10 @@ MojoResult WriteMessage( raw_handles.empty() ? NULL : &raw_handles[0], static_cast<uint32_t>(raw_handles.size()), flags); - // MojoWriteMessage takes ownership of the handles upon success, so - // release them here. - if (rv == MOJO_RESULT_OK) { - for (size_t i = 0; i < handles.size(); ++i) - ignore_result(handles[i]->release()); - } + // MojoWriteMessage takes ownership of the handles, so release them here. + for (size_t i = 0; i < handles.size(); ++i) + ignore_result(handles[i]->release()); + return rv; } diff --git a/chromium/mojo/edk/js/drain_data.cc b/chromium/mojo/edk/js/drain_data.cc index 3b0a195e552..ca5fdf44097 100644 --- a/chromium/mojo/edk/js/drain_data.cc +++ b/chromium/mojo/edk/js/drain_data.cc @@ -41,7 +41,7 @@ DrainData::~DrainData() { void DrainData::WaitForData() { handle_watcher_.Start( - handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, + handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, base::Bind(&DrainData::DataReady, base::Unretained(this))); } diff --git a/chromium/mojo/edk/js/drain_data.h b/chromium/mojo/edk/js/drain_data.h index 13af286631d..917ca05f2a9 100644 --- a/chromium/mojo/edk/js/drain_data.h +++ b/chromium/mojo/edk/js/drain_data.h @@ -7,8 +7,8 @@ #include "base/memory/scoped_vector.h" #include "gin/runner.h" -#include "mojo/message_pump/handle_watcher.h" #include "mojo/public/cpp/system/core.h" +#include "mojo/public/cpp/system/watcher.h" #include "v8/include/v8.h" namespace mojo { @@ -50,7 +50,7 @@ class DrainData { v8::Isolate* isolate_; ScopedDataPipeConsumerHandle handle_; - common::HandleWatcher handle_watcher_; + Watcher handle_watcher_; base::WeakPtr<gin::Runner> runner_; v8::UniquePersistent<v8::Promise::Resolver> resolver_; ScopedVector<DataBuffer> data_buffers_; diff --git a/chromium/mojo/edk/system/BUILD.gn b/chromium/mojo/edk/system/BUILD.gn index 5d83bb6c84f..f151219fb32 100644 --- a/chromium/mojo/edk/system/BUILD.gn +++ b/chromium/mojo/edk/system/BUILD.gn @@ -23,6 +23,7 @@ component("system") { sources = [ "async_waiter.cc", "async_waiter.h", + "atomic_flag.h", "awakable.h", "awakable_list.cc", "awakable_list.h", @@ -51,6 +52,8 @@ component("system") { "handle_table.h", "mapping_table.cc", "mapping_table.h", + "message_for_transit.cc", + "message_for_transit.h", "message_pipe_dispatcher.cc", "message_pipe_dispatcher.h", "node_channel.cc", @@ -173,6 +176,7 @@ test("mojo_system_unittests") { "//base/test:test_support", "//mojo/edk/embedder:embedder_unittests", "//mojo/edk/system", + "//mojo/edk/system/ports:tests", "//mojo/edk/test:run_all_unittests", "//mojo/edk/test:test_support", "//testing/gtest", diff --git a/chromium/mojo/edk/system/atomic_flag.h b/chromium/mojo/edk/system/atomic_flag.h new file mode 100644 index 00000000000..6bdcfaaddd9 --- /dev/null +++ b/chromium/mojo/edk/system/atomic_flag.h @@ -0,0 +1,57 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_EDK_SYSTEM_ATOMIC_FLAG_H_ +#define MOJO_EDK_SYSTEM_ATOMIC_FLAG_H_ + +#include "base/atomicops.h" +#include "base/macros.h" + +namespace mojo { +namespace edk { + +// AtomicFlag is a boolean flag that can be set and tested atomically. It is +// intended to be used to fast-path checks where the common case would normally +// release the governing mutex immediately after checking. +// +// Example usage: +// void DoFoo(Bar* bar) { +// AutoLock l(lock_); +// queue_.push_back(bar); +// flag_.Set(true); +// } +// +// void Baz() { +// if (!flag_) // Assume this is the common case. +// return; +// +// AutoLock l(lock_); +// ... drain queue_ ... +// flag_.Set(false); +// } +class AtomicFlag { + public: + AtomicFlag() : flag_(0) {} + ~AtomicFlag() {} + + void Set(bool value) { + base::subtle::Release_Store(&flag_, value ? 1 : 0); + } + + bool Get() const { + return base::subtle::Acquire_Load(&flag_) ? true : false; + } + + operator const bool() const { return Get(); } + + private: + base::subtle::Atomic32 flag_; + + DISALLOW_COPY_AND_ASSIGN(AtomicFlag); +}; + +} // namespace edk +} // namespace mojo + +#endif // MOJO_EDK_SYSTEM_ATOMIC_FLAG_H_ diff --git a/chromium/mojo/edk/system/broker_host_posix.cc b/chromium/mojo/edk/system/broker_host_posix.cc index 7a5c54e551e..9c4eeae1c34 100644 --- a/chromium/mojo/edk/system/broker_host_posix.cc +++ b/chromium/mojo/edk/system/broker_host_posix.cc @@ -61,18 +61,24 @@ void BrokerHost::SendChannel(ScopedPlatformHandle handle) { void BrokerHost::OnBufferRequest(size_t num_bytes) { scoped_refptr<PlatformSharedBuffer> buffer; + scoped_refptr<PlatformSharedBuffer> read_only_buffer; if (num_bytes <= kMaxSharedBufferSize) { buffer = PlatformSharedBuffer::Create(num_bytes); + if (buffer) + read_only_buffer = buffer->CreateReadOnlyDuplicate(); + if (!read_only_buffer) + buffer = nullptr; } else { LOG(ERROR) << "Shared buffer request too large: " << num_bytes; } Channel::MessagePtr message = CreateBrokerMessage( - BrokerMessageType::BUFFER_RESPONSE, buffer ? 1 : 0, nullptr); + BrokerMessageType::BUFFER_RESPONSE, buffer ? 2 : 0, nullptr); if (buffer) { ScopedPlatformHandleVectorPtr handles; - handles.reset(new PlatformHandleVector(1)); + handles.reset(new PlatformHandleVector(2)); handles->at(0) = buffer->PassPlatformHandle().release(); + handles->at(1) = read_only_buffer->PassPlatformHandle().release(); message->SetHandles(std::move(handles)); } diff --git a/chromium/mojo/edk/system/broker_posix.cc b/chromium/mojo/edk/system/broker_posix.cc index 3cbb953f1cb..5400ed2a287 100644 --- a/chromium/mojo/edk/system/broker_posix.cc +++ b/chromium/mojo/edk/system/broker_posix.cc @@ -109,11 +109,13 @@ scoped_refptr<PlatformSharedBuffer> Broker::GetSharedBuffer(size_t num_bytes) { std::deque<PlatformHandle> incoming_platform_handles; if (WaitForBrokerMessage(sync_channel_.get(), - BrokerMessageType::BUFFER_RESPONSE, 1, + BrokerMessageType::BUFFER_RESPONSE, 2, &incoming_platform_handles)) { - return PlatformSharedBuffer::CreateFromPlatformHandle( - num_bytes, false /* read_only */, - ScopedPlatformHandle(incoming_platform_handles.front())); + ScopedPlatformHandle rw_handle(incoming_platform_handles.front()); + incoming_platform_handles.pop_front(); + ScopedPlatformHandle ro_handle(incoming_platform_handles.front()); + return PlatformSharedBuffer::CreateFromPlatformHandlePair( + num_bytes, std::move(rw_handle), std::move(ro_handle)); } return nullptr; diff --git a/chromium/mojo/edk/system/channel.cc b/chromium/mojo/edk/system/channel.cc index 107a446133e..e802527619f 100644 --- a/chromium/mojo/edk/system/channel.cc +++ b/chromium/mojo/edk/system/channel.cc @@ -12,6 +12,7 @@ #include "base/macros.h" #include "base/memory/aligned_memory.h" +#include "base/process/process_handle.h" #include "mojo/edk/embedder/platform_handle.h" #if defined(OS_MACOSX) && !defined(OS_IOS) @@ -53,7 +54,10 @@ Channel::Message::Message(size_t payload_size, // serialised into the message buffer. Since there could be a mix of fds and // mach ports, we store the mach ports as an <index, port> pair (of uint32_t), // so that the original ordering of handles can be re-created. - extra_header_size = max_handles * sizeof(MachPortsEntry); + if (max_handles) { + extra_header_size = + sizeof(MachPortsExtraHeader) + (max_handles * sizeof(MachPortsEntry)); + } #endif // Pad extra header data to be aliged to |kChannelMessageAlignment| bytes. if (extra_header_size % kChannelMessageAlignment) { @@ -96,10 +100,14 @@ Channel::Message::Message(size_t payload_size, for (size_t i = 0; i < max_handles_; ++i) handles()[i] = PlatformHandle(); #elif defined(OS_MACOSX) && !defined(OS_IOS) - mach_ports_ = reinterpret_cast<MachPortsEntry*>(mutable_extra_header()); + mach_ports_header_ = + reinterpret_cast<MachPortsExtraHeader*>(mutable_extra_header()); + mach_ports_header_->num_ports = 0; // Initialize all handles to invalid values. - for (size_t i = 0; i < max_handles_; ++i) - mach_ports_[i] = {0, static_cast<uint32_t>(MACH_PORT_NULL)}; + for (size_t i = 0; i < max_handles_; ++i) { + mach_ports_header_->entries[i] = + {0, static_cast<uint32_t>(MACH_PORT_NULL)}; + } #endif } } @@ -132,7 +140,8 @@ Channel::MessagePtr Channel::Message::Deserialize(const void* data, return nullptr; } - if (header->num_bytes < header->num_header_bytes) { + if (header->num_bytes < header->num_header_bytes || + header->num_header_bytes < sizeof(Header)) { DLOG(ERROR) << "Decoding invalid message: " << header->num_bytes << " < " << header->num_header_bytes; return nullptr; @@ -142,9 +151,15 @@ Channel::MessagePtr Channel::Message::Deserialize(const void* data, #if defined(OS_WIN) uint32_t max_handles = extra_header_size / sizeof(PlatformHandle); #elif defined(OS_MACOSX) && !defined(OS_IOS) - uint32_t max_handles = extra_header_size / sizeof(MachPortsEntry); + if (extra_header_size < sizeof(MachPortsExtraHeader)) { + DLOG(ERROR) << "Decoding invalid message: " << extra_header_size << " < " + << sizeof(MachPortsExtraHeader); + return nullptr; + } + uint32_t max_handles = (extra_header_size - sizeof(MachPortsExtraHeader)) / + sizeof(MachPortsEntry); #endif - if (header->num_handles > max_handles) { + if (header->num_handles > max_handles || max_handles > kMaxAttachedHandles) { DLOG(ERROR) << "Decoding invalid message:" << header->num_handles << " > " << max_handles; return nullptr; @@ -247,16 +262,21 @@ void Channel::Message::SetHandles(ScopedPlatformHandleVectorPtr new_handles) { #if defined(OS_MACOSX) && !defined(OS_IOS) size_t mach_port_index = 0; - for (size_t i = 0; i < max_handles_; ++i) - mach_ports_[i] = {0, static_cast<uint32_t>(MACH_PORT_NULL)}; - for (size_t i = 0; i < handle_vector_->size(); i++) { - if ((*handle_vector_)[i].type == PlatformHandle::Type::MACH || - (*handle_vector_)[i].type == PlatformHandle::Type::MACH_NAME) { - mach_port_t port = (*handle_vector_)[i].port; - mach_ports_[mach_port_index].index = i; - mach_ports_[mach_port_index].mach_port = port; - mach_port_index++; + if (mach_ports_header_) { + for (size_t i = 0; i < max_handles_; ++i) { + mach_ports_header_->entries[i] = + {0, static_cast<uint32_t>(MACH_PORT_NULL)}; + } + for (size_t i = 0; i < handle_vector_->size(); i++) { + if ((*handle_vector_)[i].type == PlatformHandle::Type::MACH || + (*handle_vector_)[i].type == PlatformHandle::Type::MACH_NAME) { + mach_port_t port = (*handle_vector_)[i].port; + mach_ports_header_->entries[mach_port_index].index = i; + mach_ports_header_->entries[mach_port_index].mach_port = port; + mach_port_index++; + } } + mach_ports_header_->num_ports = static_cast<uint16_t>(mach_port_index); } #endif } @@ -272,8 +292,13 @@ ScopedPlatformHandleVectorPtr Channel::Message::TakeHandles() { header_->num_handles = 0; return moved_handles; #elif defined(OS_MACOSX) && !defined(OS_IOS) - for (size_t i = 0; i < max_handles_; ++i) - mach_ports_[i] = {0, static_cast<uint32_t>(MACH_PORT_NULL)}; + if (mach_ports_header_) { + for (size_t i = 0; i < max_handles_; ++i) { + mach_ports_header_->entries[i] = + {0, static_cast<uint32_t>(MACH_PORT_NULL)}; + } + mach_ports_header_->num_ports = 0; + } header_->num_handles = 0; return std::move(handle_vector_); #else @@ -319,12 +344,22 @@ bool Channel::Message::RewriteHandles(base::ProcessHandle from_process, DLOG(ERROR) << "Refusing to duplicate invalid handle."; continue; } + DCHECK_EQ(handles[i].owning_process, from_process); BOOL result = DuplicateHandle( from_process, handles[i].handle, to_process, - reinterpret_cast<HANDLE*>(handles + i), 0, FALSE, + &handles[i].handle, 0, FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); - if (!result) + if (result) { + handles[i].owning_process = to_process; + } else { success = false; + + // If handle duplication fails, the source handle will already be closed + // due to DUPLICATE_CLOSE_SOURCE. Replace the handle in the message with + // an invalid handle. + handles[i].handle = INVALID_HANDLE_VALUE; + handles[i].owning_process = base::GetCurrentProcessHandle(); + } } return success; } @@ -344,6 +379,9 @@ bool Channel::Message::RewriteHandles(base::ProcessHandle from_process, // Discard() marks occupied bytes as discarded, signifying that their contents // can be forgotten or overwritten. // +// Realign() moves occupied bytes to the front of the buffer so that those +// occupied bytes are properly aligned. +// // The most common Channel behavior in practice should result in very few // allocations and copies, as memory is claimed and discarded shortly after // being reserved, and future reservations will immediately reuse discarded @@ -426,6 +464,13 @@ class Channel::ReadBuffer { } } + void Realign() { + size_t num_bytes = num_occupied_bytes(); + memmove(data_, occupied_bytes(), num_bytes); + num_discarded_bytes_ = 0; + num_occupied_bytes_ = num_bytes; + } + private: char* data_ = nullptr; @@ -467,6 +512,13 @@ bool Channel::OnReadComplete(size_t bytes_read, size_t *next_read_size_hint) { bool did_dispatch_message = false; read_buffer_->Claim(bytes_read); while (read_buffer_->num_occupied_bytes() >= sizeof(Message::Header)) { + // Ensure the occupied data is properly aligned. If it isn't, a SIGBUS could + // happen on architectures that don't allow misaligned words access (i.e. + // anything other than x86). Only re-align when necessary to avoid copies. + if (reinterpret_cast<uintptr_t>(read_buffer_->occupied_bytes()) % + kChannelMessageAlignment != 0) + read_buffer_->Realign(); + // We have at least enough data available for a MessageHeader. const Message::Header* header = reinterpret_cast<const Message::Header*>( read_buffer_->occupied_bytes()); diff --git a/chromium/mojo/edk/system/channel.h b/chromium/mojo/edk/system/channel.h index 74b62da8664..ab93431a443 100644 --- a/chromium/mojo/edk/system/channel.h +++ b/chromium/mojo/edk/system/channel.h @@ -25,7 +25,7 @@ class Channel : public base::RefCountedThreadSafe<Channel> { public: struct Message; - using MessagePtr = scoped_ptr<Message>; + using MessagePtr = std::unique_ptr<Message>; // A message to be written to a channel. struct Message { @@ -69,13 +69,28 @@ class Channel : public base::RefCountedThreadSafe<Channel> { #if defined(OS_MACOSX) && !defined(OS_IOS) struct MachPortsEntry { + // Index of Mach port in the original vector of PlatformHandles. uint16_t index; + + // Mach port name. uint32_t mach_port; static_assert(sizeof(mach_port_t) <= sizeof(uint32_t), "mach_port_t must be no larger than uint32_t"); }; static_assert(sizeof(MachPortsEntry) == 6, "sizeof(MachPortsEntry) must be 6 bytes"); + + // Structure of the extra header field when present on OSX. + struct MachPortsExtraHeader { + // Actual number of Mach ports encoded in the extra header. + uint16_t num_ports; + + // Array of encoded Mach ports. If |num_ports| > 0, |entires[0]| through + // to |entries[num_ports-1]| inclusive are valid. + MachPortsEntry entries[0]; + }; + static_assert(sizeof(MachPortsExtraHeader) == 2, + "sizeof(MachPortsExtraHeader) must be 2 bytes"); #endif #pragma pack(pop) @@ -155,7 +170,7 @@ class Channel : public base::RefCountedThreadSafe<Channel> { #if defined(OS_MACOSX) && !defined(OS_IOS) // On OSX, handles are serialised into the extra header section. - MachPortsEntry* mach_ports_ = nullptr; + MachPortsExtraHeader* mach_ports_header_ = nullptr; #endif DISALLOW_COPY_AND_ASSIGN(Message); @@ -258,7 +273,7 @@ class Channel : public base::RefCountedThreadSafe<Channel> { class ReadBuffer; Delegate* delegate_; - const scoped_ptr<ReadBuffer> read_buffer_; + const std::unique_ptr<ReadBuffer> read_buffer_; DISALLOW_COPY_AND_ASSIGN(Channel); }; diff --git a/chromium/mojo/edk/system/channel_posix.cc b/chromium/mojo/edk/system/channel_posix.cc index 772a608698b..72c559bf6aa 100644 --- a/chromium/mojo/edk/system/channel_posix.cc +++ b/chromium/mojo/edk/system/channel_posix.cc @@ -16,7 +16,6 @@ #include "base/location.h" #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/message_loop/message_loop.h" #include "base/synchronization/lock.h" #include "base/task_runner.h" @@ -145,14 +144,12 @@ class ChannelPosix : public Channel, // On OSX, we can have mach ports which are located in the extra header // section. using MachPortsEntry = Channel::Message::MachPortsEntry; - CHECK(extra_header_size >= num_handles * sizeof(MachPortsEntry)); - size_t num_mach_ports = 0; - const MachPortsEntry* mach_ports = - reinterpret_cast<const MachPortsEntry*>(extra_header); - for (size_t i = 0; i < num_handles; i++) { - if (mach_ports[i].mach_port != MACH_PORT_NULL) - num_mach_ports++; - } + using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader; + CHECK(extra_header_size >= + sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)); + const MachPortsExtraHeader* mach_ports_header = + reinterpret_cast<const MachPortsExtraHeader*>(extra_header); + size_t num_mach_ports = mach_ports_header->num_ports; CHECK(num_mach_ports <= num_handles); if (incoming_platform_handles_.size() + num_mach_ports < num_handles) { handles->reset(); @@ -160,6 +157,7 @@ class ChannelPosix : public Channel, } handles->reset(new PlatformHandleVector(num_handles)); + const MachPortsEntry* mach_ports = mach_ports_header->entries; for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) { if (mach_port_index < num_mach_ports && mach_ports[mach_port_index].index == i) { @@ -476,8 +474,8 @@ class ChannelPosix : public Channel, scoped_refptr<base::TaskRunner> io_task_runner_; // These watchers must only be accessed on the IO thread. - scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; - scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; + std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; + std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; std::deque<PlatformHandle> incoming_platform_handles_; diff --git a/chromium/mojo/edk/system/channel_win.cc b/chromium/mojo/edk/system/channel_win.cc index 465ba62dbba..38744ab99b1 100644 --- a/chromium/mojo/edk/system/channel_win.cc +++ b/chromium/mojo/edk/system/channel_win.cc @@ -16,7 +16,6 @@ #include "base/location.h" #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/message_loop/message_loop.h" #include "base/synchronization/lock.h" #include "base/task_runner.h" @@ -80,13 +79,7 @@ class ChannelWin : public Channel, self_(this), handle_(std::move(handle)), io_task_runner_(io_task_runner) { - sentinel_ = ~reinterpret_cast<uintptr_t>(this); CHECK(handle_.is_valid()); - memset(&read_context_, 0, sizeof(read_context_)); - read_context_.handler = this; - - memset(&write_context_, 0, sizeof(write_context_)); - write_context_.handler = this; } void Start() override { @@ -139,13 +132,7 @@ class ChannelWin : public Channel, private: // May run on any thread. - ~ChannelWin() override { - // This is intentionally not 0. If another object is constructed on top of - // this memory, it is likely to initialise values to 0. Using a non-zero - // value lets us detect the difference between just destroying, and - // re-allocating the memory. - sentinel_ = UINTPTR_MAX; - } + ~ChannelWin() override {} void StartOnIOThread() { base::MessageLoop::current()->AddDestructionObserver(this); @@ -181,7 +168,6 @@ class ChannelWin : public Channel, // base::MessageLoop::DestructionObserver: void WillDestroyCurrentMessageLoop() override { - CheckValid(); DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); if (self_) ShutDownOnIOThread(); @@ -191,7 +177,6 @@ class ChannelWin : public Channel, void OnIOCompleted(base::MessageLoopForIO::IOContext* context, DWORD bytes_transfered, DWORD error) override { - CheckValid(); if (error != ERROR_SUCCESS) { OnError(); } else if (context == &read_context_) { @@ -286,10 +271,6 @@ class ChannelWin : public Channel, return WriteNoLock(outgoing_messages_.front()); } - void CheckValid() const { - CHECK_EQ(reinterpret_cast<uintptr_t>(this), ~sentinel_); - } - // Keeps the Channel alive at least until explicit shutdown on the IO thread. scoped_refptr<Channel> self_; @@ -307,12 +288,6 @@ class ChannelWin : public Channel, bool reject_writes_ = false; std::deque<MessageView> outgoing_messages_; - // A value that is unlikely to be valid if this object is destroyed and the - // memory overwritten by something else. When this is valid, its value will be - // ~|this|. - // TODO(amistry): Remove before M50 branch point. - uintptr_t sentinel_; - DISALLOW_COPY_AND_ASSIGN(ChannelWin); }; diff --git a/chromium/mojo/edk/system/core.cc b/chromium/mojo/edk/system/core.cc index b3602d7c5f4..65a64e8fda8 100644 --- a/chromium/mojo/edk/system/core.cc +++ b/chromium/mojo/edk/system/core.cc @@ -13,9 +13,10 @@ #include "base/location.h" #include "base/logging.h" #include "base/macros.h" +#include "base/memory/ptr_util.h" #include "base/message_loop/message_loop.h" #include "base/rand_util.h" -#include "base/thread_task_runner_handle.h" +#include "base/threading/thread_task_runner_handle.h" #include "base/time/time.h" #include "crypto/random.h" #include "mojo/edk/embedder/embedder.h" @@ -27,6 +28,7 @@ #include "mojo/edk/system/data_pipe_consumer_dispatcher.h" #include "mojo/edk/system/data_pipe_producer_dispatcher.h" #include "mojo/edk/system/handle_signals_state.h" +#include "mojo/edk/system/message_for_transit.h" #include "mojo/edk/system/message_pipe_dispatcher.h" #include "mojo/edk/system/platform_handle_dispatcher.h" #include "mojo/edk/system/ports/node.h" @@ -246,6 +248,7 @@ ScopedMessagePipeHandle Core::CreateMessagePipe( ScopedMessagePipeHandle Core::CreateParentMessagePipe( const std::string& token) { + RequestContext request_context; ports::PortRef port0, port1; GetNodeController()->node()->CreatePortPair(&port0, &port1); MojoHandle handle = AddDispatcher( @@ -256,6 +259,7 @@ ScopedMessagePipeHandle Core::CreateParentMessagePipe( } ScopedMessagePipeHandle Core::CreateChildMessagePipe(const std::string& token) { + RequestContext request_context; ports::PortRef port0, port1; GetNodeController()->node()->CreatePortPair(&port0, &port1); MojoHandle handle = AddDispatcher( @@ -271,7 +275,8 @@ MojoResult Core::AsyncWait(MojoHandle handle, scoped_refptr<Dispatcher> dispatcher = GetDispatcher(handle); DCHECK(dispatcher); - scoped_ptr<AsyncWaiter> waiter = make_scoped_ptr(new AsyncWaiter(callback)); + std::unique_ptr<AsyncWaiter> waiter = + base::WrapUnique(new AsyncWaiter(callback)); MojoResult rv = dispatcher->AddAwakable(waiter.get(), signals, 0, nullptr); if (rv == MOJO_RESULT_OK) ignore_result(waiter.release()); @@ -357,6 +362,76 @@ MojoResult Core::CancelWatch(MojoHandle handle, uintptr_t context) { return dispatcher->CancelWatch(context); } +MojoResult Core::AllocMessage(uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoAllocMessageFlags flags, + MojoMessageHandle* message) { + if (!message) + return MOJO_RESULT_INVALID_ARGUMENT; + + if (num_handles == 0) { // Fast path: no handles. + std::unique_ptr<MessageForTransit> msg; + MojoResult rv = MessageForTransit::Create(&msg, num_bytes, nullptr, 0); + if (rv != MOJO_RESULT_OK) + return rv; + + *message = reinterpret_cast<MojoMessageHandle>(msg.release()); + return MOJO_RESULT_OK; + } + + if (!handles) + return MOJO_RESULT_INVALID_ARGUMENT; + + if (num_handles > kMaxHandlesPerMessage) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + std::vector<Dispatcher::DispatcherInTransit> dispatchers; + { + base::AutoLock lock(handles_lock_); + MojoResult rv = handles_.BeginTransit(handles, num_handles, &dispatchers); + if (rv != MOJO_RESULT_OK) { + handles_.CancelTransit(dispatchers); + return rv; + } + } + DCHECK_EQ(num_handles, dispatchers.size()); + + std::unique_ptr<MessageForTransit> msg; + MojoResult rv = MessageForTransit::Create( + &msg, num_bytes, dispatchers.data(), num_handles); + + { + base::AutoLock lock(handles_lock_); + if (rv == MOJO_RESULT_OK) { + handles_.CompleteTransitAndClose(dispatchers); + *message = reinterpret_cast<MojoMessageHandle>(msg.release()); + } else { + handles_.CancelTransit(dispatchers); + } + } + + return rv; +} + +MojoResult Core::FreeMessage(MojoMessageHandle message) { + if (!message) + return MOJO_RESULT_INVALID_ARGUMENT; + + delete reinterpret_cast<MessageForTransit*>(message); + + return MOJO_RESULT_OK; +} + +MojoResult Core::GetMessageBuffer(MojoMessageHandle message, void** buffer) { + if (!message) + return MOJO_RESULT_INVALID_ARGUMENT; + + *buffer = reinterpret_cast<MessageForTransit*>(message)->mutable_bytes(); + + return MOJO_RESULT_OK; +} + MojoResult Core::CreateWaitSet(MojoHandle* wait_set_handle) { RequestContext request_context; if (!wait_set_handle) @@ -472,48 +547,36 @@ MojoResult Core::WriteMessage(MojoHandle message_pipe_handle, const MojoHandle* handles, uint32_t num_handles, MojoWriteMessageFlags flags) { - RequestContext request_context; - auto dispatcher = GetDispatcher(message_pipe_handle); - if (!dispatcher) + if (num_bytes && !bytes) return MOJO_RESULT_INVALID_ARGUMENT; - if (num_handles == 0) // Fast path: no handles. - return dispatcher->WriteMessage(bytes, num_bytes, nullptr, 0, flags); - - CHECK(handles); - - if (num_handles > kMaxHandlesPerMessage) - return MOJO_RESULT_RESOURCE_EXHAUSTED; - - for (size_t i = 0; i < num_handles; ++i) { - if (message_pipe_handle == handles[i]) - return MOJO_RESULT_BUSY; + MojoMessageHandle message; + MojoResult rv = AllocMessage(num_bytes, handles, num_handles, + MOJO_ALLOC_MESSAGE_FLAG_NONE, &message); + if (rv != MOJO_RESULT_OK) + return rv; + + if (num_bytes) { + void* buffer = nullptr; + rv = GetMessageBuffer(message, &buffer); + DCHECK_EQ(rv, MOJO_RESULT_OK); + memcpy(buffer, bytes, num_bytes); } - std::vector<Dispatcher::DispatcherInTransit> dispatchers; - { - base::AutoLock lock(handles_lock_); - MojoResult rv = handles_.BeginTransit(handles, num_handles, &dispatchers); - if (rv != MOJO_RESULT_OK) { - handles_.CancelTransit(dispatchers); - return rv; - } - } - DCHECK_EQ(num_handles, dispatchers.size()); - - MojoResult rv = dispatcher->WriteMessage( - bytes, num_bytes, dispatchers.data(), num_handles, flags); + return WriteMessageNew(message_pipe_handle, message, flags); +} - { - base::AutoLock lock(handles_lock_); - if (rv == MOJO_RESULT_OK) { - handles_.CompleteTransitAndClose(dispatchers); - } else { - handles_.CancelTransit(dispatchers); - } - } +MojoResult Core::WriteMessageNew(MojoHandle message_pipe_handle, + MojoMessageHandle message, + MojoWriteMessageFlags flags) { + RequestContext request_context; + std::unique_ptr<MessageForTransit> message_for_transit( + reinterpret_cast<MessageForTransit*>(message)); + auto dispatcher = GetDispatcher(message_pipe_handle); + if (!dispatcher) + return MOJO_RESULT_INVALID_ARGUMENT; - return rv; + return dispatcher->WriteMessage(std::move(message_for_transit), flags); } MojoResult Core::ReadMessage(MojoHandle message_pipe_handle, @@ -522,13 +585,45 @@ MojoResult Core::ReadMessage(MojoHandle message_pipe_handle, MojoHandle* handles, uint32_t* num_handles, MojoReadMessageFlags flags) { - RequestContext request_context; CHECK((!num_handles || !*num_handles || handles) && (!num_bytes || !*num_bytes || bytes)); + RequestContext request_context; + auto dispatcher = GetDispatcher(message_pipe_handle); + if (!dispatcher) + return MOJO_RESULT_INVALID_ARGUMENT; + std::unique_ptr<MessageForTransit> message; + MojoResult rv = + dispatcher->ReadMessage(&message, num_bytes, handles, num_handles, flags, + false /* ignore_num_bytes */); + if (rv != MOJO_RESULT_OK) + return rv; + + if (message && message->num_bytes()) + memcpy(bytes, message->bytes(), message->num_bytes()); + + return MOJO_RESULT_OK; +} + +MojoResult Core::ReadMessageNew(MojoHandle message_pipe_handle, + MojoMessageHandle* message, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags) { + CHECK(message); + CHECK(!num_handles || !*num_handles || handles); + RequestContext request_context; auto dispatcher = GetDispatcher(message_pipe_handle); if (!dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; - return dispatcher->ReadMessage(bytes, num_bytes, handles, num_handles, flags); + std::unique_ptr<MessageForTransit> msg; + MojoResult rv = + dispatcher->ReadMessage(&msg, num_bytes, handles, num_handles, flags, + true /* ignore_num_bytes */); + if (rv != MOJO_RESULT_OK) + return rv; + *message = reinterpret_cast<MojoMessageHandle>(msg.release()); + return MOJO_RESULT_OK; } MojoResult Core::FuseMessagePipes(MojoHandle handle0, MojoHandle handle1) { @@ -761,7 +856,7 @@ MojoResult Core::MapBuffer(MojoHandle buffer_handle, if (!dispatcher) return MOJO_RESULT_INVALID_ARGUMENT; - scoped_ptr<PlatformSharedBufferMapping> mapping; + std::unique_ptr<PlatformSharedBufferMapping> mapping; MojoResult result = dispatcher->MapBuffer(offset, num_bytes, flags, &mapping); if (result != MOJO_RESULT_OK) return result; @@ -860,7 +955,7 @@ MojoResult Core::WaitManyInternal(const MojoHandle* handles, // static void Core::PassNodeControllerToIOThread( - scoped_ptr<NodeController> node_controller) { + std::unique_ptr<NodeController> node_controller) { // It's OK to leak this reference. At this point we know the IO loop is still // running, and we know the NodeController will observe its eventual // destruction. This tells the NodeController to delete itself when that diff --git a/chromium/mojo/edk/system/core.h b/chromium/mojo/edk/system/core.h index 18da0ba375c..7871480e72c 100644 --- a/chromium/mojo/edk/system/core.h +++ b/chromium/mojo/edk/system/core.h @@ -5,12 +5,12 @@ #ifndef MOJO_EDK_SYSTEM_CORE_H_ #define MOJO_EDK_SYSTEM_CORE_H_ +#include <memory> #include <vector> #include "base/callback.h" #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/memory/shared_memory_handle.h" #include "base/synchronization/lock.h" #include "base/task_runner.h" @@ -144,6 +144,13 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { MojoWatchCallback callback, uintptr_t context); MojoResult CancelWatch(MojoHandle handle, uintptr_t context); + MojoResult AllocMessage(uint32_t num_bytes, + const MojoHandle* handles, + uint32_t num_handles, + MojoAllocMessageFlags flags, + MojoMessageHandle* message); + MojoResult FreeMessage(MojoMessageHandle message); + MojoResult GetMessageBuffer(MojoMessageHandle message, void** buffer); // These methods correspond to the API functions defined in // "mojo/public/c/system/wait_set.h": @@ -171,12 +178,21 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { const MojoHandle* handles, uint32_t num_handles, MojoWriteMessageFlags flags); + MojoResult WriteMessageNew(MojoHandle message_pipe_handle, + MojoMessageHandle message, + MojoWriteMessageFlags flags); MojoResult ReadMessage(MojoHandle message_pipe_handle, void* bytes, uint32_t* num_bytes, MojoHandle* handles, uint32_t* num_handles, MojoReadMessageFlags flags); + MojoResult ReadMessageNew(MojoHandle message_pipe_handle, + MojoMessageHandle* message, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags); MojoResult FuseMessagePipes(MojoHandle handle0, MojoHandle handle1); // These methods correspond to the API functions defined in @@ -236,7 +252,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { // Used to pass ownership of our NodeController over to the IO thread in the // event that we're torn down before said thread. static void PassNodeControllerToIOThread( - scoped_ptr<NodeController> node_controller); + std::unique_ptr<NodeController> node_controller); // Guards node_controller_. // @@ -250,7 +266,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Core { // This is lazily initialized on first access. Always use GetNodeController() // to access it. - scoped_ptr<NodeController> node_controller_; + std::unique_ptr<NodeController> node_controller_; base::Lock handles_lock_; HandleTable handles_; diff --git a/chromium/mojo/edk/system/core_test_base.cc b/chromium/mojo/edk/system/core_test_base.cc index 635501aa2c7..e98a55d2ad1 100644 --- a/chromium/mojo/edk/system/core_test_base.cc +++ b/chromium/mojo/edk/system/core_test_base.cc @@ -16,6 +16,7 @@ #include "mojo/edk/system/configuration.h" #include "mojo/edk/system/core.h" #include "mojo/edk/system/dispatcher.h" +#include "mojo/edk/system/message_for_transit.h" namespace mojo { namespace edk { @@ -41,27 +42,25 @@ class MockDispatcher : public Dispatcher { } MojoResult WriteMessage( - const void* bytes, - uint32_t num_bytes, - const DispatcherInTransit* dispatchers, - uint32_t num_dispatchers, + std::unique_ptr<MessageForTransit> message, MojoWriteMessageFlags /*flags*/) override { info_->IncrementWriteMessageCallCount(); - if (num_bytes > GetConfiguration().max_message_num_bytes) + if (message->num_bytes() > GetConfiguration().max_message_num_bytes) return MOJO_RESULT_RESOURCE_EXHAUSTED; - if (dispatchers) + if (message->num_handles()) return MOJO_RESULT_UNIMPLEMENTED; return MOJO_RESULT_OK; } - MojoResult ReadMessage(void* bytes, + MojoResult ReadMessage(std::unique_ptr<MessageForTransit>* message, uint32_t* num_bytes, MojoHandle* handle, uint32_t* num_handles, - MojoReadMessageFlags /*flags*/) override { + MojoReadMessageFlags /*flags*/, + bool ignore_num_bytes) override { info_->IncrementReadMessageCallCount(); if (num_handles) diff --git a/chromium/mojo/edk/system/core_unittest.cc b/chromium/mojo/edk/system/core_unittest.cc index 7eeabc03e2f..a25c7af3d08 100644 --- a/chromium/mojo/edk/system/core_unittest.cc +++ b/chromium/mojo/edk/system/core_unittest.cc @@ -327,6 +327,18 @@ TEST_F(CoreTest, InvalidArguments) { MOJO_WRITE_MESSAGE_FLAG_NONE)); ASSERT_EQ(0u, info.GetWriteMessageCallCount()); + // Null |bytes| with non-zero message size. + ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WriteMessage(h, nullptr, 1, nullptr, 0, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + ASSERT_EQ(0u, info.GetWriteMessageCallCount()); + + // Null |handles| with non-zero handle count. + ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + core()->WriteMessage(h, nullptr, 0, nullptr, 1, + MOJO_WRITE_MESSAGE_FLAG_NONE)); + ASSERT_EQ(0u, info.GetWriteMessageCallCount()); + // Huge handle count (plausibly big). ASSERT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, core()->WriteMessage( @@ -349,19 +361,21 @@ TEST_F(CoreTest, InvalidArguments) { MOJO_WRITE_MESSAGE_FLAG_NONE)); ASSERT_EQ(0u, info.GetWriteMessageCallCount()); - // Can't send a handle over itself. + // Can't send a handle over itself. Note that this will also cause |h| to be + // closed. handles[0] = h; ASSERT_EQ( - MOJO_RESULT_BUSY, + MOJO_RESULT_INVALID_ARGUMENT, core()->WriteMessage(h, nullptr, 0, handles, 1, MOJO_WRITE_MESSAGE_FLAG_NONE)); ASSERT_EQ(0u, info.GetWriteMessageCallCount()); + h = CreateMockHandle(&info); + MockHandleInfo info2; - MojoHandle h2 = CreateMockHandle(&info2); // This is "okay", but |MockDispatcher| doesn't implement it. - handles[0] = h2; + handles[0] = CreateMockHandle(&info2); ASSERT_EQ( MOJO_RESULT_UNIMPLEMENTED, core()->WriteMessage(h, nullptr, 0, handles, 1, @@ -369,32 +383,33 @@ TEST_F(CoreTest, InvalidArguments) { ASSERT_EQ(1u, info.GetWriteMessageCallCount()); // One of the |handles| is still invalid. + handles[0] = CreateMockHandle(&info2); ASSERT_EQ( MOJO_RESULT_INVALID_ARGUMENT, core()->WriteMessage(h, nullptr, 0, handles, 2, MOJO_WRITE_MESSAGE_FLAG_NONE)); ASSERT_EQ(1u, info.GetWriteMessageCallCount()); - // One of the |handles| is the same as |handle|. + // One of the |handles| is the same as |h|. Both handles are closed. + handles[0] = CreateMockHandle(&info2); handles[1] = h; ASSERT_EQ( - MOJO_RESULT_BUSY, + MOJO_RESULT_INVALID_ARGUMENT, core()->WriteMessage(h, nullptr, 0, handles, 2, MOJO_WRITE_MESSAGE_FLAG_NONE)); ASSERT_EQ(1u, info.GetWriteMessageCallCount()); + h = CreateMockHandle(&info); + // Can't send a handle twice in the same message. - handles[1] = h2; + handles[0] = CreateMockHandle(&info2); + handles[1] = handles[0]; ASSERT_EQ( MOJO_RESULT_BUSY, core()->WriteMessage(h, nullptr, 0, handles, 2, MOJO_WRITE_MESSAGE_FLAG_NONE)); ASSERT_EQ(1u, info.GetWriteMessageCallCount()); - // Note: Since we never successfully sent anything with it, |h2| should - // still be valid. - ASSERT_EQ(MOJO_RESULT_OK, core()->Close(h2)); - ASSERT_EQ(MOJO_RESULT_OK, core()->Close(h)); } @@ -430,7 +445,11 @@ TEST_F(CoreTest, InvalidArguments) { // (for required pointer arguments) will still cause death, but perhaps not // predictably. TEST_F(CoreTest, InvalidArgumentsDeath) { +#if defined(OFFICIAL_BUILD) + const char kMemoryCheckFailedRegex[] = ""; +#else const char kMemoryCheckFailedRegex[] = "Check failed"; +#endif // |WaitMany()|: { @@ -462,22 +481,6 @@ TEST_F(CoreTest, InvalidArgumentsDeath) { kMemoryCheckFailedRegex); } - // |WriteMessage()|: - // Only check arguments checked by |Core|, namely |handle|, |handles|, and - // |num_handles|. - { - MockHandleInfo info; - MojoHandle h = CreateMockHandle(&info); - - // Null |handles| with nonzero |num_handles|. - ASSERT_DEATH_IF_SUPPORTED( - core()->WriteMessage(h, nullptr, 0, nullptr, 1, - MOJO_WRITE_MESSAGE_FLAG_NONE), - kMemoryCheckFailedRegex); - - ASSERT_EQ(MOJO_RESULT_OK, core()->Close(h)); - } - // |ReadMessage()|: // Only check arguments checked by |Core|, namely |handle|, |handles|, and // |num_handles|. @@ -711,21 +714,23 @@ TEST_F(CoreTest, MessagePipeBasicLocalHandlePassing1) { // Make sure that you can't pass either of the message pipe's handles over // itself. - ASSERT_EQ(MOJO_RESULT_BUSY, + ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->WriteMessage(h_passing[0], kHello, kHelloSize, &h_passing[0], 1, MOJO_WRITE_MESSAGE_FLAG_NONE)); + ASSERT_EQ(MOJO_RESULT_OK, + core()->CreateMessagePipe(nullptr, &h_passing[0], &h_passing[1])); + ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, core()->WriteMessage(h_passing[0], kHello, kHelloSize, &h_passing[1], 1, MOJO_WRITE_MESSAGE_FLAG_NONE)); + ASSERT_EQ(MOJO_RESULT_OK, + core()->CreateMessagePipe(nullptr, &h_passing[0], &h_passing[1])); MojoHandle h_passed[2]; - MojoCreateMessagePipeOptions options; - options.struct_size = sizeof(MojoCreateMessagePipeOptions); - options.flags = MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_TRANSFERABLE; ASSERT_EQ(MOJO_RESULT_OK, - core()->CreateMessagePipe(&options, &h_passed[0], &h_passed[1])); + core()->CreateMessagePipe(nullptr, &h_passed[0], &h_passed[1])); // Make sure that |h_passed[]| work properly. ASSERT_EQ(MOJO_RESULT_OK, diff --git a/chromium/mojo/edk/system/data_pipe.cc b/chromium/mojo/edk/system/data_pipe.cc deleted file mode 100644 index 09c3be87198..00000000000 --- a/chromium/mojo/edk/system/data_pipe.cc +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright 2015 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "mojo/edk/system/data_pipe.h" - -#include <stddef.h> -#include <stdint.h> -#include <string.h> - -#include <algorithm> -#include <limits> - -#include "mojo/edk/system/configuration.h" -#include "mojo/edk/system/options_validation.h" -#include "mojo/edk/system/raw_channel.h" -#include "mojo/edk/system/transport_data.h" - -namespace mojo { -namespace edk { - -namespace { - -const uint32_t kInvalidDataPipeHandleIndex = static_cast<uint32_t>(-1); - -struct MOJO_ALIGNAS(8) SerializedDataPipeHandleDispatcher { - MOJO_ALIGNAS(4) - uint32_t platform_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) - - // These are from MojoCreateDataPipeOptions - MOJO_ALIGNAS(4) MojoCreateDataPipeOptionsFlags flags; - MOJO_ALIGNAS(4) uint32_t element_num_bytes; - MOJO_ALIGNAS(4) uint32_t capacity_num_bytes; - - MOJO_ALIGNAS(4) - uint32_t shared_memory_handle_index; // (Or |kInvalidDataPipeHandleIndex|.) - MOJO_ALIGNAS(4) uint32_t shared_memory_size; -}; - -} // namespace - -MojoCreateDataPipeOptions DataPipe::GetDefaultCreateOptions() { - MojoCreateDataPipeOptions result = { - static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), - MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, - 1u, - static_cast<uint32_t>( - GetConfiguration().default_data_pipe_capacity_bytes)}; - return result; -} - -MojoResult DataPipe::ValidateCreateOptions( - const MojoCreateDataPipeOptions* in_options, - MojoCreateDataPipeOptions* out_options) { - const MojoCreateDataPipeOptionsFlags kKnownFlags = - MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE; - - *out_options = GetDefaultCreateOptions(); - if (!in_options) - return MOJO_RESULT_OK; - - UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options); - if (!reader.is_valid()) - return MOJO_RESULT_INVALID_ARGUMENT; - - if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader)) - return MOJO_RESULT_OK; - if ((reader.options().flags & ~kKnownFlags)) - return MOJO_RESULT_UNIMPLEMENTED; - out_options->flags = reader.options().flags; - - // Checks for fields beyond |flags|: - - if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, element_num_bytes, - reader)) - return MOJO_RESULT_OK; - if (reader.options().element_num_bytes == 0) - return MOJO_RESULT_INVALID_ARGUMENT; - out_options->element_num_bytes = reader.options().element_num_bytes; - - if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, capacity_num_bytes, - reader) || - reader.options().capacity_num_bytes == 0) { - // Round the default capacity down to a multiple of the element size (but at - // least one element). - size_t default_data_pipe_capacity_bytes = - GetConfiguration().default_data_pipe_capacity_bytes; - out_options->capacity_num_bytes = - std::max(static_cast<uint32_t>(default_data_pipe_capacity_bytes - - (default_data_pipe_capacity_bytes % - out_options->element_num_bytes)), - out_options->element_num_bytes); - return MOJO_RESULT_OK; - } - if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) - return MOJO_RESULT_INVALID_ARGUMENT; - if (reader.options().capacity_num_bytes > - GetConfiguration().max_data_pipe_capacity_bytes) - return MOJO_RESULT_RESOURCE_EXHAUSTED; - out_options->capacity_num_bytes = reader.options().capacity_num_bytes; - - return MOJO_RESULT_OK; -} - -void DataPipe::StartSerialize(bool have_channel_handle, - bool have_shared_memory, - size_t* max_size, - size_t* max_platform_handles) { - *max_size = sizeof(SerializedDataPipeHandleDispatcher); - *max_platform_handles = 0; - if (have_channel_handle) - (*max_platform_handles)++; - if (have_shared_memory) - (*max_platform_handles)++; -} - -void DataPipe::EndSerialize(const MojoCreateDataPipeOptions& options, - ScopedPlatformHandle channel_handle, - ScopedPlatformHandle shared_memory_handle, - size_t shared_memory_size, - void* destination, - size_t* actual_size, - PlatformHandleVector* platform_handles) { - SerializedDataPipeHandleDispatcher* serialization = - static_cast<SerializedDataPipeHandleDispatcher*>(destination); - if (channel_handle.is_valid()) { - DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); - serialization->platform_handle_index = - static_cast<uint32_t>(platform_handles->size()); - platform_handles->push_back(channel_handle.release()); - } else { - serialization->platform_handle_index = kInvalidDataPipeHandleIndex; - } - - serialization->flags = options.flags; - serialization->element_num_bytes = options.element_num_bytes; - serialization->capacity_num_bytes = options.capacity_num_bytes; - - serialization->shared_memory_size = static_cast<uint32_t>(shared_memory_size); - if (serialization->shared_memory_size) { - DCHECK(platform_handles->size() < std::numeric_limits<uint32_t>::max()); - serialization->shared_memory_handle_index = - static_cast<uint32_t>(platform_handles->size()); - platform_handles->push_back(shared_memory_handle.release()); - } else { - serialization->shared_memory_handle_index = kInvalidDataPipeHandleIndex; - } - - *actual_size = sizeof(SerializedDataPipeHandleDispatcher); -} - -ScopedPlatformHandle DataPipe::Deserialize( - const void* source, - size_t size, - PlatformHandleVector* platform_handles, - MojoCreateDataPipeOptions* options, - ScopedPlatformHandle* shared_memory_handle, - size_t* shared_memory_size) { - if (size != sizeof(SerializedDataPipeHandleDispatcher)) { - LOG(ERROR) << "Invalid serialized data pipe dispatcher (bad size)"; - return ScopedPlatformHandle(); - } - - const SerializedDataPipeHandleDispatcher* serialization = - static_cast<const SerializedDataPipeHandleDispatcher*>(source); - size_t platform_handle_index = serialization->platform_handle_index; - - // Starts off invalid, which is what we want. - PlatformHandle platform_handle; - if (platform_handle_index != kInvalidDataPipeHandleIndex) { - if (!platform_handles || - platform_handle_index >= platform_handles->size()) { - LOG(ERROR) - << "Invalid serialized data pipe dispatcher (missing handles)"; - return ScopedPlatformHandle(); - } - - // We take ownership of the handle, so we have to invalidate the one in - // |platform_handles|. - std::swap(platform_handle, (*platform_handles)[platform_handle_index]); - } - - options->struct_size = sizeof(MojoCreateDataPipeOptions); - options->flags = serialization->flags; - options->element_num_bytes = serialization->element_num_bytes; - options->capacity_num_bytes = serialization->capacity_num_bytes; - - if (shared_memory_size) { - *shared_memory_size = serialization->shared_memory_size; - if (*shared_memory_size) { - DCHECK(serialization->shared_memory_handle_index != - kInvalidDataPipeHandleIndex); - if (!platform_handles || - serialization->shared_memory_handle_index >= - platform_handles->size()) { - LOG(ERROR) << "Invalid serialized data pipe dispatcher " - << "(missing handles)"; - return ScopedPlatformHandle(); - } - - PlatformHandle temp_shared_memory_handle; - std::swap(temp_shared_memory_handle, - (*platform_handles)[serialization->shared_memory_handle_index]); - *shared_memory_handle = - ScopedPlatformHandle(temp_shared_memory_handle); - } - } - - size -= sizeof(SerializedDataPipeHandleDispatcher); - - return ScopedPlatformHandle(platform_handle); -} - -} // namespace edk -} // namespace mojo diff --git a/chromium/mojo/edk/system/data_pipe.h b/chromium/mojo/edk/system/data_pipe.h deleted file mode 100644 index 24281898a95..00000000000 --- a/chromium/mojo/edk/system/data_pipe.h +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2015 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef MOJO_EDK_SYSTEM_DATA_PIPE_H_ -#define MOJO_EDK_SYSTEM_DATA_PIPE_H_ - -#include <stddef.h> - -#include "base/compiler_specific.h" -#include "mojo/edk/embedder/platform_handle_vector.h" -#include "mojo/edk/embedder/scoped_platform_handle.h" -#include "mojo/edk/system/system_impl_export.h" -#include "mojo/public/c/system/data_pipe.h" -#include "mojo/public/c/system/types.h" - -namespace mojo { -namespace edk { -class RawChannel; - -// Shared code between DataPipeConsumerDispatcher and -// DataPipeProducerDispatcher. -class MOJO_SYSTEM_IMPL_EXPORT DataPipe { - public: - // The default options for |MojoCreateDataPipe()|. (Real uses should obtain - // this via |ValidateCreateOptions()| with a null |in_options|; this is - // exposed directly for testing convenience.) - static MojoCreateDataPipeOptions GetDefaultCreateOptions(); - - // Validates and/or sets default options for |MojoCreateDataPipeOptions|. If - // non-null, |in_options| must point to a struct of at least - // |in_options->struct_size| bytes. |out_options| must point to a (current) - // |MojoCreateDataPipeOptions| and will be entirely overwritten on success (it - // may be partly overwritten on failure). - static MojoResult ValidateCreateOptions( - const MojoCreateDataPipeOptions* in_options, - MojoCreateDataPipeOptions* out_options); - - // Helper methods used by DataPipeConsumerDispatcher and - // DataPipeProducerDispatcher for serialization and deserialization. - static void StartSerialize(bool have_channel_handle, - bool have_shared_memory, - size_t* max_size, - size_t* max_platform_handles); - static void EndSerialize(const MojoCreateDataPipeOptions& options, - ScopedPlatformHandle channel_handle, - ScopedPlatformHandle shared_memory_handle, - size_t shared_memory_size, - void* destination, - size_t* actual_size, - PlatformHandleVector* platform_handles); - static ScopedPlatformHandle Deserialize( - const void* source, - size_t size, - PlatformHandleVector* platform_handles, - MojoCreateDataPipeOptions* options, - ScopedPlatformHandle* shared_memory_handle, - size_t* shared_memory_size); -}; - -} // namespace edk -} // namespace mojo - -#endif // MOJO_EDK_SYSTEM_DATA_PIPE_H_ diff --git a/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.cc index 158b0e17e3e..23cb2e0368d 100644 --- a/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.cc +++ b/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.cc @@ -518,16 +518,12 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { size_t previous_bytes_available = bytes_available_; ports::PortStatus port_status; - if (node_controller_->node()->GetStatus(control_port_, &port_status) != - ports::OK || - !port_status.receiving_messages) { + int rv = node_controller_->node()->GetStatus(control_port_, &port_status); + if (rv != ports::OK || !port_status.receiving_messages) { DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" << " [control_port=" << control_port_.name() << "]"; - peer_closed_ = true; - } - - if (port_status.has_messages && !in_transit_) { + } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { ports::ScopedMessage message; do { int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, diff --git a/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.h b/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.h index 945aa073eea..6a7fb1c5b03 100644 --- a/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.h +++ b/chromium/mojo/edk/system/data_pipe_consumer_dispatcher.h @@ -8,9 +8,10 @@ #include <stddef.h> #include <stdint.h> +#include <memory> + #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/synchronization/lock.h" #include "mojo/edk/embedder/platform_handle_vector.h" #include "mojo/edk/embedder/platform_shared_buffer.h" @@ -103,7 +104,7 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final AwakableList awakable_list_; scoped_refptr<PlatformSharedBuffer> shared_ring_buffer_; - scoped_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_; + std::unique_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_; ScopedPlatformHandle buffer_handle_for_transit_; bool in_two_phase_read_ = false; diff --git a/chromium/mojo/edk/system/data_pipe_control_message.cc b/chromium/mojo/edk/system/data_pipe_control_message.cc index e81df6a95d6..23873b82903 100644 --- a/chromium/mojo/edk/system/data_pipe_control_message.cc +++ b/chromium/mojo/edk/system/data_pipe_control_message.cc @@ -15,7 +15,7 @@ void SendDataPipeControlMessage(NodeController* node_controller, const ports::PortRef& port, DataPipeCommand command, uint32_t num_bytes) { - scoped_ptr<PortsMessage> message = + std::unique_ptr<PortsMessage> message = PortsMessage::NewUserMessage(sizeof(DataPipeControlMessage), 0, 0); CHECK(message); @@ -24,7 +24,7 @@ void SendDataPipeControlMessage(NodeController* node_controller, data->command = command; data->num_bytes = num_bytes; - int rv = node_controller->SendMessage(port, &message); + int rv = node_controller->SendMessage(port, std::move(message)); if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { DLOG(ERROR) << "Unexpected failure sending data pipe control message: " << rv; diff --git a/chromium/mojo/edk/system/data_pipe_control_message.h b/chromium/mojo/edk/system/data_pipe_control_message.h index 7552dd01dc0..82ee594ea94 100644 --- a/chromium/mojo/edk/system/data_pipe_control_message.h +++ b/chromium/mojo/edk/system/data_pipe_control_message.h @@ -7,7 +7,8 @@ #include <stdint.h> -#include "base/memory/scoped_ptr.h" +#include <memory> + #include "mojo/edk/embedder/scoped_platform_handle.h" #include "mojo/edk/system/ports/port_ref.h" #include "mojo/public/c/system/macros.h" diff --git a/chromium/mojo/edk/system/data_pipe_producer_dispatcher.cc b/chromium/mojo/edk/system/data_pipe_producer_dispatcher.cc index 364b898e236..d056e7dbc83 100644 --- a/chromium/mojo/edk/system/data_pipe_producer_dispatcher.cc +++ b/chromium/mojo/edk/system/data_pipe_producer_dispatcher.cc @@ -496,16 +496,12 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { size_t previous_capacity = available_capacity_; ports::PortStatus port_status; - if (node_controller_->node()->GetStatus(control_port_, &port_status) != - ports::OK || - !port_status.receiving_messages) { + int rv = node_controller_->node()->GetStatus(control_port_, &port_status); + if (rv != ports::OK || !port_status.receiving_messages) { DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure" << " [control_port=" << control_port_.name() << "]"; - peer_closed_ = true; - } - - if (port_status.has_messages && !in_transit_) { + } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { ports::ScopedMessage message; do { int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr, diff --git a/chromium/mojo/edk/system/data_pipe_producer_dispatcher.h b/chromium/mojo/edk/system/data_pipe_producer_dispatcher.h index cfdeb965ac4..a55234a1dfe 100644 --- a/chromium/mojo/edk/system/data_pipe_producer_dispatcher.h +++ b/chromium/mojo/edk/system/data_pipe_producer_dispatcher.h @@ -8,9 +8,10 @@ #include <stddef.h> #include <stdint.h> +#include <memory> + #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/synchronization/lock.h" #include "mojo/edk/embedder/platform_handle_vector.h" #include "mojo/edk/embedder/platform_shared_buffer.h" @@ -107,7 +108,7 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipeProducerDispatcher final bool buffer_requested_ = false; scoped_refptr<PlatformSharedBuffer> shared_ring_buffer_; - scoped_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_; + std::unique_ptr<PlatformSharedBufferMapping> ring_buffer_mapping_; ScopedPlatformHandle buffer_handle_for_transit_; bool in_transit_ = false; diff --git a/chromium/mojo/edk/system/data_pipe_unittest.cc b/chromium/mojo/edk/system/data_pipe_unittest.cc index 944340bb269..526444c6d58 100644 --- a/chromium/mojo/edk/system/data_pipe_unittest.cc +++ b/chromium/mojo/edk/system/data_pipe_unittest.cc @@ -5,11 +5,12 @@ #include <stddef.h> #include <stdint.h> +#include <memory> + #include "base/bind.h" #include "base/location.h" #include "base/logging.h" #include "base/macros.h" -#include "base/memory/scoped_ptr.h" #include "base/message_loop/message_loop.h" #include "mojo/edk/embedder/embedder.h" #include "mojo/edk/embedder/platform_channel_pair.h" @@ -1657,13 +1658,7 @@ bool ReadAllData(MojoHandle consumer, #if !defined(OS_IOS) -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_Multiprocess DISABLED_Multiprocess -#else -#define MAYBE_Multiprocess Multiprocess -#endif // defined(OS_ANDROID) -TEST_F(DataPipeTest, MAYBE_Multiprocess) { +TEST_F(DataPipeTest, Multiprocess) { const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kMultiprocessTestData)); const MojoCreateDataPipeOptions options = { @@ -1836,13 +1831,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) { EXPECT_EQ("quit", ReadMessage(h)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_SendConsumerAndCloseProducer DISABLED_SendConsumerAndCloseProducer -#else -#define MAYBE_SendConsumerAndCloseProducer SendConsumerAndCloseProducer -#endif // defined(OS_ANDROID) -TEST_F(DataPipeTest, MAYBE_SendConsumerAndCloseProducer) { +TEST_F(DataPipeTest, SendConsumerAndCloseProducer) { // Create a new data pipe. MojoHandle p, c; EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p ,&c)); @@ -1885,13 +1874,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) { EXPECT_EQ("quit", ReadMessage(h)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_CreateInChild DISABLED_CreateInChild -#else -#define MAYBE_CreateInChild CreateInChild -#endif // defined(OS_ANDROID) -TEST_F(DataPipeTest, MAYBE_CreateInChild) { +TEST_F(DataPipeTest, CreateInChild) { RUN_CHILD_ON_PIPE(CreateAndWrite, child) MojoHandle c; std::string expected_message = ReadMessageWithHandles(child, &c, 1); diff --git a/chromium/mojo/edk/system/dispatcher.cc b/chromium/mojo/edk/system/dispatcher.cc index 6705542ecf7..7d701b24428 100644 --- a/chromium/mojo/edk/system/dispatcher.cc +++ b/chromium/mojo/edk/system/dispatcher.cc @@ -32,19 +32,17 @@ MojoResult Dispatcher::CancelWatch(uintptr_t context) { return MOJO_RESULT_INVALID_ARGUMENT; } -MojoResult Dispatcher::WriteMessage(const void* bytes, - uint32_t num_bytes, - const DispatcherInTransit* dispatchers, - uint32_t num_dispatchers, +MojoResult Dispatcher::WriteMessage(std::unique_ptr<MessageForTransit> message, MojoWriteMessageFlags flags) { return MOJO_RESULT_INVALID_ARGUMENT; } -MojoResult Dispatcher::ReadMessage(void* bytes, +MojoResult Dispatcher::ReadMessage(std::unique_ptr<MessageForTransit>* message, uint32_t* num_bytes, MojoHandle* handles, uint32_t* num_handles, - MojoReadMessageFlags flags) { + MojoReadMessageFlags flags, + bool read_any_size) { return MOJO_RESULT_INVALID_ARGUMENT; } @@ -58,7 +56,7 @@ MojoResult Dispatcher::MapBuffer( uint64_t offset, uint64_t num_bytes, MojoMapBufferFlags flags, - scoped_ptr<PlatformSharedBufferMapping>* mapping) { + std::unique_ptr<PlatformSharedBufferMapping>* mapping) { return MOJO_RESULT_INVALID_ARGUMENT; } diff --git a/chromium/mojo/edk/system/dispatcher.h b/chromium/mojo/edk/system/dispatcher.h index e8bcf330c64..9dca67fa45f 100644 --- a/chromium/mojo/edk/system/dispatcher.h +++ b/chromium/mojo/edk/system/dispatcher.h @@ -8,12 +8,12 @@ #include <stddef.h> #include <stdint.h> +#include <memory> #include <ostream> #include <vector> #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/synchronization/lock.h" #include "mojo/edk/embedder/platform_handle.h" #include "mojo/edk/embedder/platform_shared_buffer.h" @@ -31,6 +31,7 @@ namespace edk { class Awakable; class Dispatcher; +class MessageForTransit; using DispatcherVector = std::vector<scoped_refptr<Dispatcher>>; @@ -76,17 +77,15 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher ///////////// Message pipe API ///////////// - virtual MojoResult WriteMessage(const void* bytes, - uint32_t num_bytes, - const DispatcherInTransit* dispatchers, - uint32_t num_dispatchers, + virtual MojoResult WriteMessage(std::unique_ptr<MessageForTransit> message, MojoWriteMessageFlags flags); - virtual MojoResult ReadMessage(void* bytes, + virtual MojoResult ReadMessage(std::unique_ptr<MessageForTransit>* message, uint32_t* num_bytes, MojoHandle* handles, uint32_t* num_handles, - MojoReadMessageFlags flags); + MojoReadMessageFlags flags, + bool read_any_size); ///////////// Shared buffer API ///////////// @@ -101,7 +100,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher uint64_t offset, uint64_t num_bytes, MojoMapBufferFlags flags, - scoped_ptr<PlatformSharedBufferMapping>* mapping); + std::unique_ptr<PlatformSharedBufferMapping>* mapping); ///////////// Data pipe consumer API ///////////// diff --git a/chromium/mojo/edk/system/mach_port_relay.cc b/chromium/mojo/edk/system/mach_port_relay.cc index 2dd40e51aae..ddd633c8a45 100644 --- a/chromium/mojo/edk/system/mach_port_relay.cc +++ b/chromium/mojo/edk/system/mach_port_relay.cc @@ -11,12 +11,59 @@ #include "base/logging.h" #include "base/mac/mach_port_util.h" #include "base/mac/scoped_mach_port.h" +#include "base/metrics/histogram_macros.h" #include "base/process/process.h" #include "mojo/edk/embedder/platform_handle_vector.h" namespace mojo { namespace edk { +namespace { + +// Errors that can occur in the broker (privileged parent) process. +// These match tools/metrics/histograms.xml. +// This enum is append-only. +enum class BrokerUMAError : int { + SUCCESS = 0, + // Couldn't get a task port for the process with a given pid. + ERROR_TASK_FOR_PID = 1, + // Couldn't make a port with receive rights in the destination process. + ERROR_MAKE_RECEIVE_PORT = 2, + // Couldn't change the attributes of a Mach port. + ERROR_SET_ATTRIBUTES = 3, + // Couldn't extract a right from the destination. + ERROR_EXTRACT_DEST_RIGHT = 4, + // Couldn't send a Mach port in a call to mach_msg(). + ERROR_SEND_MACH_PORT = 5, + // Couldn't extract a right from the source. + ERROR_EXTRACT_SOURCE_RIGHT = 6, + ERROR_MAX +}; + +// Errors that can occur in a child process. +// These match tools/metrics/histograms.xml. +// This enum is append-only. +enum class ChildUMAError : int { + SUCCESS = 0, + // An error occurred while trying to receive a Mach port with mach_msg(). + ERROR_RECEIVE_MACH_MESSAGE = 1, + ERROR_MAX +}; + +void ReportBrokerError(BrokerUMAError error) { + UMA_HISTOGRAM_ENUMERATION("Mojo.MachPortRelay.BrokerError", + static_cast<int>(error), + static_cast<int>(BrokerUMAError::ERROR_MAX)); +} + +void ReportChildError(ChildUMAError error) { + UMA_HISTOGRAM_ENUMERATION("Mojo.MachPortRelay.ChildError", + static_cast<int>(error), + static_cast<int>(ChildUMAError::ERROR_MAX)); +} + +} // namespace + // static bool MachPortRelay::ReceivePorts(PlatformHandleVector* handles) { DCHECK(handles); @@ -27,15 +74,22 @@ bool MachPortRelay::ReceivePorts(PlatformHandleVector* handles) { if (handle->type != PlatformHandle::Type::MACH_NAME) continue; + if (handle->port == MACH_PORT_NULL) { + handle->type = PlatformHandle::Type::MACH; + continue; + } + base::mac::ScopedMachReceiveRight message_port(handle->port); base::mac::ScopedMachSendRight received_port( base::ReceiveMachPort(message_port.get())); if (received_port.get() == MACH_PORT_NULL) { + ReportChildError(ChildUMAError::ERROR_RECEIVE_MACH_MESSAGE); handle->port = MACH_PORT_NULL; LOG(ERROR) << "Error receiving mach port"; return false; } + ReportChildError(ChildUMAError::SUCCESS); handle->port = received_port.release(); handle->type = PlatformHandle::Type::MACH; } @@ -57,8 +111,13 @@ bool MachPortRelay::SendPortsToProcess(Channel::Message* message, base::ProcessHandle process) { DCHECK(message); mach_port_t task_port = port_provider_->TaskForPid(process); - if (task_port == MACH_PORT_NULL) + if (task_port == MACH_PORT_NULL) { + // Callers check the port provider for the task port before calling this + // function, in order to queue pending messages. Therefore, if this fails, + // it should be considered a genuine, bona fide, electrified, six-car error. + ReportBrokerError(BrokerUMAError::ERROR_TASK_FOR_PID); return false; + } size_t num_sent = 0; bool error = false; @@ -72,15 +131,39 @@ bool MachPortRelay::SendPortsToProcess(Channel::Message* message, if (handle->type != PlatformHandle::Type::MACH) continue; + if (handle->port == MACH_PORT_NULL) { + handle->type = PlatformHandle::Type::MACH_NAME; + num_sent++; + continue; + } + mach_port_name_t intermediate_port; - DCHECK(handle->port != MACH_PORT_NULL); + base::MachCreateError error_code; intermediate_port = base::CreateIntermediateMachPort( - task_port, base::mac::ScopedMachSendRight(handle->port), nullptr); + task_port, base::mac::ScopedMachSendRight(handle->port), &error_code); if (intermediate_port == MACH_PORT_NULL) { + BrokerUMAError uma_error; + switch (error_code) { + case base::MachCreateError::ERROR_MAKE_RECEIVE_PORT: + uma_error = BrokerUMAError::ERROR_MAKE_RECEIVE_PORT; + break; + case base::MachCreateError::ERROR_SET_ATTRIBUTES: + uma_error = BrokerUMAError::ERROR_SET_ATTRIBUTES; + break; + case base::MachCreateError::ERROR_EXTRACT_DEST_RIGHT: + uma_error = BrokerUMAError::ERROR_EXTRACT_DEST_RIGHT; + break; + case base::MachCreateError::ERROR_SEND_MACH_PORT: + uma_error = BrokerUMAError::ERROR_SEND_MACH_PORT; + break; + } + ReportBrokerError(uma_error); handle->port = MACH_PORT_NULL; error = true; break; } + + ReportBrokerError(BrokerUMAError::SUCCESS); handle->port = intermediate_port; handle->type = PlatformHandle::Type::MACH_NAME; num_sent++; @@ -96,8 +179,10 @@ bool MachPortRelay::ExtractPortRights(Channel::Message* message, DCHECK(message); mach_port_t task_port = port_provider_->TaskForPid(process); - if (task_port == MACH_PORT_NULL) + if (task_port == MACH_PORT_NULL) { + ReportBrokerError(BrokerUMAError::ERROR_TASK_FOR_PID); return false; + } size_t num_received = 0; bool error = false; @@ -111,6 +196,12 @@ bool MachPortRelay::ExtractPortRights(Channel::Message* message, if (handle->type != PlatformHandle::Type::MACH_NAME) continue; + if (handle->port == MACH_PORT_NULL) { + handle->type = PlatformHandle::Type::MACH; + num_received++; + continue; + } + mach_port_t extracted_right = MACH_PORT_NULL; mach_msg_type_name_t extracted_right_type; kern_return_t kr = @@ -118,10 +209,12 @@ bool MachPortRelay::ExtractPortRights(Channel::Message* message, MACH_MSG_TYPE_MOVE_SEND, &extracted_right, &extracted_right_type); if (kr != KERN_SUCCESS) { + ReportBrokerError(BrokerUMAError::ERROR_EXTRACT_SOURCE_RIGHT); error = true; break; } + ReportBrokerError(BrokerUMAError::SUCCESS); DCHECK_EQ(static_cast<mach_msg_type_name_t>(MACH_MSG_TYPE_PORT_SEND), extracted_right_type); handle->port = extracted_right; diff --git a/chromium/mojo/edk/system/mapping_table.cc b/chromium/mojo/edk/system/mapping_table.cc index 46afe5a56bb..850944306ea 100644 --- a/chromium/mojo/edk/system/mapping_table.cc +++ b/chromium/mojo/edk/system/mapping_table.cc @@ -20,7 +20,7 @@ MappingTable::~MappingTable() { } MojoResult MappingTable::AddMapping( - scoped_ptr<PlatformSharedBufferMapping> mapping) { + std::unique_ptr<PlatformSharedBufferMapping> mapping) { DCHECK(mapping); if (address_to_mapping_map_.size() >= diff --git a/chromium/mojo/edk/system/mapping_table.h b/chromium/mojo/edk/system/mapping_table.h index 120f9cad73d..00167e36048 100644 --- a/chromium/mojo/edk/system/mapping_table.h +++ b/chromium/mojo/edk/system/mapping_table.h @@ -7,11 +7,11 @@ #include <stdint.h> +#include <memory> #include <vector> #include "base/containers/hash_tables.h" #include "base/macros.h" -#include "base/memory/scoped_ptr.h" #include "mojo/edk/system/system_impl_export.h" #include "mojo/public/c/system/types.h" @@ -38,7 +38,7 @@ class MOJO_SYSTEM_IMPL_EXPORT MappingTable { // Tries to add a mapping. (Takes ownership of the mapping in all cases; on // failure, it will be destroyed.) - MojoResult AddMapping(scoped_ptr<PlatformSharedBufferMapping> mapping); + MojoResult AddMapping(std::unique_ptr<PlatformSharedBufferMapping> mapping); MojoResult RemoveMapping(void* address); private: diff --git a/chromium/mojo/edk/system/message_for_transit.cc b/chromium/mojo/edk/system/message_for_transit.cc new file mode 100644 index 00000000000..26658e161c6 --- /dev/null +++ b/chromium/mojo/edk/system/message_for_transit.cc @@ -0,0 +1,136 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "mojo/edk/system/message_for_transit.h" + +#include <vector> + +#include "mojo/edk/embedder/platform_handle_vector.h" + +namespace mojo { +namespace edk { + +namespace { + +static_assert(sizeof(MessageForTransit::MessageHeader) % 8 == 0, + "Invalid MessageHeader size."); +static_assert(sizeof(MessageForTransit::DispatcherHeader) % 8 == 0, + "Invalid DispatcherHeader size."); + +} // namespace + +MessageForTransit::~MessageForTransit() {} + +// static +MojoResult MessageForTransit::Create( + std::unique_ptr<MessageForTransit>* message, + uint32_t num_bytes, + const Dispatcher::DispatcherInTransit* dispatchers, + uint32_t num_dispatchers) { + // A structure for retaining information about every Dispatcher that will be + // sent with this message. + struct DispatcherInfo { + uint32_t num_bytes; + uint32_t num_ports; + uint32_t num_handles; + }; + + // This is only the base header size. It will grow as we accumulate the + // size of serialized state for each dispatcher. + size_t header_size = sizeof(MessageHeader) + + num_dispatchers * sizeof(DispatcherHeader); + size_t num_ports = 0; + size_t num_handles = 0; + + std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); + for (size_t i = 0; i < num_dispatchers; ++i) { + Dispatcher* d = dispatchers[i].dispatcher.get(); + d->StartSerialize(&dispatcher_info[i].num_bytes, + &dispatcher_info[i].num_ports, + &dispatcher_info[i].num_handles); + header_size += dispatcher_info[i].num_bytes; + num_ports += dispatcher_info[i].num_ports; + num_handles += dispatcher_info[i].num_handles; + } + + // We now have enough information to fully allocate the message storage. + std::unique_ptr<PortsMessage> msg = PortsMessage::NewUserMessage( + header_size + num_bytes, num_ports, num_handles); + if (!msg) + return MOJO_RESULT_RESOURCE_EXHAUSTED; + + // Populate the message header with information about serialized dispatchers. + // + // The front of the message is always a MessageHeader followed by a + // DispatcherHeader for each dispatcher to be sent. + MessageHeader* header = + static_cast<MessageHeader*>(msg->mutable_payload_bytes()); + DispatcherHeader* dispatcher_headers = + reinterpret_cast<DispatcherHeader*>(header + 1); + + // Serialized dispatcher state immediately follows the series of + // DispatcherHeaders. + char* dispatcher_data = + reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); + + header->num_dispatchers = num_dispatchers; + + // |header_size| is the total number of bytes preceding the message payload, + // including all dispatcher headers and serialized dispatcher state. + DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); + header->header_size = static_cast<uint32_t>(header_size); + + if (num_dispatchers > 0) { + ScopedPlatformHandleVectorPtr handles( + new PlatformHandleVector(num_handles)); + size_t port_index = 0; + size_t handle_index = 0; + bool fail = false; + for (size_t i = 0; i < num_dispatchers; ++i) { + Dispatcher* d = dispatchers[i].dispatcher.get(); + DispatcherHeader* dh = &dispatcher_headers[i]; + const DispatcherInfo& info = dispatcher_info[i]; + + // Fill in the header for this dispatcher. + dh->type = static_cast<int32_t>(d->GetType()); + dh->num_bytes = info.num_bytes; + dh->num_ports = info.num_ports; + dh->num_platform_handles = info.num_handles; + + // Fill in serialized state, ports, and platform handles. We'll cancel + // the send if the dispatcher implementation rejects for some reason. + if (!d->EndSerialize(static_cast<void*>(dispatcher_data), + msg->mutable_ports() + port_index, + handles->data() + handle_index)) { + fail = true; + break; + } + + dispatcher_data += info.num_bytes; + port_index += info.num_ports; + handle_index += info.num_handles; + } + + if (fail) { + // Release any platform handles we've accumulated. Their dispatchers + // retain ownership when message creation fails, so these are not actually + // leaking. + handles->clear(); + return MOJO_RESULT_INVALID_ARGUMENT; + } + + // Take ownership of all the handles and move them into message storage. + msg->SetHandles(std::move(handles)); + } + + message->reset(new MessageForTransit(std::move(msg))); + return MOJO_RESULT_OK; +} + +MessageForTransit::MessageForTransit(std::unique_ptr<PortsMessage> message) + : message_(std::move(message)) { +} + +} // namespace edk +} // namespace mojo diff --git a/chromium/mojo/edk/system/message_for_transit.h b/chromium/mojo/edk/system/message_for_transit.h new file mode 100644 index 00000000000..be035b04794 --- /dev/null +++ b/chromium/mojo/edk/system/message_for_transit.h @@ -0,0 +1,113 @@ +// Copyright 2016 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MOJO_EDK_SYSTEM_MESSAGE_FOR_TRANSIT_H_ +#define MOJO_EDK_SYSTEM_MESSAGE_FOR_TRANSIT_H_ + +#include <stdint.h> + +#include <memory> + +#include "base/macros.h" +#include "base/memory/ptr_util.h" +#include "mojo/edk/system/dispatcher.h" +#include "mojo/edk/system/ports_message.h" +#include "mojo/edk/system/system_impl_export.h" + +namespace mojo { +namespace edk { + +// MessageForTransit holds onto a PortsMessage which may be sent via +// |MojoWriteMessage()| or which may have been received on a pipe endpoint. +// Instances of this class are exposed to Mojo system API consumers via the +// opaque pointers used with |MojoCreateMessage()|, |MojoDestroyMessage()|, +// |MojoWriteMessageNew()|, and |MojoReadMessageNew()|. +class MOJO_SYSTEM_IMPL_EXPORT MessageForTransit { + public: +#pragma pack(push, 1) + // Header attached to every message. + struct MessageHeader { + // The number of serialized dispatchers included in this header. + uint32_t num_dispatchers; + + // Total size of the header, including serialized dispatcher data. + uint32_t header_size; + }; + + // Header for each dispatcher in a message, immediately following the message + // header. + struct DispatcherHeader { + // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. + int32_t type; + + // The size of the serialized dispatcher, not including this header. + uint32_t num_bytes; + + // The number of ports needed to deserialize this dispatcher. + uint32_t num_ports; + + // The number of platform handles needed to deserialize this dispatcher. + uint32_t num_platform_handles; + }; +#pragma pack(pop) + + ~MessageForTransit(); + + // A static constructor for building outbound messages. + static MojoResult Create( + std::unique_ptr<MessageForTransit>* message, + uint32_t num_bytes, + const Dispatcher::DispatcherInTransit* dispatchers, + uint32_t num_dispatchers); + + // A static constructor for wrapping inbound messages. + static std::unique_ptr<MessageForTransit> WrapPortsMessage( + std::unique_ptr<PortsMessage> message) { + return base::WrapUnique(new MessageForTransit(std::move(message))); + } + + const void* bytes() const { + DCHECK(message_); + return static_cast<const void*>( + static_cast<const char*>(message_->payload_bytes()) + + header()->header_size); + } + + void* mutable_bytes() { + DCHECK(message_); + return static_cast<void*>( + static_cast<char*>(message_->mutable_payload_bytes()) + + header()->header_size); + } + + size_t num_bytes() const { + size_t header_size = header()->header_size; + DCHECK_GE(message_->num_payload_bytes(), header_size); + return message_->num_payload_bytes() - header_size; + } + + size_t num_handles() const { return header()->num_dispatchers; } + + std::unique_ptr<PortsMessage> TakePortsMessage() { + return std::move(message_); + } + + private: + explicit MessageForTransit(std::unique_ptr<PortsMessage> message); + + const MessageForTransit::MessageHeader* header() const { + DCHECK(message_); + return static_cast<const MessageForTransit::MessageHeader*>( + message_->payload_bytes()); + } + + std::unique_ptr<PortsMessage> message_; + + DISALLOW_COPY_AND_ASSIGN(MessageForTransit); +}; + +} // namespace edk +} // namespace mojo + +#endif // MOJO_EDK_SYSTEM_MESSAGE_FOR_TRANSIT_H_ diff --git a/chromium/mojo/edk/system/message_pipe_dispatcher.cc b/chromium/mojo/edk/system/message_pipe_dispatcher.cc index 7e6555dfeda..42c9af0025e 100644 --- a/chromium/mojo/edk/system/message_pipe_dispatcher.cc +++ b/chromium/mojo/edk/system/message_pipe_dispatcher.cc @@ -5,12 +5,13 @@ #include "mojo/edk/system/message_pipe_dispatcher.h" #include <limits> +#include <memory> #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "mojo/edk/embedder/embedder_internal.h" #include "mojo/edk/system/core.h" +#include "mojo/edk/system/message_for_transit.h" #include "mojo/edk/system/node_controller.h" #include "mojo/edk/system/ports_message.h" #include "mojo/edk/system/request_context.h" @@ -20,36 +21,10 @@ namespace edk { namespace { -#pragma pack(push, 1) - -// Header attached to every message sent over a message pipe. -struct MessageHeader { - // The number of serialized dispatchers included in this header. - uint32_t num_dispatchers; - - // Total size of the header, including serialized dispatcher data. - uint32_t header_size; -}; - -static_assert(sizeof(MessageHeader) % 8 == 0, "Invalid MessageHeader size."); - -// Header for each dispatcher, immediately following the message header. -struct DispatcherHeader { - // The type of the dispatcher, correpsonding to the Dispatcher::Type enum. - int32_t type; +using DispatcherHeader = MessageForTransit::DispatcherHeader; +using MessageHeader = MessageForTransit::MessageHeader; - // The size of the serialized dispatcher, not including this header. - uint32_t num_bytes; - - // The number of ports needed to deserialize this dispatcher. - uint32_t num_ports; - - // The number of platform handles needed to deserialize this dispatcher. - uint32_t num_platform_handles; -}; - -static_assert(sizeof(DispatcherHeader) % 8 == 0, - "Invalid DispatcherHeader size."); +#pragma pack(push, 1) struct SerializedState { uint64_t pipe_id; @@ -107,7 +82,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { { base::AutoLock lock(signal_lock_); port0 = port_; - port_closed_ = true; + port_closed_.Set(true); awakables_.CancelAll(); } @@ -115,7 +90,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { { base::AutoLock lock(other->signal_lock_); port1 = other->port_; - other->port_closed_ = true; + other->port_closed_.Set(true); other->awakables_.CancelAll(); } @@ -157,185 +132,63 @@ MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { } MojoResult MessagePipeDispatcher::WriteMessage( - const void* bytes, - uint32_t num_bytes, - const DispatcherInTransit* dispatchers, - uint32_t num_dispatchers, + std::unique_ptr<MessageForTransit> message, MojoWriteMessageFlags flags) { + if (port_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; - { - base::AutoLock lock(signal_lock_); - if (port_closed_ || in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - } - - // A structure for retaining information about every Dispatcher we're about - // to send. This information is collected by calling StartSerialize() on - // each dispatcher in sequence. - struct DispatcherInfo { - uint32_t num_bytes; - uint32_t num_ports; - uint32_t num_handles; - }; - - // This is only the base header size. It will grow as we accumulate the - // size of serialized state for each dispatcher. - size_t header_size = sizeof(MessageHeader) + - num_dispatchers * sizeof(DispatcherHeader); - - size_t num_ports = 0; - size_t num_handles = 0; - - std::vector<DispatcherInfo> dispatcher_info(num_dispatchers); - for (size_t i = 0; i < num_dispatchers; ++i) { - Dispatcher* d = dispatchers[i].dispatcher.get(); - d->StartSerialize(&dispatcher_info[i].num_bytes, - &dispatcher_info[i].num_ports, - &dispatcher_info[i].num_handles); - header_size += dispatcher_info[i].num_bytes; - num_ports += dispatcher_info[i].num_ports; - num_handles += dispatcher_info[i].num_handles; - } - - // We now have enough information to fully allocate the message storage. - scoped_ptr<PortsMessage> message = PortsMessage::NewUserMessage( - header_size + num_bytes, num_ports, num_handles); - DCHECK(message); - - // Populate the message header with information about serialized dispatchers. - // - // The front of the message is always a MessageHeader followed by a - // DispatcherHeader for each dispatcher to be sent. - MessageHeader* header = - static_cast<MessageHeader*>(message->mutable_payload_bytes()); - DispatcherHeader* dispatcher_headers = - reinterpret_cast<DispatcherHeader*>(header + 1); - - // Serialized dispatcher state immediately follows the series of - // DispatcherHeaders. - char* dispatcher_data = - reinterpret_cast<char*>(dispatcher_headers + num_dispatchers); - - header->num_dispatchers = num_dispatchers; - - // |header_size| is the total number of bytes preceding the message payload, - // including all dispatcher headers and serialized dispatcher state. - DCHECK_LE(header_size, std::numeric_limits<uint32_t>::max()); - header->header_size = static_cast<uint32_t>(header_size); - - bool cancel_transit = false; - if (num_dispatchers > 0) { - ScopedPlatformHandleVectorPtr handles( - new PlatformHandleVector(num_handles)); - size_t port_index = 0; - size_t handle_index = 0; - for (size_t i = 0; i < num_dispatchers; ++i) { - Dispatcher* d = dispatchers[i].dispatcher.get(); - DispatcherHeader* dh = &dispatcher_headers[i]; - const DispatcherInfo& info = dispatcher_info[i]; - - // Fill in the header for this dispatcher. - dh->type = static_cast<int32_t>(d->GetType()); - dh->num_bytes = info.num_bytes; - dh->num_ports = info.num_ports; - dh->num_platform_handles = info.num_handles; - - // Fill in serialized state, ports, and platform handles. We'll cancel - // the send if the dispatcher implementation rejects for some reason. - if (!d->EndSerialize(static_cast<void*>(dispatcher_data), - message->mutable_ports() + port_index, - handles->data() + handle_index)) { - cancel_transit = true; - break; - } - - dispatcher_data += info.num_bytes; - port_index += info.num_ports; - handle_index += info.num_handles; - } + size_t num_bytes = message->num_bytes(); + int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); - if (!cancel_transit) { - // Take ownership of all the handles and move them into message storage. - message->SetHandles(std::move(handles)); - } else { - // Release any platform handles we've accumulated. Their dispatchers - // retain ownership when transit is canceled, so these are not actually - // leaking. - handles->clear(); - } - } + DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ + << " [port=" << port_.name() << "; rv=" << rv + << "; num_bytes=" << num_bytes << "]"; - MojoResult result = MOJO_RESULT_OK; - if (!cancel_transit) { - // Copy the message body. - void* message_body = static_cast<void*>( - static_cast<char*>(message->mutable_payload_bytes()) + header_size); - memcpy(message_body, bytes, num_bytes); - - int rv = node_controller_->SendMessage(port_, &message); - - DVLOG(1) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ - << " [port=" << port_.name() << "; rv=" << rv - << "; num_bytes=" << num_bytes << "]"; - - if (rv != ports::OK) { - if (rv == ports::ERROR_PORT_UNKNOWN || - rv == ports::ERROR_PORT_STATE_UNEXPECTED || - rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { - result = MOJO_RESULT_INVALID_ARGUMENT; - } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { - base::AutoLock lock(signal_lock_); - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); - result = MOJO_RESULT_FAILED_PRECONDITION; - } else { - NOTREACHED(); - result = MOJO_RESULT_UNKNOWN; - } - cancel_transit = true; - } else { - DCHECK(!message); + if (rv != ports::OK) { + if (rv == ports::ERROR_PORT_UNKNOWN || + rv == ports::ERROR_PORT_STATE_UNEXPECTED || + rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { + return MOJO_RESULT_INVALID_ARGUMENT; + } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { + base::AutoLock lock(signal_lock_); + awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); + return MOJO_RESULT_FAILED_PRECONDITION; } - } - if (cancel_transit) { - // We ended up not sending the message. Release all the platform handles. - // Their dipatchers retain ownership when transit is canceled, so these are - // not actually leaking. - DCHECK(message); - Channel::MessagePtr m = message->TakeChannelMessage(); - ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); - if (handles) - handles->clear(); + NOTREACHED(); + return MOJO_RESULT_UNKNOWN; } - return result; + return MOJO_RESULT_OK; } -MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, - uint32_t* num_bytes, - MojoHandle* handles, - uint32_t* num_handles, - MojoReadMessageFlags flags) { - { - base::AutoLock lock(signal_lock_); - // We can't read from a port that's closed or in transit! - if (port_closed_ || in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - } +MojoResult MessagePipeDispatcher::ReadMessage( + std::unique_ptr<MessageForTransit>* message, + uint32_t* num_bytes, + MojoHandle* handles, + uint32_t* num_handles, + MojoReadMessageFlags flags, + bool read_any_size) { + // We can't read from a port that's closed or in transit! + if (port_closed_ || in_transit_) + return MOJO_RESULT_INVALID_ARGUMENT; bool no_space = false; bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; bool invalid_message = false; - // Ensure the provided buffers are large enough to hold the next message. - // GetMessageIf provides an atomic way to test the next message without - // committing to removing it from the port's underlying message queue until - // we are sure we can consume it. + // Grab a message if the provided handles buffer is large enough. If the input + // |num_bytes| is provided and |read_any_size| is false, we also ensure + // that it specifies a size at least as large as the next available payload. + // + // If |read_any_size| is true, the input value of |*num_bytes| is ignored. + // This flag exists to support both new and old API behavior. ports::ScopedMessage ports_message; int rv = node_controller_->node()->GetMessageIf( port_, - [num_bytes, num_handles, &no_space, &may_discard, &invalid_message]( + [read_any_size, num_bytes, num_handles, &no_space, &may_discard, + &invalid_message]( const ports::Message& next_message) { const PortsMessage& message = static_cast<const PortsMessage&>(next_message); @@ -367,8 +220,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, *num_handles = handles_available; } - if (bytes_to_read < bytes_available || - handles_to_read < handles_available) { + if (handles_to_read < handles_available || + (!read_any_size && bytes_to_read < bytes_available)) { no_space = true; return may_discard; } @@ -390,9 +243,9 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, } if (no_space) { - // Either |*num_bytes| or |*num_handles| wasn't sufficient to hold this - // message's data. The message will still be in queue unless - // MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. + // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't + // sufficient to hold this message's data. The message will still be in + // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. return MOJO_RESULT_RESOURCE_EXHAUSTED; } @@ -412,11 +265,11 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, // Alright! We have a message and the caller has provided sufficient storage // in which to receive it. - scoped_ptr<PortsMessage> message( + std::unique_ptr<PortsMessage> msg( static_cast<PortsMessage*>(ports_message.release())); const MessageHeader* header = - static_cast<const MessageHeader*>(message->payload_bytes()); + static_cast<const MessageHeader*>(msg->payload_bytes()); const DispatcherHeader* dispatcher_headers = reinterpret_cast<const DispatcherHeader*>(header + 1); @@ -440,28 +293,26 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, Type type = static_cast<Type>(dh.type); size_t next_payload_index = data_payload_index + dh.num_bytes; - if (message->num_payload_bytes() < next_payload_index || + if (msg->num_payload_bytes() < next_payload_index || next_payload_index < data_payload_index) { return MOJO_RESULT_UNKNOWN; } size_t next_port_index = port_index + dh.num_ports; - if (message->num_ports() < next_port_index || - next_port_index < port_index) + if (msg->num_ports() < next_port_index || next_port_index < port_index) return MOJO_RESULT_UNKNOWN; size_t next_platform_handle_index = platform_handle_index + dh.num_platform_handles; - if (message->num_handles() < next_platform_handle_index || + if (msg->num_handles() < next_platform_handle_index || next_platform_handle_index < platform_handle_index) { return MOJO_RESULT_UNKNOWN; } PlatformHandle* out_handles = - message->num_handles() ? message->handles() + platform_handle_index - : nullptr; + msg->num_handles() ? msg->handles() + platform_handle_index : nullptr; dispatchers[i].dispatcher = Dispatcher::Deserialize( - type, dispatcher_data, dh.num_bytes, message->ports() + port_index, + type, dispatcher_data, dh.num_bytes, msg->ports() + port_index, dh.num_ports, out_handles, dh.num_platform_handles); if (!dispatchers[i].dispatcher) return MOJO_RESULT_UNKNOWN; @@ -477,12 +328,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, return MOJO_RESULT_UNKNOWN; } - // Copy message bytes. - DCHECK_GE(message->num_payload_bytes(), header->header_size); - const char* payload = reinterpret_cast<const char*>(message->payload_bytes()); - memcpy(bytes, payload + header->header_size, - message->num_payload_bytes() - header->header_size); - + CHECK(msg); + *message = MessageForTransit::WrapPortsMessage(std::move(msg)); return MOJO_RESULT_OK; } @@ -574,7 +421,7 @@ bool MessagePipeDispatcher::BeginTransit() { base::AutoLock lock(signal_lock_); if (in_transit_ || port_closed_) return false; - in_transit_ = true; + in_transit_.Set(true); return in_transit_; } @@ -582,14 +429,14 @@ void MessagePipeDispatcher::CompleteTransitAndClose() { node_controller_->SetPortObserver(port_, nullptr); base::AutoLock lock(signal_lock_); - in_transit_ = false; port_transferred_ = true; + in_transit_.Set(false); CloseNoLock(); } void MessagePipeDispatcher::CancelTransit() { base::AutoLock lock(signal_lock_); - in_transit_ = false; + in_transit_.Set(false); // Something may have happened while we were waiting for potential transit. awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); @@ -626,7 +473,7 @@ MojoResult MessagePipeDispatcher::CloseNoLock() { if (port_closed_ || in_transit_) return MOJO_RESULT_INVALID_ARGUMENT; - port_closed_ = true; + port_closed_.Set(true); awakables_.CancelAll(); if (!port_transferred_) { @@ -676,22 +523,23 @@ void MessagePipeDispatcher::OnPortStatusChanged() { #if !defined(NDEBUG) ports::PortStatus port_status; - node_controller_->node()->GetStatus(port_, &port_status); - if (port_status.has_messages) { - ports::ScopedMessage unused; - size_t message_size = 0; - node_controller_->node()->GetMessageIf( - port_, [&message_size](const ports::Message& message) { - message_size = message.num_payload_bytes(); - return false; - }, &unused); - DVLOG(1) << "New message detected on message pipe " << pipe_id_ - << " endpoint " << endpoint_ << " [port=" << port_.name() - << "; size=" << message_size << "]"; - } - if (port_status.peer_closed) { - DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ - << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; + if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) { + if (port_status.has_messages) { + ports::ScopedMessage unused; + size_t message_size = 0; + node_controller_->node()->GetMessageIf( + port_, [&message_size](const ports::Message& message) { + message_size = message.num_payload_bytes(); + return false; + }, &unused); + DVLOG(1) << "New message detected on message pipe " << pipe_id_ + << " endpoint " << endpoint_ << " [port=" << port_.name() + << "; size=" << message_size << "]"; + } + if (port_status.peer_closed) { + DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ + << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; + } } #endif diff --git a/chromium/mojo/edk/system/message_pipe_dispatcher.h b/chromium/mojo/edk/system/message_pipe_dispatcher.h index b457ab99f63..fddd0fd8cf8 100644 --- a/chromium/mojo/edk/system/message_pipe_dispatcher.h +++ b/chromium/mojo/edk/system/message_pipe_dispatcher.h @@ -7,12 +7,14 @@ #include <stdint.h> +#include <memory> #include <queue> #include "base/macros.h" -#include "base/memory/scoped_ptr.h" +#include "mojo/edk/system/atomic_flag.h" #include "mojo/edk/system/awakable_list.h" #include "mojo/edk/system/dispatcher.h" +#include "mojo/edk/system/message_for_transit.h" #include "mojo/edk/system/ports/port_ref.h" namespace mojo { @@ -51,16 +53,14 @@ class MessagePipeDispatcher : public Dispatcher { const Watcher::WatchCallback& callback, uintptr_t context) override; MojoResult CancelWatch(uintptr_t context) override; - MojoResult WriteMessage(const void* bytes, - uint32_t num_bytes, - const DispatcherInTransit* dispatchers, - uint32_t num_dispatchers, + MojoResult WriteMessage(std::unique_ptr<MessageForTransit> message, MojoWriteMessageFlags flags) override; - MojoResult ReadMessage(void* bytes, + MojoResult ReadMessage(std::unique_ptr<MessageForTransit>* message, uint32_t* num_bytes, MojoHandle* handles, uint32_t* num_handles, - MojoReadMessageFlags flags) override; + MojoReadMessageFlags flags, + bool read_any_size) override; HandleSignalsState GetHandleSignalsState() const override; MojoResult AddAwakable(Awakable* awakable, MojoHandleSignals signals, @@ -107,10 +107,10 @@ class MessagePipeDispatcher : public Dispatcher { // This is not the same is |port_transferred_|. It's only held true between // BeginTransit() and Complete/CancelTransit(). - bool in_transit_ = false; + AtomicFlag in_transit_; bool port_transferred_ = false; - bool port_closed_ = false; + AtomicFlag port_closed_; AwakableList awakables_; DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher); diff --git a/chromium/mojo/edk/system/message_pipe_perftest.cc b/chromium/mojo/edk/system/message_pipe_perftest.cc index 569ee1f2127..a6ce3709e53 100644 --- a/chromium/mojo/edk/system/message_pipe_perftest.cc +++ b/chromium/mojo/edk/system/message_pipe_perftest.cc @@ -5,12 +5,16 @@ #include <stddef.h> #include <stdint.h> +#include <memory> #include <utility> #include "base/bind.h" +#include "base/bind_helpers.h" #include "base/logging.h" +#include "base/macros.h" #include "base/strings/stringprintf.h" #include "base/test/perf_time_logger.h" +#include "base/threading/thread.h" #include "mojo/edk/embedder/embedder.h" #include "mojo/edk/embedder/scoped_platform_handle.h" #include "mojo/edk/system/handle_signals_state.h" @@ -25,11 +29,9 @@ namespace mojo { namespace edk { namespace { -class MultiprocessMessagePipePerfTest : public test::MojoTestBase { +class MessagePipePerfTest : public test::MojoTestBase { public: - MultiprocessMessagePipePerfTest() - : message_count_(0), - message_size_(0) {} + MessagePipePerfTest() : message_count_(0), message_size_(0) {} void SetUpMeasurement(int message_count, size_t message_size) { message_count_ = message_count; @@ -76,73 +78,90 @@ class MultiprocessMessagePipePerfTest : public test::MojoTestBase { logger.Done(); } + protected: + void RunPingPongServer(MojoHandle mp) { + // This values are set to align with one at ipc_pertests.cc for comparison. + const size_t kMsgSize[5] = {12, 144, 1728, 20736, 248832}; + const int kMessageCount[5] = {50000, 50000, 50000, 12000, 1000}; + + for (size_t i = 0; i < 5; i++) { + SetUpMeasurement(kMessageCount[i], kMsgSize[i]); + Measure(mp); + } + + SendQuitMessage(mp); + } + + static int RunPingPongClient(MojoHandle mp) { + std::string buffer(1000000, '\0'); + int rv = 0; + while (true) { + // Wait for our end of the message pipe to be readable. + HandleSignalsState hss; + MojoResult result = + MojoWait(mp, MOJO_HANDLE_SIGNAL_READABLE, + MOJO_DEADLINE_INDEFINITE, &hss); + if (result != MOJO_RESULT_OK) { + rv = result; + break; + } + + uint32_t read_size = static_cast<uint32_t>(buffer.size()); + CHECK_EQ(MojoReadMessage(mp, &buffer[0], + &read_size, nullptr, + 0, MOJO_READ_MESSAGE_FLAG_NONE), + MOJO_RESULT_OK); + + // Empty message indicates quit. + if (read_size == 0) + break; + + CHECK_EQ(MojoWriteMessage(mp, &buffer[0], + read_size, + nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE), + MOJO_RESULT_OK); + } + + return rv; + } + private: int message_count_; size_t message_size_; std::string payload_; std::string read_buffer_; - scoped_ptr<base::PerfTimeLogger> perf_logger_; + std::unique_ptr<base::PerfTimeLogger> perf_logger_; + + DISALLOW_COPY_AND_ASSIGN(MessagePipePerfTest); }; +TEST_F(MessagePipePerfTest, PingPong) { + MojoHandle server_handle, client_handle; + CreateMessagePipe(&server_handle, &client_handle); + + base::Thread client_thread("PingPongClient"); + client_thread.Start(); + client_thread.task_runner()->PostTask( + FROM_HERE, + base::Bind(base::IgnoreResult(&RunPingPongClient), client_handle)); + + RunPingPongServer(server_handle); +} + // For each message received, sends a reply message with the same contents // repeated twice, until the other end is closed or it receives "quitquitquit" // (which it doesn't reply to). It'll return the number of messages received, // not including any "quitquitquit" message, modulo 100. -DEFINE_TEST_CLIENT_WITH_PIPE(PingPongClient, MultiprocessMessagePipePerfTest, - h) { - std::string buffer(1000000, '\0'); - int rv = 0; - while (true) { - // Wait for our end of the message pipe to be readable. - HandleSignalsState hss; - MojoResult result = - MojoWait(h, MOJO_HANDLE_SIGNAL_READABLE, - MOJO_DEADLINE_INDEFINITE, &hss); - if (result != MOJO_RESULT_OK) { - rv = result; - break; - } - - uint32_t read_size = static_cast<uint32_t>(buffer.size()); - CHECK_EQ(MojoReadMessage(h, &buffer[0], - &read_size, nullptr, - 0, MOJO_READ_MESSAGE_FLAG_NONE), - MOJO_RESULT_OK); - - // Empty message indicates quit. - if (read_size == 0) - break; - - CHECK_EQ(MojoWriteMessage(h, &buffer[0], - read_size, - nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE), - MOJO_RESULT_OK); - } - - return rv; +DEFINE_TEST_CLIENT_WITH_PIPE(PingPongClient, MessagePipePerfTest, h) { + return RunPingPongClient(h); } // Repeatedly sends messages as previous one got replied by the child. // Waits for the child to close its end before quitting once specified // number of messages has been sent. -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_PingPong DISABLED_PingPong -#else -#define MAYBE_PingPong PingPong -#endif // defined(OS_ANDROID) -TEST_F(MultiprocessMessagePipePerfTest, MAYBE_PingPong) { +TEST_F(MessagePipePerfTest, MultiprocessPingPong) { RUN_CHILD_ON_PIPE(PingPongClient, h) - // This values are set to align with one at ipc_pertests.cc for comparison. - const size_t kMsgSize[5] = {12, 144, 1728, 20736, 248832}; - const int kMessageCount[5] = {50000, 50000, 50000, 12000, 1000}; - - for (size_t i = 0; i < 5; i++) { - SetUpMeasurement(kMessageCount[i], kMsgSize[i]); - Measure(h); - } - - SendQuitMessage(h); + RunPingPongServer(h); END_CHILD() } diff --git a/chromium/mojo/edk/system/message_pipe_unittest.cc b/chromium/mojo/edk/system/message_pipe_unittest.cc index bfafbb740d1..fcfaeca1790 100644 --- a/chromium/mojo/edk/system/message_pipe_unittest.cc +++ b/chromium/mojo/edk/system/message_pipe_unittest.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. #include <stdint.h> +#include <string.h> #include "base/memory/ref_counted.h" #include "mojo/edk/system/test_utils.h" @@ -409,6 +410,72 @@ TEST_F(MessagePipeTest, BasicWaiting) { ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); } +TEST_F(MessagePipeTest, InvalidMessageObjects) { + // null message + ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + MojoFreeMessage(MOJO_MESSAGE_HANDLE_INVALID)); + + // null message + ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + MojoGetMessageBuffer(MOJO_MESSAGE_HANDLE_INVALID, nullptr)); + + // Non-zero num_handles with null handles array. + ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, + MojoAllocMessage(0, nullptr, 1, MOJO_ALLOC_MESSAGE_FLAG_NONE, + nullptr)); +} + +TEST_F(MessagePipeTest, AllocAndFreeMessage) { + const std::string kMessage = "Hello, world."; + MojoMessageHandle message = MOJO_MESSAGE_HANDLE_INVALID; + ASSERT_EQ(MOJO_RESULT_OK, + MojoAllocMessage(static_cast<uint32_t>(kMessage.size()), nullptr, 0, + MOJO_ALLOC_MESSAGE_FLAG_NONE, &message)); + ASSERT_NE(MOJO_MESSAGE_HANDLE_INVALID, message); + ASSERT_EQ(MOJO_RESULT_OK, MojoFreeMessage(message)); +} + +TEST_F(MessagePipeTest, WriteAndReadMessageObject) { + const std::string kMessage = "Hello, world."; + MojoMessageHandle message = MOJO_MESSAGE_HANDLE_INVALID; + EXPECT_EQ(MOJO_RESULT_OK, + MojoAllocMessage(static_cast<uint32_t>(kMessage.size()), nullptr, 0, + MOJO_ALLOC_MESSAGE_FLAG_NONE, &message)); + ASSERT_NE(MOJO_MESSAGE_HANDLE_INVALID, message); + + void* buffer = nullptr; + EXPECT_EQ(MOJO_RESULT_OK, MojoGetMessageBuffer(message, &buffer)); + ASSERT_TRUE(buffer); + memcpy(buffer, kMessage.data(), kMessage.size()); + + MojoHandle a, b; + CreateMessagePipe(&a, &b); + EXPECT_EQ(MOJO_RESULT_OK, + MojoWriteMessageNew(a, message, MOJO_WRITE_MESSAGE_FLAG_NONE)); + + EXPECT_EQ(MOJO_RESULT_OK, + MojoWait(b, MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE, + nullptr)); + uint32_t num_bytes = 0; + uint32_t num_handles = 0; + EXPECT_EQ(MOJO_RESULT_OK, + MojoReadMessageNew(b, &message, &num_bytes, nullptr, &num_handles, + MOJO_READ_MESSAGE_FLAG_NONE)); + ASSERT_NE(MOJO_MESSAGE_HANDLE_INVALID, message); + EXPECT_EQ(static_cast<uint32_t>(kMessage.size()), num_bytes); + EXPECT_EQ(0u, num_handles); + + EXPECT_EQ(MOJO_RESULT_OK, MojoGetMessageBuffer(message, &buffer)); + ASSERT_TRUE(buffer); + + EXPECT_EQ(0, strncmp(static_cast<const char*>(buffer), kMessage.data(), + num_bytes)); + + EXPECT_EQ(MOJO_RESULT_OK, MojoFreeMessage(message)); + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(a)); + EXPECT_EQ(MOJO_RESULT_OK, MojoClose(b)); +} + #if !defined(OS_IOS) const size_t kPingPongHandlesPerIteration = 50; @@ -467,13 +534,7 @@ TEST_F(MessagePipeTest, DISABLED_DataPipeProducerHandlePingPong) { MojoClose(p[i]); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_SharedBufferHandlePingPong DISABLED_SharedBufferHandlePingPong -#else -#define MAYBE_SharedBufferHandlePingPong SharedBufferHandlePingPong -#endif -TEST_F(MessagePipeTest, MAYBE_SharedBufferHandlePingPong) { +TEST_F(MessagePipeTest, SharedBufferHandlePingPong) { MojoHandle buffers[kPingPongHandlesPerIteration]; for (size_t i = 0; i <kPingPongHandlesPerIteration; ++i) EXPECT_EQ(MOJO_RESULT_OK, MojoCreateSharedBuffer(nullptr, 1, &buffers[i])); diff --git a/chromium/mojo/edk/system/multiprocess_message_pipe_unittest.cc b/chromium/mojo/edk/system/multiprocess_message_pipe_unittest.cc index 1d07d0f650a..946322c9b97 100644 --- a/chromium/mojo/edk/system/multiprocess_message_pipe_unittest.cc +++ b/chromium/mojo/edk/system/multiprocess_message_pipe_unittest.cc @@ -115,14 +115,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(EchoEcho, MultiprocessMessagePipeTest, h) { return rv; } -// Sends "hello" to child, and expects "hellohello" back. -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_Basic DISABLED_Basic -#else -#define MAYBE_Basic Basic -#endif // defined(OS_ANDROID) -TEST_F(MultiprocessMessagePipeTest, MAYBE_Basic) { +TEST_F(MultiprocessMessagePipeTest, Basic) { RUN_CHILD_ON_PIPE(EchoEcho, h) std::string hello("hello"); ASSERT_EQ(MOJO_RESULT_OK, @@ -158,15 +151,7 @@ TEST_F(MultiprocessMessagePipeTest, MAYBE_Basic) { END_CHILD_AND_EXPECT_EXIT_CODE(1 % 100); } -// Sends a bunch of messages to the child. Expects them "repeated" back. Waits -// for the child to close its end before quitting. -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_QueueMessages DISABLED_QueueMessages -#else -#define MAYBE_QueueMessages QueueMessages -#endif // defined(OS_ANDROID) -TEST_F(MultiprocessMessagePipeTest, MAYBE_QueueMessages) { +TEST_F(MultiprocessMessagePipeTest, QueueMessages) { static const size_t kNumMessages = 1001; RUN_CHILD_ON_PIPE(EchoEcho, h) for (size_t i = 0; i < kNumMessages; i++) { @@ -292,13 +277,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(CheckSharedBuffer, MultiprocessMessagePipeTest, return 0; } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing -#else -#define MAYBE_SharedBufferPassing SharedBufferPassing -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_SharedBufferPassing) { +TEST_F(MultiprocessMessagePipeTest, SharedBufferPassing) { RUN_CHILD_ON_PIPE(CheckSharedBuffer, h) // Make a shared buffer. MojoCreateSharedBufferOptions options; @@ -529,13 +508,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(CheckMessagePipe, MultiprocessMessagePipeTest, h) { return 0; } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_MessagePipePassing DISABLED_MessagePipePassing -#else -#define MAYBE_MessagePipePassing MessagePipePassing -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_MessagePipePassing) { +TEST_F(MultiprocessMessagePipeTest, MessagePipePassing) { RUN_CHILD_ON_PIPE(CheckMessagePipe, h) MojoCreateSharedBufferOptions options; options.struct_size = sizeof(options); @@ -577,14 +550,7 @@ TEST_F(MultiprocessMessagePipeTest, MAYBE_MessagePipePassing) { END_CHILD() } -// Like above test, but verifies passing the other MP handle works as well. -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_MessagePipeTwoPassing DISABLED_MessagePipeTwoPassing -#else -#define MAYBE_MessagePipeTwoPassing MessagePipeTwoPassing -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_MessagePipeTwoPassing) { +TEST_F(MultiprocessMessagePipeTest, MessagePipeTwoPassing) { RUN_CHILD_ON_PIPE(CheckMessagePipe, h) MojoHandle mp1, mp2; ASSERT_EQ(MOJO_RESULT_OK, @@ -672,13 +638,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(DataPipeConsumer, MultiprocessMessagePipeTest, h) { return 0; } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_DataPipeConsumer DISABLED_DataPipeConsumer -#else -#define MAYBE_DataPipeConsumer DataPipeConsumer -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_DataPipeConsumer) { +TEST_F(MultiprocessMessagePipeTest, DataPipeConsumer) { RUN_CHILD_ON_PIPE(DataPipeConsumer, h) MojoCreateSharedBufferOptions options; options.struct_size = sizeof(options); @@ -772,13 +732,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(ChannelEchoClient, MultiprocessMessagePipeTest, return 0; } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_MultiprocessChannelPipe DISABLED_MultiprocessChannelPipe -#else -#define MAYBE_MultiprocessChannelPipe MultiprocessChannelPipe -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_MultiprocessChannelPipe) { +TEST_F(MultiprocessMessagePipeTest, MultiprocessChannelPipe) { RUN_CHILD_ON_PIPE(ChannelEchoClient, h) VerifyEcho(h, "in an interstellar burst"); VerifyEcho(h, "i am back to save the universe"); @@ -803,13 +757,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(EchoServiceClient, MultiprocessMessagePipeTest, return 0; } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_PassMessagePipeCrossProcess DISABLED_PassMessagePipeCrossProcess -#else -#define MAYBE_PassMessagePipeCrossProcess PassMessagePipeCrossProcess -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_PassMessagePipeCrossProcess) { +TEST_F(MultiprocessMessagePipeTest, PassMessagePipeCrossProcess) { MojoHandle p0, p1; CreateMessagePipe(&p0, &p1); RUN_CHILD_ON_PIPE(EchoServiceClient, h) @@ -866,14 +814,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(EchoServiceFactoryClient, return 0; } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_PassMoarMessagePipesCrossProcess \ - DISABLED_PassMoarMessagePipesCrossProcess -#else -#define MAYBE_PassMoarMessagePipesCrossProcess PassMoarMessagePipesCrossProcess -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_PassMoarMessagePipesCrossProcess) { +TEST_F(MultiprocessMessagePipeTest, PassMoarMessagePipesCrossProcess) { MojoHandle echo_factory_proxy, echo_factory_request; CreateMessagePipe(&echo_factory_proxy, &echo_factory_request); @@ -918,14 +859,7 @@ TEST_F(MultiprocessMessagePipeTest, MAYBE_PassMoarMessagePipesCrossProcess) { CloseHandle(echo_proxy_c); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_ChannelPipesWithMultipleChildren \ - DISABLED_ChannelPipesWithMultipleChildren -#else -#define MAYBE_ChannelPipesWithMultipleChildren ChannelPipesWithMultipleChildren -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_ChannelPipesWithMultipleChildren) { +TEST_F(MultiprocessMessagePipeTest, ChannelPipesWithMultipleChildren) { RUN_CHILD_ON_PIPE(ChannelEchoClient, a) RUN_CHILD_ON_PIPE(ChannelEchoClient, b) VerifyEcho(a, "hello child 0"); @@ -955,13 +889,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(PingPongPipeClient, EXPECT_EQ("quit", ReadMessage(h)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_PingPongPipe DISABLED_PingPongPipe -#else -#define MAYBE_PingPongPipe PingPongPipe -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_PingPongPipe) { +TEST_F(MultiprocessMessagePipeTest, PingPongPipe) { MojoHandle p0, p1; CreateMessagePipe(&p0, &p1); @@ -1060,13 +988,7 @@ DEFINE_TEST_CLIENT_WITH_PIPE(CommandDrivenClient, MultiprocessMessagePipeTest, return 0; } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_ChildToChildPipes DISABLED_ChildToChildPipes -#else -#define MAYBE_ChildToChildPipes ChildToChildPipes -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_ChildToChildPipes) { +TEST_F(MultiprocessMessagePipeTest, ChildToChildPipes) { RUN_CHILD_ON_PIPE(CommandDrivenClient, h0) RUN_CHILD_ON_PIPE(CommandDrivenClient, h1) CommandDrivenClientController a(h0); @@ -1091,13 +1013,7 @@ TEST_F(MultiprocessMessagePipeTest, MAYBE_ChildToChildPipes) { END_CHILD() } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_MoreChildToChildPipes DISABLED_MoreChildToChildPipes -#else -#define MAYBE_MoreChildToChildPipes MoreChildToChildPipes -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_MoreChildToChildPipes) { +TEST_F(MultiprocessMessagePipeTest, MoreChildToChildPipes) { RUN_CHILD_ON_PIPE(CommandDrivenClient, h0) RUN_CHILD_ON_PIPE(CommandDrivenClient, h1) RUN_CHILD_ON_PIPE(CommandDrivenClient, h2) @@ -1183,13 +1099,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReceivePipeWithClosedPeer, MOJO_DEADLINE_INDEFINITE, nullptr)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_SendPipeThenClosePeer DISABLED_SendPipeThenClosePeer -#else -#define MAYBE_SendPipeThenClosePeer SendPipeThenClosePeer -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_SendPipeThenClosePeer) { +TEST_F(MultiprocessMessagePipeTest, SendPipeThenClosePeer) { RUN_CHILD_ON_PIPE(ReceivePipeWithClosedPeer, h) MojoHandle a, b; CreateMessagePipe(&a, &b); @@ -1266,13 +1176,7 @@ TEST_F(MultiprocessMessagePipeTest, } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_SendClosePeerSend DISABLED_SendClosePeerSend -#else -#define MAYBE_SendClosePeerSend SendClosePeerSend -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_SendClosePeerSend) { +TEST_F(MultiprocessMessagePipeTest, SendClosePeerSend) { MojoHandle a, b; CreateMessagePipe(&a, &b); @@ -1315,13 +1219,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteCloseSendPeerClient, EXPECT_EQ("quit", ReadMessage(h)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_WriteCloseSendPeer DISABLED_WriteCloseSendPeer -#else -#define MAYBE_WriteCloseSendPeer WriteCloseSendPeer -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_WriteCloseSendPeer) { +TEST_F(MultiprocessMessagePipeTest, WriteCloseSendPeer) { MojoHandle pipe[2]; CreateMessagePipe(&pipe[0], &pipe[1]); @@ -1361,13 +1259,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(BootstrapMessagePipeAsyncClient, VerifyEcho(pipe.get().value(), "goodbye"); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_BootstrapMessagePipeAsync DISABLED_BootstrapMessagePipeAsync -#else -#define MAYBE_BootstrapMessagePipeAsync BootstrapMessagePipeAsync -#endif -TEST_F(MultiprocessMessagePipeTest, MAYBE_BootstrapMessagePipeAsync) { +TEST_F(MultiprocessMessagePipeTest, BootstrapMessagePipeAsync) { // Tests that new cross-process message pipes can be created synchronously // using asynchronous negotiation over an arbitrary platform channel. RUN_CHILD_ON_PIPE(BootstrapMessagePipeAsyncClient, child) diff --git a/chromium/mojo/edk/system/node_channel.cc b/chromium/mojo/edk/system/node_channel.cc index bde3fba91de..7a0fbb9373c 100644 --- a/chromium/mojo/edk/system/node_channel.cc +++ b/chromium/mojo/edk/system/node_channel.cc @@ -194,6 +194,7 @@ void NodeChannel::ShutDown() { void NodeChannel::SetRemoteProcessHandle(base::ProcessHandle process_handle) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); base::AutoLock lock(remote_process_handle_lock_); + CHECK_NE(remote_process_handle_, base::GetCurrentProcessHandle()); remote_process_handle_ = process_handle; } @@ -406,11 +407,20 @@ void NodeChannel::OnChannelMessage(const void* payload, { base::AutoLock lock(remote_process_handle_lock_); if (handles && remote_process_handle_ != base::kNullProcessHandle) { + // Note that we explicitly mark the handles as being owned by the sending + // process before rewriting them, in order to accommodate RewriteHandles' + // internal sanity checks. + for (auto& handle : *handles) + handle.owning_process = remote_process_handle_; if (!Channel::Message::RewriteHandles(remote_process_handle_, base::GetCurrentProcessHandle(), handles->data(), handles->size())) { DLOG(ERROR) << "Received one or more invalid handles."; } + } else if (handles) { + // Handles received by an unknown process must already be owned by us. + for (auto& handle : *handles) + handle.owning_process = base::GetCurrentProcessHandle(); } } #elif defined(OS_MACOSX) && !defined(OS_IOS) diff --git a/chromium/mojo/edk/system/node_controller.cc b/chromium/mojo/edk/system/node_controller.cc index 40c57eeac58..822165a9975 100644 --- a/chromium/mojo/edk/system/node_controller.cc +++ b/chromium/mojo/edk/system/node_controller.cc @@ -14,6 +14,7 @@ #include "base/message_loop/message_loop.h" #include "base/metrics/histogram_macros.h" #include "base/process/process_handle.h" +#include "base/time/time.h" #include "base/timer/elapsed_timer.h" #include "crypto/random.h" #include "mojo/edk/embedder/embedder_internal.h" @@ -173,13 +174,9 @@ void NodeController::ClosePort(const ports::PortRef& port) { } int NodeController::SendMessage(const ports::PortRef& port, - scoped_ptr<PortsMessage>* message) { - ports::ScopedMessage ports_message(message->release()); - int rv = node_->SendMessage(port, &ports_message); - if (rv != ports::OK) { - DCHECK(ports_message); - message->reset(static_cast<PortsMessage*>(ports_message.release())); - } + std::unique_ptr<PortsMessage> message) { + ports::ScopedMessage ports_message(message.release()); + int rv = node_->SendMessage(port, std::move(ports_message)); AcceptIncomingMessages(); return rv; @@ -193,10 +190,19 @@ void NodeController::ReservePort(const std::string& token, base::AutoLock lock(reserved_ports_lock_); auto result = reserved_ports_.insert(std::make_pair(token, port)); DCHECK(result.second); + + // Safeguard against unpredictable and exceptional cases where a reservation + // holder may disappear without ever claiming their reservation. + io_task_runner_->PostDelayedTask( + FROM_HERE, + base::Bind(&NodeController::CancelReservation, + base::Unretained(this), token), + base::TimeDelta::FromMinutes(1)); } void NodeController::MergePortIntoParent(const std::string& token, const ports::PortRef& port) { + bool was_merged = false; { // This request may be coming from within the process that reserved the // "parent" side (e.g. for Chrome single-process mode), so if this token is @@ -206,9 +212,13 @@ void NodeController::MergePortIntoParent(const std::string& token, if (it != reserved_ports_.end()) { node_->MergePorts(port, name_, it->second.name()); reserved_ports_.erase(it); - return; + was_merged = true; } } + if (was_merged) { + AcceptIncomingMessages(); + return; + } scoped_refptr<NodeChannel> parent; { @@ -251,6 +261,7 @@ void NodeController::RequestShutdown(const base::Closure& callback) { { base::AutoLock lock(shutdown_lock_); shutdown_callback_ = callback; + shutdown_callback_flag_.Set(true); } AttemptShutdownIfRequested(); @@ -475,7 +486,7 @@ void NodeController::SendPeerMessage(const ports::NodeName& name, } void NodeController::AcceptIncomingMessages() { - for (;;) { + while (incoming_messages_flag_) { // TODO: We may need to be more careful to avoid starving the rest of the // thread here. Revisit this if it turns out to be a problem. One // alternative would be to schedule a task to continue pumping messages @@ -490,6 +501,7 @@ void NodeController::AcceptIncomingMessages() { // the size is 0. So avoid creating it until it is necessary. std::queue<ports::ScopedMessage> messages; std::swap(messages, incoming_messages_); + incoming_messages_flag_.Set(false); messages_lock_.Release(); while (!messages.empty()) { @@ -535,6 +547,19 @@ void NodeController::DropAllPeers() { delete this; } +void NodeController::CancelReservation(const std::string& token) { + ports::PortRef reserved_port; + { + base::AutoLock lock(reserved_ports_lock_); + auto iter = reserved_ports_.find(token); + if (iter == reserved_ports_.end()) // Already claimed! + return; + reserved_port = iter->second; + reserved_ports_.erase(iter); + } + node_->ClosePort(reserved_port); +} + void NodeController::GenerateRandomPortName(ports::PortName* port_name) { GenerateRandomName(port_name); } @@ -553,6 +578,7 @@ void NodeController::ForwardMessage(const ports::NodeName& node, // AcceptMessage, we flush the queue after calling any of those methods. base::AutoLock lock(messages_lock_); incoming_messages_.emplace(std::move(message)); + incoming_messages_flag_.Set(true); } else { SendPeerMessage(node, std::move(message)); } @@ -875,6 +901,8 @@ void NodeController::OnIntroduce(const ports::NodeName& from_node, DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); if (!channel_handle.is_valid()) { + node_->LostConnectionToNode(name); + DLOG(ERROR) << "Could not be introduced to peer " << name; base::AutoLock lock(peers_lock_); pending_peer_messages_.erase(name); @@ -911,6 +939,12 @@ void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, // process before going out (see NodeChannel::WriteChannelMessage). // // TODO: We could avoid double-duplication. + // + // Note that we explicitly mark the handles as being owned by the sending + // process before rewriting them, in order to accommodate RewriteHandles' + // internal sanity checks. + for (size_t i = 0; i < message->num_handles(); ++i) + message->handles()[i].owning_process = from_process; if (!Channel::Message::RewriteHandles(from_process, base::GetCurrentProcessHandle(), message->handles(), @@ -978,17 +1012,22 @@ void NodeController::DestroyOnIOThreadShutdown() { } void NodeController::AttemptShutdownIfRequested() { + if (!shutdown_callback_flag_) + return; + base::Closure callback; { base::AutoLock lock(shutdown_lock_); if (shutdown_callback_.is_null()) return; if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) { - DVLOG(2) << "Unable to cleanly shut down node " << name_ << "."; + DVLOG(2) << "Unable to cleanly shut down node " << name_; return; } + callback = shutdown_callback_; shutdown_callback_.Reset(); + shutdown_callback_flag_.Set(false); } DCHECK(!callback.is_null()); diff --git a/chromium/mojo/edk/system/node_controller.h b/chromium/mojo/edk/system/node_controller.h index 463a72926f9..e2207ca3913 100644 --- a/chromium/mojo/edk/system/node_controller.h +++ b/chromium/mojo/edk/system/node_controller.h @@ -5,6 +5,7 @@ #ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ #define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ +#include <memory> #include <queue> #include <unordered_map> #include <unordered_set> @@ -14,13 +15,12 @@ #include "base/containers/hash_tables.h" #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/task_runner.h" #include "mojo/edk/embedder/platform_handle_vector.h" #include "mojo/edk/embedder/platform_shared_buffer.h" #include "mojo/edk/embedder/scoped_platform_handle.h" +#include "mojo/edk/system/atomic_flag.h" #include "mojo/edk/system/node_channel.h" -#include "mojo/edk/system/ports/hash_functions.h" #include "mojo/edk/system/ports/name.h" #include "mojo/edk/system/ports/node.h" #include "mojo/edk/system/ports/node_delegate.h" @@ -87,10 +87,9 @@ class NodeController : public ports::NodeDelegate, // it ensures the port's observer has also been removed. void ClosePort(const ports::PortRef& port); - // Sends a message on a port to its peer. If message send fails, |message| - // is left intact. Otherwise ownership is transferred and it's reset. + // Sends a message on a port to its peer. int SendMessage(const ports::PortRef& port_ref, - scoped_ptr<PortsMessage>* message); + std::unique_ptr<PortsMessage> message); // Reserves a local port |port| associated with |token|. A peer holding a copy // of |token| can merge one of its own ports into this one. @@ -138,6 +137,7 @@ class NodeController : public ports::NodeDelegate, ports::ScopedMessage message); void AcceptIncomingMessages(); void DropAllPeers(); + void CancelReservation(const std::string& token); // ports::NodeDelegate: void GenerateRandomPortName(ports::PortName* port_name) override; @@ -197,7 +197,7 @@ class NodeController : public ports::NodeDelegate, // These are safe to access from any thread as long as the Node is alive. Core* const core_; const ports::NodeName name_; - const scoped_ptr<ports::Node> node_; + const std::unique_ptr<ports::Node> node_; scoped_refptr<base::TaskRunner> io_task_runner_; // Guards |peers_| and |pending_peer_messages_|. @@ -248,6 +248,8 @@ class NodeController : public ports::NodeDelegate, // Guards |incoming_messages_|. base::Lock messages_lock_; std::queue<ports::ScopedMessage> incoming_messages_; + // Flag to fast-path checking |incoming_messages_|. + AtomicFlag incoming_messages_flag_; // Guards |shutdown_callback_|. base::Lock shutdown_lock_; @@ -256,6 +258,8 @@ class NodeController : public ports::NodeDelegate, // begin polling the Node to see if clean shutdown is possible any time the // Node's state is modified by the controller. base::Closure shutdown_callback_; + // Flag to fast-path checking |shutdown_callback_|. + AtomicFlag shutdown_callback_flag_; // All other fields below must only be accessed on the I/O thread, i.e., the // thread on which core_->io_task_runner() runs tasks. @@ -269,13 +273,13 @@ class NodeController : public ports::NodeDelegate, #if defined(OS_POSIX) && !defined(OS_MACOSX) // Broker for sync shared buffer creation (non-Mac posix-only) in children. - scoped_ptr<Broker> broker_; + std::unique_ptr<Broker> broker_; #endif #if defined(OS_MACOSX) && !defined(OS_IOS) base::Lock mach_port_relay_lock_; // Relay for transferring mach ports to/from children. - scoped_ptr<MachPortRelay> mach_port_relay_; + std::unique_ptr<MachPortRelay> mach_port_relay_; #endif DISALLOW_COPY_AND_ASSIGN(NodeController); diff --git a/chromium/mojo/edk/system/options_validation_unittest.cc b/chromium/mojo/edk/system/options_validation_unittest.cc index d2c81808dae..a01a92cfb10 100644 --- a/chromium/mojo/edk/system/options_validation_unittest.cc +++ b/chromium/mojo/edk/system/options_validation_unittest.cc @@ -95,7 +95,11 @@ TEST(OptionsValidationTest, Invalid) { // (for required pointer arguments) will still cause death, but perhaps not // predictably. TEST(OptionsValidationTest, InvalidDeath) { +#if defined(OFFICIAL_BUILD) + const char kMemoryCheckFailedRegex[] = ""; +#else const char kMemoryCheckFailedRegex[] = "Check failed"; +#endif // Null: EXPECT_DEATH_IF_SUPPORTED( diff --git a/chromium/mojo/edk/system/ports/BUILD.gn b/chromium/mojo/edk/system/ports/BUILD.gn index 173a17ba5e4..239b3a4400b 100644 --- a/chromium/mojo/edk/system/ports/BUILD.gn +++ b/chromium/mojo/edk/system/ports/BUILD.gn @@ -8,7 +8,6 @@ source_set("ports") { sources = [ "event.cc", "event.h", - "hash_functions.h", "message.cc", "message.h", "message_queue.cc", @@ -29,7 +28,9 @@ source_set("ports") { ] } -test("mojo_system_ports_unittests") { +source_set("tests") { + testonly = true + sources = [ "ports_unittest.cc", ] @@ -38,7 +39,6 @@ test("mojo_system_ports_unittests") { ":ports", "//base", "//base/test:test_support", - "//mojo/edk/test:run_all_unittests", "//testing/gtest", ] } diff --git a/chromium/mojo/edk/system/ports/hash_functions.h b/chromium/mojo/edk/system/ports/hash_functions.h deleted file mode 100644 index 3c91cb35929..00000000000 --- a/chromium/mojo/edk/system/ports/hash_functions.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2016 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef MOJO_EDK_SYSTEM_PORTS_HASH_FUNCTIONS_H_ -#define MOJO_EDK_SYSTEM_PORTS_HASH_FUNCTIONS_H_ - -#include <functional> - -#include "mojo/edk/system/ports/name.h" - -namespace std { - -template <> -struct hash<mojo::edk::ports::PortName> { - std::size_t operator()(const mojo::edk::ports::PortName& name) const { - size_t h1 = hash<uint64_t>()(name.v1); - size_t h2 = hash<uint64_t>()(name.v2); - return h1 ^ (h2 << 1); - } -}; - -template <> -struct hash<mojo::edk::ports::NodeName> { - std::size_t operator()(const mojo::edk::ports::NodeName& name) const { - size_t h1 = hash<uint64_t>()(name.v1); - size_t h2 = hash<uint64_t>()(name.v2); - return h1 ^ (h2 << 1); - } -}; - -} // namespace std - -#endif // MOJO_EDK_SYSTEM_PORTS_HASH_FUNCTIONS_H_ diff --git a/chromium/mojo/edk/system/ports/message.h b/chromium/mojo/edk/system/ports/message.h index 926ce752a84..95fa04676ce 100644 --- a/chromium/mojo/edk/system/ports/message.h +++ b/chromium/mojo/edk/system/ports/message.h @@ -7,7 +7,8 @@ #include <stddef.h> -#include "base/memory/scoped_ptr.h" +#include <memory> + #include "mojo/edk/system/ports/name.h" namespace mojo { @@ -83,7 +84,7 @@ class Message { size_t num_payload_bytes_ = 0; }; -using ScopedMessage = scoped_ptr<Message>; +using ScopedMessage = std::unique_ptr<Message>; } // namespace ports } // namespace edk diff --git a/chromium/mojo/edk/system/ports/name.h b/chromium/mojo/edk/system/ports/name.h index 8a9307d9cd3..1082719f6df 100644 --- a/chromium/mojo/edk/system/ports/name.h +++ b/chromium/mojo/edk/system/ports/name.h @@ -10,6 +10,8 @@ #include <ostream> #include <tuple> +#include "base/hash.h" + namespace mojo { namespace edk { namespace ports { @@ -51,4 +53,22 @@ const NodeName kInvalidNodeName = {0, 0}; } // namespace edk } // namespace mojo +namespace std { + +template <> +struct hash<mojo::edk::ports::PortName> { + std::size_t operator()(const mojo::edk::ports::PortName& name) const { + return base::HashInts64(name.v1, name.v2); + } +}; + +template <> +struct hash<mojo::edk::ports::NodeName> { + std::size_t operator()(const mojo::edk::ports::NodeName& name) const { + return base::HashInts64(name.v1, name.v2); + } +}; + +} // namespace std + #endif // MOJO_EDK_SYSTEM_PORTS_NAME_H_ diff --git a/chromium/mojo/edk/system/ports/node.cc b/chromium/mojo/edk/system/ports/node.cc index 266ee84b4f5..75e9857731d 100644 --- a/chromium/mojo/edk/system/ports/node.cc +++ b/chromium/mojo/edk/system/ports/node.cc @@ -6,6 +6,8 @@ #include <string.h> +#include <utility> + #include "base/logging.h" #include "base/memory/ref_counted.h" #include "base/synchronization/lock.h" @@ -28,6 +30,8 @@ bool CanAcceptMoreMessages(const Port* port) { // Have we already doled out the last message (i.e., do we expect to NOT // receive further messages)? uint64_t next_sequence_num = port->message_queue.next_sequence_num(); + if (port->state == Port::kClosed) + return false; if (port->peer_closed || port->remove_proxy_on_last_message) { if (port->last_sequence_num_to_receive == next_sequence_num - 1) return false; @@ -211,13 +215,13 @@ int Node::ClosePort(const PortRef& port_ref) { // If the port being closed still has unread messages, then we need to take // care to close those ports so as to avoid leaking memory. port->message_queue.GetReferencedPorts(&referenced_port_names); + + ErasePort_Locked(port_ref.name()); } DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_ << " to " << peer_port_name << "@" << peer_node_name; - ErasePort(port_ref.name()); - delegate_->ForwardMessage( peer_node_name, NewInternalMessage(peer_port_name, EventType::kObserveClosure, data)); @@ -269,7 +273,7 @@ int Node::GetMessageIf(const PortRef& port_ref, if (!CanAcceptMoreMessages(port)) return ERROR_PORT_PEER_CLOSED; - port->message_queue.GetNextMessageIf(selector, message); + port->message_queue.GetNextMessageIf(std::move(selector), message); } // Allow referenced ports to trigger PortStatusChanged calls. @@ -291,48 +295,22 @@ int Node::GetMessageIf(const PortRef& port_ref, return OK; } -int Node::SendMessage(const PortRef& port_ref, ScopedMessage* message) { - ScopedMessage& m = *message; - for (size_t i = 0; i < m->num_ports(); ++i) { - if (m->ports()[i] == port_ref.name()) - return ERROR_PORT_CANNOT_SEND_SELF; - } - - Port* port = port_ref.port(); - { - // We must acquire |ports_lock_| before grabbing any port locks, because - // WillSendMessage_Locked may need to lock multiple ports out of order. - base::AutoLock ports_lock(ports_lock_); - base::AutoLock lock(port->lock); - - if (port->state != Port::kReceiving) - return ERROR_PORT_STATE_UNEXPECTED; - - if (port->peer_closed) - return ERROR_PORT_PEER_CLOSED; - - int rv = WillSendMessage_Locked(port, port_ref.name(), m.get()); - if (rv != OK) - return rv; - - // Beyond this point there's no sense in returning anything but OK. Even if - // message forwarding or acceptance fails, there's nothing the embedder can - // do to recover. Assume that failure beyond this point must be treated as a - // transport failure. +int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) { + int rv = SendMessageInternal(port_ref, &message); + if (rv != OK) { + // If send failed, close all carried ports. Note that we're careful not to + // close the sending port itself if it happened to be one of the encoded + // ports (an invalid but possible condition.) + for (size_t i = 0; i < message->num_ports(); ++i) { + if (message->ports()[i] == port_ref.name()) + continue; - if (port->peer_node_name != name_) { - delegate_->ForwardMessage(port->peer_node_name, std::move(m)); - return OK; + PortRef port; + if (GetPort(message->ports()[i], &port) == OK) + ClosePort(port); } } - - int rv = AcceptMessage(std::move(m)); - if (rv != OK) { - // See comment above for why we don't return an error in this case. - DVLOG(2) << "AcceptMessage failed: " << rv; - } - - return OK; + return rv; } int Node::AcceptMessage(ScopedMessage message) { @@ -365,6 +343,7 @@ int Node::MergePorts(const PortRef& port_ref, const NodeName& destination_node_name, const PortName& destination_port_name) { Port* port = port_ref.port(); + MergePortEventData data; { // |ports_lock_| must be held for WillSendPort_Locked below. base::AutoLock ports_lock(ports_lock_); @@ -375,15 +354,14 @@ int Node::MergePorts(const PortRef& port_ref, // Send the port-to-merge over to the destination node so it can be merged // into the port cycle atomically there. - MergePortEventData data; data.new_port_name = port_ref.name(); WillSendPort_Locked(port, destination_node_name, &data.new_port_name, &data.new_port_descriptor); - delegate_->ForwardMessage( - destination_node_name, - NewInternalMessage(destination_port_name, - EventType::kMergePort, data)); } + delegate_->ForwardMessage( + destination_node_name, + NewInternalMessage(destination_port_name, + EventType::kMergePort, data)); return OK; } @@ -577,6 +555,17 @@ int Node::OnObserveProxy(const PortName& port_name, scoped_refptr<Port> port = GetPort(port_name); if (!port) { DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found"; + + if (port_name != event.proxy_port_name && + port_name != event.proxy_to_port_name) { + // The receiving port may have been removed while this message was in + // transit. In this case, we restart the ObserveProxy circulation from + // the referenced proxy port to avoid leaking the proxy. + delegate_->ForwardMessage( + event.proxy_node_name, + NewInternalMessage( + event.proxy_port_name, EventType::kObserveProxy, event)); + } return OK; } @@ -689,6 +678,9 @@ int Node::OnObserveClosure(const PortName& port_name, // ObserveProxyAck. bool notify_delegate = false; + ObserveClosureEventData forwarded_data; + NodeName peer_node_name; + PortName peer_port_name; { // We must acquire |ports_lock_| before the port lock because it must be // held for MaybeRemoveProxy_Locked. @@ -708,8 +700,6 @@ int Node::OnObserveClosure(const PortName& port_name, // cares about it. This ensures that any dead-end proxies beyond that port // are notified to remove themselves. - ObserveClosureEventData forwarded_data; - if (port->state == Port::kReceiving) { notify_delegate = true; @@ -739,11 +729,14 @@ int Node::OnObserveClosure(const PortName& port_name, << " (last_sequence_num=" << forwarded_data.last_sequence_num << ")"; - delegate_->ForwardMessage( - port->peer_node_name, - NewInternalMessage(port->peer_port_name, - EventType::kObserveClosure, forwarded_data)); + peer_node_name = port->peer_node_name; + peer_port_name = port->peer_port_name; } + delegate_->ForwardMessage( + peer_node_name, + NewInternalMessage(peer_port_name, EventType::kObserveClosure, + forwarded_data)); + if (notify_delegate) { PortRef port_ref(port_name, port); delegate_->PortStatusChanged(port_ref); @@ -754,8 +747,6 @@ int Node::OnObserveClosure(const PortName& port_name, int Node::OnMergePort(const PortName& port_name, const MergePortEventData& event) { scoped_refptr<Port> port = GetPort(port_name); - if (!port) - return ERROR_PORT_UNKNOWN; DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state=" << port->state << ") merging with proxy " << event.new_port_name @@ -773,7 +764,7 @@ int Node::OnMergePort(const PortName& port_name, int rv = AcceptPort(event.new_port_name, event.new_port_descriptor); if (rv != OK) { close_target_port = true; - } else { + } else if (port) { // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn // needs to hold |ports_lock_|. We also acquire multiple port locks within. base::AutoLock ports_lock(ports_lock_); @@ -798,6 +789,8 @@ int Node::OnMergePort(const PortName& port_name, close_new_port = true; close_target_port = true; } + } else { + close_new_port = true; } if (close_target_port) { @@ -830,11 +823,6 @@ int Node::AddPortWithName(const PortName& port_name, return OK; } -void Node::ErasePort(const PortName& port_name) { - base::AutoLock lock(ports_lock_); - return ErasePort_Locked(port_name); -} - void Node::ErasePort_Locked(const PortName& port_name) { ports_lock_.AssertAcquired(); ports_.erase(port_name); @@ -855,6 +843,53 @@ scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) { return iter->second; } +int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) { + ScopedMessage& m = *message; + for (size_t i = 0; i < m->num_ports(); ++i) { + if (m->ports()[i] == port_ref.name()) + return ERROR_PORT_CANNOT_SEND_SELF; + } + + Port* port = port_ref.port(); + NodeName peer_node_name; + { + // We must acquire |ports_lock_| before grabbing any port locks, because + // WillSendMessage_Locked may need to lock multiple ports out of order. + base::AutoLock ports_lock(ports_lock_); + base::AutoLock lock(port->lock); + + if (port->state != Port::kReceiving) + return ERROR_PORT_STATE_UNEXPECTED; + + if (port->peer_closed) + return ERROR_PORT_PEER_CLOSED; + + int rv = WillSendMessage_Locked(port, port_ref.name(), m.get()); + if (rv != OK) + return rv; + + // Beyond this point there's no sense in returning anything but OK. Even if + // message forwarding or acceptance fails, there's nothing the embedder can + // do to recover. Assume that failure beyond this point must be treated as a + // transport failure. + + peer_node_name = port->peer_node_name; + } + + if (peer_node_name != name_) { + delegate_->ForwardMessage(peer_node_name, std::move(m)); + return OK; + } + + int rv = AcceptMessage(std::move(m)); + if (rv != OK) { + // See comment above for why we don't return an error in this case. + DVLOG(2) << "AcceptMessage failed: " << rv; + } + + return OK; +} + int Node::MergePorts_Locked(const PortRef& port0_ref, const PortRef& port1_ref) { Port* port0 = port0_ref.port(); @@ -884,7 +919,6 @@ int Node::MergePorts_Locked(const PortRef& port0_ref, std::swap(port0->peer_node_name, port1->peer_node_name); std::swap(port0->peer_port_name, port1->peer_port_name); - std::swap(port0->peer_closed, port1->peer_closed); port0->state = Port::kBuffering; if (port0->peer_closed) @@ -926,7 +960,6 @@ int Node::MergePorts_Locked(const PortRef& port0_ref, // state by undoing the peer swap. std::swap(port0->peer_node_name, port1->peer_node_name); std::swap(port0->peer_port_name, port1->peer_port_name); - std::swap(port0->peer_closed, port1->peer_closed); port0->remove_proxy_on_last_message = false; port1->remove_proxy_on_last_message = false; port0->state = Port::kReceiving; diff --git a/chromium/mojo/edk/system/ports/node.h b/chromium/mojo/edk/system/ports/node.h index 2dc513c0c5f..e06942aff6f 100644 --- a/chromium/mojo/edk/system/ports/node.h +++ b/chromium/mojo/edk/system/ports/node.h @@ -15,7 +15,6 @@ #include "base/memory/ref_counted.h" #include "base/synchronization/lock.h" #include "mojo/edk/system/ports/event.h" -#include "mojo/edk/system/ports/hash_functions.h" #include "mojo/edk/system/ports/message.h" #include "mojo/edk/system/ports/name.h" #include "mojo/edk/system/ports/port.h" @@ -114,10 +113,7 @@ class Node { // Sends a message from the specified port to its peer. Note that the message // notification may arrive synchronously (via PortStatusChanged() on the // delegate) if the peer is local to this Node. - // - // If send fails for any reason, |message| is left unchanged. On success, - // ownserhip is transferred and |message| is reset. - int SendMessage(const PortRef& port_ref, ScopedMessage* message); + int SendMessage(const PortRef& port_ref, ScopedMessage message); // Corresponding to NodeDelegate::ForwardMessage. int AcceptMessage(ScopedMessage message); @@ -158,11 +154,11 @@ class Node { int AddPortWithName(const PortName& port_name, const scoped_refptr<Port>& port); - void ErasePort(const PortName& port_name); void ErasePort_Locked(const PortName& port_name); scoped_refptr<Port> GetPort(const PortName& port_name); scoped_refptr<Port> GetPort_Locked(const PortName& port_name); + int SendMessageInternal(const PortRef& port_ref, ScopedMessage* message); int MergePorts_Locked(const PortRef& port0_ref, const PortRef& port1_ref); void WillSendPort_Locked(Port* port, const NodeName& to_node_name, diff --git a/chromium/mojo/edk/system/ports/port.h b/chromium/mojo/edk/system/ports/port.h index 7e28e1294f8..ea53d43b5f0 100644 --- a/chromium/mojo/edk/system/ports/port.h +++ b/chromium/mojo/edk/system/ports/port.h @@ -5,13 +5,13 @@ #ifndef MOJO_EDK_SYSTEM_PORTS_PORT_H_ #define MOJO_EDK_SYSTEM_PORTS_PORT_H_ +#include <memory> #include <queue> #include <utility> #include <vector> #include "base/macros.h" #include "base/memory/ref_counted.h" -#include "base/memory/scoped_ptr.h" #include "base/synchronization/lock.h" #include "mojo/edk/system/ports/message_queue.h" #include "mojo/edk/system/ports/user_data.h" @@ -37,7 +37,7 @@ class Port : public base::RefCountedThreadSafe<Port> { uint64_t next_sequence_num_to_send; uint64_t last_sequence_num_to_receive; MessageQueue message_queue; - scoped_ptr<std::pair<NodeName, ScopedMessage>> send_on_proxy_removal; + std::unique_ptr<std::pair<NodeName, ScopedMessage>> send_on_proxy_removal; scoped_refptr<UserData> user_data; bool remove_proxy_on_last_message; bool peer_closed; diff --git a/chromium/mojo/edk/system/ports/ports_unittest.cc b/chromium/mojo/edk/system/ports/ports_unittest.cc index 1bdca3ff8cb..1098e8e2942 100644 --- a/chromium/mojo/edk/system/ports/ports_unittest.cc +++ b/chromium/mojo/edk/system/ports/ports_unittest.cc @@ -11,6 +11,7 @@ #include <sstream> #include "base/logging.h" +#include "base/rand_util.h" #include "mojo/edk/system/ports/node.h" #include "mojo/edk/system/ports/node_delegate.h" #include "testing/gtest/include/gtest/gtest.h" @@ -73,12 +74,12 @@ struct Task { Task(NodeName node_name, ScopedMessage message) : node_name(node_name), message(std::move(message)), - priority(rand()) { + priority(base::RandUint64()) { } NodeName node_name; ScopedMessage message; - int32_t priority; + uint64_t priority; }; struct TaskComparator { @@ -122,7 +123,7 @@ int SendStringMessage(Node* node, const PortRef& port, const std::string& s) { size_t size = s.size() + 1; ScopedMessage message = TestMessage::NewUserMessage(size, 0); memcpy(message->mutable_payload_bytes(), s.data(), size); - return node->SendMessage(port, &message); + return node->SendMessage(port, std::move(message)); } int SendStringMessageWithPort(Node* node, @@ -133,7 +134,7 @@ int SendStringMessageWithPort(Node* node, ScopedMessage message = TestMessage::NewUserMessage(size, 1); memcpy(message->mutable_payload_bytes(), s.data(), size); message->mutable_ports()[0] = sent_port_name; - return node->SendMessage(port ,&message); + return node->SendMessage(port, std::move(message)); } int SendStringMessageWithPort(Node* node, @@ -719,31 +720,16 @@ TEST_F(PortsTest, SendFailure) { EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, SendStringMessageWithPort(&node0, A, "nope", B)); + // B should be closed immediately. + EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); + PumpTasks(); // There should have been no messages accepted. ScopedMessage message; EXPECT_FALSE(node0_delegate.GetSavedMessage(&message)); - // Both A and B should still work. - - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hi")); - EXPECT_EQ(OK, SendStringMessage(&node0, B, "hey")); - - PumpTasks(); - - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hi", ToString(message))); - ClosePortsInMessage(&node0, message.get()); - - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); - EXPECT_EQ(0, strcmp("hey", ToString(message))); - ClosePortsInMessage(&node0, message.get()); - - PumpTasks(); - EXPECT_EQ(OK, node0.ClosePort(A)); - EXPECT_EQ(OK, node0.ClosePort(B)); PumpTasks(); @@ -1304,7 +1290,6 @@ TEST_F(PortsTest, MergePortsWithMovedPeers) { EXPECT_TRUE(node1.CanShutdownCleanly(false)); } - TEST_F(PortsTest, MergePortsFailsGracefully) { // This tests that the system remains in a well-defined state if something // goes wrong during port merge. diff --git a/chromium/mojo/edk/system/ports_message.cc b/chromium/mojo/edk/system/ports_message.cc index 91fe3f3bc35..5f3e8c01251 100644 --- a/chromium/mojo/edk/system/ports_message.cc +++ b/chromium/mojo/edk/system/ports_message.cc @@ -4,16 +4,18 @@ #include "mojo/edk/system/ports_message.h" +#include "base/memory/ptr_util.h" #include "mojo/edk/system/node_channel.h" namespace mojo { namespace edk { // static -scoped_ptr<PortsMessage> PortsMessage::NewUserMessage(size_t num_payload_bytes, - size_t num_ports, - size_t num_handles) { - return make_scoped_ptr( +std::unique_ptr<PortsMessage> PortsMessage::NewUserMessage( + size_t num_payload_bytes, + size_t num_ports, + size_t num_handles) { + return base::WrapUnique( new PortsMessage(num_payload_bytes, num_ports, num_handles)); } diff --git a/chromium/mojo/edk/system/ports_message.h b/chromium/mojo/edk/system/ports_message.h index 8e3036e0002..e2e2e836926 100644 --- a/chromium/mojo/edk/system/ports_message.h +++ b/chromium/mojo/edk/system/ports_message.h @@ -5,9 +5,9 @@ #ifndef MOJO_EDK_SYSTEM_PORTS_MESSAGE_H__ #define MOJO_EDK_SYSTEM_PORTS_MESSAGE_H__ +#include <memory> #include <utility> -#include "base/memory/scoped_ptr.h" #include "mojo/edk/embedder/platform_handle_vector.h" #include "mojo/edk/system/channel.h" #include "mojo/edk/system/ports/message.h" @@ -19,9 +19,9 @@ class NodeController; class PortsMessage : public ports::Message { public: - static scoped_ptr<PortsMessage> NewUserMessage(size_t num_payload_bytes, - size_t num_ports, - size_t num_handles); + static std::unique_ptr<PortsMessage> NewUserMessage(size_t num_payload_bytes, + size_t num_ports, + size_t num_handles); ~PortsMessage() override; diff --git a/chromium/mojo/edk/system/remote_message_pipe_bootstrap.cc b/chromium/mojo/edk/system/remote_message_pipe_bootstrap.cc index eb38674cac4..d376cca5838 100644 --- a/chromium/mojo/edk/system/remote_message_pipe_bootstrap.cc +++ b/chromium/mojo/edk/system/remote_message_pipe_bootstrap.cc @@ -7,7 +7,7 @@ #include "base/bind.h" #include "base/bind_helpers.h" #include "base/macros.h" -#include "base/thread_task_runner_handle.h" +#include "base/threading/thread_task_runner_handle.h" #include "mojo/edk/embedder/embedder.h" #include "mojo/edk/system/node_controller.h" #include "mojo/edk/system/ports/name.h" diff --git a/chromium/mojo/edk/system/shared_buffer_dispatcher.cc b/chromium/mojo/edk/system/shared_buffer_dispatcher.cc index d531989c103..df391050a2a 100644 --- a/chromium/mojo/edk/system/shared_buffer_dispatcher.cc +++ b/chromium/mojo/edk/system/shared_buffer_dispatcher.cc @@ -8,10 +8,10 @@ #include <stdint.h> #include <limits> +#include <memory> #include <utility> #include "base/logging.h" -#include "base/memory/scoped_ptr.h" #include "mojo/edk/embedder/embedder_internal.h" #include "mojo/edk/system/configuration.h" #include "mojo/edk/system/node_controller.h" @@ -218,7 +218,7 @@ MojoResult SharedBufferDispatcher::MapBuffer( uint64_t offset, uint64_t num_bytes, MojoMapBufferFlags flags, - scoped_ptr<PlatformSharedBufferMapping>* mapping) { + std::unique_ptr<PlatformSharedBufferMapping>* mapping) { if (offset > static_cast<uint64_t>(std::numeric_limits<size_t>::max())) return MOJO_RESULT_INVALID_ARGUMENT; if (num_bytes > static_cast<uint64_t>(std::numeric_limits<size_t>::max())) @@ -276,7 +276,7 @@ bool SharedBufferDispatcher::BeginTransit() { base::AutoLock lock(lock_); if (in_transit_) return false; - in_transit_ = shared_buffer_ != nullptr; + in_transit_ = static_cast<bool>(shared_buffer_); return in_transit_; } diff --git a/chromium/mojo/edk/system/shared_buffer_dispatcher.h b/chromium/mojo/edk/system/shared_buffer_dispatcher.h index ffc07ff11e4..1648dd274b1 100644 --- a/chromium/mojo/edk/system/shared_buffer_dispatcher.h +++ b/chromium/mojo/edk/system/shared_buffer_dispatcher.h @@ -77,7 +77,7 @@ class MOJO_SYSTEM_IMPL_EXPORT SharedBufferDispatcher final : public Dispatcher { uint64_t offset, uint64_t num_bytes, MojoMapBufferFlags flags, - scoped_ptr<PlatformSharedBufferMapping>* mapping) override; + std::unique_ptr<PlatformSharedBufferMapping>* mapping) override; void StartSerialize(uint32_t* num_bytes, uint32_t* num_ports, uint32_t* num_platform_handles) override; diff --git a/chromium/mojo/edk/system/shared_buffer_dispatcher_unittest.cc b/chromium/mojo/edk/system/shared_buffer_dispatcher_unittest.cc index 6e26bf9f1bd..c95bdc3b704 100644 --- a/chromium/mojo/edk/system/shared_buffer_dispatcher_unittest.cc +++ b/chromium/mojo/edk/system/shared_buffer_dispatcher_unittest.cc @@ -119,7 +119,7 @@ TEST_F(SharedBufferDispatcherTest, CreateAndMapBuffer) { EXPECT_EQ(Dispatcher::Type::SHARED_BUFFER, dispatcher->GetType()); // Make a couple of mappings. - scoped_ptr<PlatformSharedBufferMapping> mapping1; + std::unique_ptr<PlatformSharedBufferMapping> mapping1; EXPECT_EQ(MOJO_RESULT_OK, dispatcher->MapBuffer( 0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1)); ASSERT_TRUE(mapping1); @@ -128,7 +128,7 @@ TEST_F(SharedBufferDispatcherTest, CreateAndMapBuffer) { // Write something. static_cast<char*>(mapping1->GetBase())[50] = 'x'; - scoped_ptr<PlatformSharedBufferMapping> mapping2; + std::unique_ptr<PlatformSharedBufferMapping> mapping2; EXPECT_EQ(MOJO_RESULT_OK, dispatcher->MapBuffer( 50, 50, MOJO_MAP_BUFFER_FLAG_NONE, &mapping2)); ASSERT_TRUE(mapping2); @@ -156,7 +156,7 @@ TEST_F(SharedBufferDispatcherTest, CreateAndMapBufferFromPlatformBuffer) { EXPECT_EQ(Dispatcher::Type::SHARED_BUFFER, dispatcher->GetType()); // Make a couple of mappings. - scoped_ptr<PlatformSharedBufferMapping> mapping1; + std::unique_ptr<PlatformSharedBufferMapping> mapping1; EXPECT_EQ(MOJO_RESULT_OK, dispatcher->MapBuffer( 0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1)); ASSERT_TRUE(mapping1); @@ -165,7 +165,7 @@ TEST_F(SharedBufferDispatcherTest, CreateAndMapBufferFromPlatformBuffer) { // Write something. static_cast<char*>(mapping1->GetBase())[50] = 'x'; - scoped_ptr<PlatformSharedBufferMapping> mapping2; + std::unique_ptr<PlatformSharedBufferMapping> mapping2; EXPECT_EQ(MOJO_RESULT_OK, dispatcher->MapBuffer( 50, 50, MOJO_MAP_BUFFER_FLAG_NONE, &mapping2)); ASSERT_TRUE(mapping2); @@ -188,7 +188,7 @@ TEST_F(SharedBufferDispatcherTest, DuplicateBufferHandle) { nullptr, 100, &dispatcher1)); // Map and write something. - scoped_ptr<PlatformSharedBufferMapping> mapping; + std::unique_ptr<PlatformSharedBufferMapping> mapping; EXPECT_EQ(MOJO_RESULT_OK, dispatcher1->MapBuffer( 0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping)); static_cast<char*>(mapping->GetBase())[0] = 'x'; @@ -230,7 +230,7 @@ TEST_F(SharedBufferDispatcherTest, DuplicateBufferHandleOptionsValid) { ASSERT_TRUE(dispatcher2); EXPECT_EQ(Dispatcher::Type::SHARED_BUFFER, dispatcher2->GetType()); { - scoped_ptr<PlatformSharedBufferMapping> mapping; + std::unique_ptr<PlatformSharedBufferMapping> mapping; EXPECT_EQ(MOJO_RESULT_OK, dispatcher2->MapBuffer(0, 100, 0, &mapping)); } EXPECT_EQ(MOJO_RESULT_OK, dispatcher2->Close()); @@ -291,7 +291,7 @@ TEST_F(SharedBufferDispatcherTest, MapBufferInvalidArguments) { SharedBufferDispatcher::kDefaultCreateOptions, nullptr, 100, &dispatcher)); - scoped_ptr<PlatformSharedBufferMapping> mapping; + std::unique_ptr<PlatformSharedBufferMapping> mapping; EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dispatcher->MapBuffer(0, 101, MOJO_MAP_BUFFER_FLAG_NONE, &mapping)); EXPECT_FALSE(mapping); diff --git a/chromium/mojo/edk/system/shared_buffer_unittest.cc b/chromium/mojo/edk/system/shared_buffer_unittest.cc index 9451ae865b3..3a728728a57 100644 --- a/chromium/mojo/edk/system/shared_buffer_unittest.cc +++ b/chromium/mojo/edk/system/shared_buffer_unittest.cc @@ -63,13 +63,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CopyToBufferClient, SharedBufferTest, h) { EXPECT_EQ("quit", ReadMessage(h)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_PassSharedBufferCrossProcess DISABLED_PassSharedBufferCrossProcess -#else -#define MAYBE_PassSharedBufferCrossProcess PassSharedBufferCrossProcess -#endif -TEST_F(SharedBufferTest, MAYBE_PassSharedBufferCrossProcess) { +TEST_F(SharedBufferTest, PassSharedBufferCrossProcess) { const std::string message = "hello"; MojoHandle b = CreateBuffer(message.size()); @@ -93,13 +87,7 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateBufferClient, SharedBufferTest, h) { EXPECT_EQ("quit", ReadMessage(h)); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_PassSharedBufferFromChild DISABLED_PassSharedBufferFromChild -#else -#define MAYBE_PassSharedBufferFromChild PassSharedBufferFromChild -#endif -TEST_F(SharedBufferTest, MAYBE_PassSharedBufferFromChild) { +TEST_F(SharedBufferTest, PassSharedBufferFromChild) { const std::string message = "hello"; MojoHandle b; RUN_CHILD_ON_PIPE(CreateBufferClient, h) @@ -126,7 +114,6 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndPassBuffer, SharedBufferTest, h) { WriteMessageWithHandles(other_child, "", &dupe, 1); EXPECT_EQ("quit", ReadMessage(h)); - WriteMessage(h, "ok"); } DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReceiveAndEditBuffer, SharedBufferTest, h) { @@ -143,17 +130,9 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReceiveAndEditBuffer, SharedBufferTest, h) { WriteToBuffer(b, 0, message); EXPECT_EQ(MOJO_RESULT_OK, MojoClose(b)); EXPECT_EQ("quit", ReadMessage(h)); - WriteMessage(h, "ok"); } -#if defined(OS_ANDROID) -// Android multi-process tests are not executing the new process. This is flaky. -#define MAYBE_PassSharedBufferFromChildToChild \ - DISABLED_PassSharedBufferFromChildToChild -#else -#define MAYBE_PassSharedBufferFromChildToChild PassSharedBufferFromChildToChild -#endif -TEST_F(SharedBufferTest, MAYBE_PassSharedBufferFromChildToChild) { +TEST_F(SharedBufferTest, PassSharedBufferFromChildToChild) { const std::string message = "hello"; MojoHandle p0, p1; CreateMessagePipe(&p0, &p1); @@ -171,10 +150,8 @@ TEST_F(SharedBufferTest, MAYBE_PassSharedBufferFromChildToChild) { ReadMessageWithHandles(h0, &b, 1); WriteMessage(h1, "quit"); - EXPECT_EQ("ok", ReadMessage(h1)); END_CHILD() WriteMessage(h0, "quit"); - EXPECT_EQ("ok", ReadMessage(h0)); END_CHILD() // The second child should have written this message. @@ -197,8 +174,6 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndPassBufferParent, SharedBufferTest, EXPECT_EQ("quit", ReadMessage(parent)); WriteMessage(child, "quit"); - EXPECT_EQ("ok", ReadMessage(child)); - WriteMessage(parent, "ok"); END_CHILD() } @@ -212,8 +187,6 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReceiveAndEditBufferParent, SharedBufferTest, EXPECT_EQ("quit", ReadMessage(parent)); WriteMessage(child, "quit"); - EXPECT_EQ("ok", ReadMessage(child)); - WriteMessage(parent, "ok"); END_CHILD() } @@ -245,10 +218,8 @@ TEST_F(SharedBufferTest, MAYBE_PassHandleBetweenCousins) { ReadMessageWithHandles(child1, &b, 1); WriteMessage(child2, "quit"); - EXPECT_EQ("ok", ReadMessage(child2)); END_CHILD() WriteMessage(child1, "quit"); - EXPECT_EQ("ok", ReadMessage(child1)); END_CHILD() // The second grandchild should have written this message. @@ -312,10 +283,8 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndPassReadOnlyBuffer, WriteMessage(h, "ok"); } -#if defined(OS_ANDROID) || (defined(OS_POSIX) && !defined(OS_MACOSX)) +#if defined(OS_ANDROID) // Android multi-process tests are not executing the new process. This is flaky. -// Non-OSX posix uses a sync broker to create shared memory. Creating read-only -// duplicates in child processes is not currently supported via the sync broker. #define MAYBE_CreateAndPassFromChildReadOnlyBuffer \ DISABLED_CreateAndPassFromChildReadOnlyBuffer #else diff --git a/chromium/mojo/edk/system/wait_set_dispatcher.h b/chromium/mojo/edk/system/wait_set_dispatcher.h index 5bf457ff451..619a1beaa2a 100644 --- a/chromium/mojo/edk/system/wait_set_dispatcher.h +++ b/chromium/mojo/edk/system/wait_set_dispatcher.h @@ -8,11 +8,11 @@ #include <stdint.h> #include <deque> +#include <memory> #include <unordered_map> #include <utility> #include "base/macros.h" -#include "base/memory/scoped_ptr.h" #include "base/synchronization/lock.h" #include "mojo/edk/system/awakable_list.h" #include "mojo/edk/system/dispatcher.h" @@ -92,7 +92,7 @@ class MOJO_SYSTEM_IMPL_EXPORT WaitSetDispatcher : public Dispatcher { AwakableList awakable_list_; // Waiter used to wait on dispatchers. - scoped_ptr<Waiter> waiter_; + std::unique_ptr<Waiter> waiter_; DISALLOW_COPY_AND_ASSIGN(WaitSetDispatcher); }; diff --git a/chromium/mojo/edk/system/wait_set_dispatcher_unittest.cc b/chromium/mojo/edk/system/wait_set_dispatcher_unittest.cc index 2a42be16452..42ac86548d6 100644 --- a/chromium/mojo/edk/system/wait_set_dispatcher_unittest.cc +++ b/chromium/mojo/edk/system/wait_set_dispatcher_unittest.cc @@ -13,6 +13,7 @@ #include "base/memory/ref_counted.h" #include "mojo/edk/embedder/embedder_internal.h" #include "mojo/edk/system/core.h" +#include "mojo/edk/system/message_for_transit.h" #include "mojo/edk/system/message_pipe_dispatcher.h" #include "mojo/edk/system/request_context.h" #include "mojo/edk/system/test_utils.h" @@ -74,6 +75,35 @@ class WaitSetDispatcherTest : public ::testing::Test { dispatchers_to_close_.push_back(dispatcher); } + void WriteMessage(MessagePipeDispatcher* dispatcher, + const void* bytes, + size_t num_bytes) { + Core* core = mojo::edk::internal::g_core; + MojoMessageHandle msg; + ASSERT_EQ(MOJO_RESULT_OK, + core->AllocMessage(static_cast<uint32_t>(num_bytes), nullptr, 0, + MOJO_ALLOC_MESSAGE_FLAG_NONE, &msg)); + void* buffer; + ASSERT_EQ(MOJO_RESULT_OK, core->GetMessageBuffer(msg, &buffer)); + memcpy(buffer, bytes, num_bytes); + + std::unique_ptr<MessageForTransit> message( + reinterpret_cast<MessageForTransit*>(msg)); + ASSERT_EQ(MOJO_RESULT_OK, + dispatcher->WriteMessage(std::move(message), + MOJO_WRITE_MESSAGE_FLAG_NONE)); + } + + void ReadMessage(MessagePipeDispatcher* dispatcher, + void* bytes, + uint32_t* num_bytes) { + std::unique_ptr<MessageForTransit> message; + ASSERT_EQ(MOJO_RESULT_OK, + dispatcher->ReadMessage(&message, num_bytes, nullptr, 0, + MOJO_READ_MESSAGE_FLAG_NONE, false)); + memcpy(bytes, message->bytes(), *num_bytes); + } + protected: scoped_refptr<MessagePipeDispatcher> dispatcher0_; scoped_refptr<MessagePipeDispatcher> dispatcher1_; @@ -140,9 +170,7 @@ TEST_F(WaitSetDispatcherTest, Basic) { // Write to |dispatcher1_|, which should make |dispatcher0_| readable. char buffer[] = "abcd"; w.Init(); - ASSERT_EQ(MOJO_RESULT_OK, - dispatcher1_->WriteMessage(buffer, sizeof(buffer), nullptr, 0, - MOJO_WRITE_MESSAGE_FLAG_NONE)); + WriteMessage(dispatcher1_.get(), buffer, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr)); woken_dispatcher = nullptr; context = 0; @@ -187,9 +215,7 @@ TEST_F(WaitSetDispatcherTest, HandleWithoutRemoving) { // Write to |dispatcher1_|, which should make |dispatcher0_| readable. char buffer[] = "abcd"; w.Init(); - ASSERT_EQ(MOJO_RESULT_OK, - dispatcher1_->WriteMessage(buffer, sizeof(buffer), nullptr, 0, - MOJO_WRITE_MESSAGE_FLAG_NONE)); + WriteMessage(dispatcher1_.get(), buffer, sizeof(buffer)); EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr)); woken_dispatcher = nullptr; context = 0; @@ -201,9 +227,7 @@ TEST_F(WaitSetDispatcherTest, HandleWithoutRemoving) { // Read from |dispatcher0_| which should change it's state to non-readable. char read_buffer[sizeof(buffer) + 5]; uint32_t num_bytes = sizeof(read_buffer); - ASSERT_EQ(MOJO_RESULT_OK, - dispatcher0_->ReadMessage(read_buffer, &num_bytes, nullptr, - nullptr, MOJO_READ_MESSAGE_FLAG_NONE)); + ReadMessage(dispatcher0_.get(), read_buffer, &num_bytes); EXPECT_EQ(sizeof(buffer), num_bytes); // No dispatchers are ready. @@ -212,7 +236,7 @@ TEST_F(WaitSetDispatcherTest, HandleWithoutRemoving) { context = 0; EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, GetOneReadyDispatcher(wait_set, &woken_dispatcher, &context)); - EXPECT_EQ(nullptr, woken_dispatcher); + EXPECT_FALSE(woken_dispatcher); EXPECT_EQ(0u, context); EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); } @@ -314,9 +338,7 @@ TEST_F(WaitSetDispatcherTest, MultipleReady) { // Write to |dispatcher1_|, which should make |dispatcher0_| readable. char buffer[] = "abcd"; w.Init(); - ASSERT_EQ(MOJO_RESULT_OK, - dispatcher1_->WriteMessage(buffer, sizeof(buffer), nullptr, 0, - MOJO_WRITE_MESSAGE_FLAG_NONE)); + WriteMessage(dispatcher1_.get(), buffer, sizeof(buffer)); { Waiter mp_w; mp_w.Init(); |