diff options
author | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2017-04-05 17:15:33 +0200 |
---|---|---|
committer | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2017-04-11 07:47:18 +0000 |
commit | 7324afb043a0b1e623d8e8eb906cdc53bdeb4685 (patch) | |
tree | a3fe2d74ea9c9e142c390dac4ca0e219382ace46 /chromium/ipc | |
parent | 6a4cabb866f66d4128a97cdc6d9d08ce074f1247 (diff) | |
download | qtwebengine-chromium-7324afb043a0b1e623d8e8eb906cdc53bdeb4685.tar.gz |
BASELINE: Update Chromium to 58.0.3029.54
Change-Id: I67f57065a7afdc8e4614adb5c0230281428df4d1
Reviewed-by: Peter Varga <pvarga@inf.u-szeged.hu>
Diffstat (limited to 'chromium/ipc')
23 files changed, 470 insertions, 308 deletions
diff --git a/chromium/ipc/BUILD.gn b/chromium/ipc/BUILD.gn index 07b878ae3f4..16959fbaecb 100644 --- a/chromium/ipc/BUILD.gn +++ b/chromium/ipc/BUILD.gn @@ -98,12 +98,12 @@ component("ipc") { defines = [ "IPC_IMPLEMENTATION" ] public_deps = [ + ":mojom", ":param_traits", "//mojo/public/cpp/bindings", "//mojo/public/cpp/system", ] deps = [ - ":mojom", "//base", ] @@ -122,6 +122,9 @@ mojom("mojom") { sources = [ "ipc.mojom", ] + export_class_attribute = "IPC_EXPORT" + export_define = "IPC_IMPLEMENTATION" + export_header = "ipc/ipc_export.h" } mojom("test_interfaces") { @@ -180,7 +183,6 @@ if (!is_ios) { deps = [ ":ipc", - ":mojom", ":run_all_unittests", ":test_interfaces", ":test_support", diff --git a/chromium/ipc/OWNERS b/chromium/ipc/OWNERS index be9d89f335a..5ac52a2b495 100644 --- a/chromium/ipc/OWNERS +++ b/chromium/ipc/OWNERS @@ -12,3 +12,5 @@ per-file *.mojom=set noparent per-file *.mojom=file://ipc/SECURITY_OWNERS per-file *_param_traits*.*=set noparent per-file *_param_traits*.*=file://ipc/SECURITY_OWNERS + +# COMPONENT: Internals>Core diff --git a/chromium/ipc/ipc_channel.h b/chromium/ipc/ipc_channel.h index 1c0a89d20d5..527c949349c 100644 --- a/chromium/ipc/ipc_channel.h +++ b/chromium/ipc/ipc_channel.h @@ -18,13 +18,14 @@ #include "base/single_thread_task_runner.h" #include "base/threading/thread_task_runner_handle.h" #include "build/build_config.h" +#include "ipc/ipc.mojom.h" #include "ipc/ipc_channel_handle.h" #include "ipc/ipc_message.h" #include "ipc/ipc_sender.h" -#include "mojo/public/cpp/bindings/associated_group.h" #include "mojo/public/cpp/bindings/associated_interface_ptr.h" #include "mojo/public/cpp/bindings/associated_interface_request.h" #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h" +#include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h" #if defined(OS_POSIX) #include <sys/types.h> @@ -90,9 +91,10 @@ class IPC_EXPORT Channel : public Sender { virtual ~AssociatedInterfaceSupport() {} - // Accesses the AssociatedGroup used to associate new interface endpoints - // with this Channel. Must be safe to call from any thread. - virtual mojo::AssociatedGroup* GetAssociatedGroup() = 0; + // Returns a ThreadSafeForwarded for this channel which can be used to + // safely send mojom::Channel requests from arbitrary threads. + virtual std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>> + CreateThreadSafeChannel() = 0; // Adds an interface factory to this channel for interface |name|. Must be // safe to call from any thread. @@ -121,8 +123,7 @@ class IPC_EXPORT Channel : public Sender { template <typename Interface> void GetRemoteAssociatedInterface( mojo::AssociatedInterfacePtr<Interface>* proxy) { - mojo::AssociatedInterfaceRequest<Interface> request = - mojo::MakeRequest(proxy, GetAssociatedGroup()); + auto request = mojo::MakeRequest(proxy); GetGenericRemoteAssociatedInterface( Interface::Name_, request.PassHandle()); } diff --git a/chromium/ipc/ipc_channel_mojo.cc b/chromium/ipc/ipc_channel_mojo.cc index 4189b9c06f1..ab376f4bdc4 100644 --- a/chromium/ipc/ipc_channel_mojo.cc +++ b/chromium/ipc/ipc_channel_mojo.cc @@ -274,25 +274,49 @@ ChannelMojo::ChannelMojo( Mode mode, Listener* listener, const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) - : pipe_(handle.get()), listener_(listener), weak_factory_(this) { - // Create MojoBootstrap after all members are set as it touches - // ChannelMojo from a different thread. - bootstrap_ = - MojoBootstrap::Create(std::move(handle), mode, this, ipc_task_runner); + : task_runner_(ipc_task_runner), + pipe_(handle.get()), + listener_(listener), + weak_factory_(this) { + bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, ipc_task_runner); +} + +void ChannelMojo::ForwardMessageFromThreadSafePtr(mojo::Message message) { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + if (!message_reader_ || !message_reader_->sender().is_bound()) + return; + message_reader_->sender().internal_state()->ForwardMessage( + std::move(message)); +} + +void ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr( + mojo::Message message, + std::unique_ptr<mojo::MessageReceiver> responder) { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + if (!message_reader_ || !message_reader_->sender().is_bound()) + return; + message_reader_->sender().internal_state()->ForwardMessageWithResponder( + std::move(message), std::move(responder)); } ChannelMojo::~ChannelMojo() { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); Close(); } bool ChannelMojo::Connect() { + DCHECK(task_runner_->RunsTasksOnCurrentThread()); + WillConnect(); - DCHECK(!task_runner_); - task_runner_ = base::ThreadTaskRunnerHandle::Get(); - DCHECK(!message_reader_); + mojom::ChannelAssociatedPtr sender; + mojom::ChannelAssociatedRequest receiver; + bootstrap_->Connect(&sender, &receiver); - bootstrap_->Connect(); + DCHECK(!message_reader_); + sender->SetPeerPid(GetSelfPID()); + message_reader_.reset(new internal::MessagePipeReader( + pipe_, std::move(sender), std::move(receiver), this)); return true; } @@ -321,14 +345,6 @@ void ChannelMojo::Close() { associated_interfaces_.clear(); } -// MojoBootstrap::Delegate implementation -void ChannelMojo::OnPipesAvailable(mojom::ChannelAssociatedPtr sender, - mojom::ChannelAssociatedRequest receiver) { - sender->SetPeerPid(GetSelfPID()); - message_reader_.reset(new internal::MessagePipeReader( - pipe_, std::move(sender), std::move(receiver), this)); -} - void ChannelMojo::OnPipeError() { DCHECK(task_runner_); if (task_runner_->RunsTasksOnCurrentThread()) { @@ -377,6 +393,16 @@ bool ChannelMojo::Send(Message* message) { Channel::AssociatedInterfaceSupport* ChannelMojo::GetAssociatedInterfaceSupport() { return this; } +std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>> +ChannelMojo::CreateThreadSafeChannel() { + return base::MakeUnique<mojo::ThreadSafeForwarder<mojom::Channel>>( + task_runner_, base::Bind(&ChannelMojo::ForwardMessageFromThreadSafePtr, + weak_factory_.GetWeakPtr()), + base::Bind(&ChannelMojo::ForwardMessageWithResponderFromThreadSafePtr, + weak_factory_.GetWeakPtr()), + *bootstrap_->GetAssociatedGroup()); +} + void ChannelMojo::OnPeerPidReceived(int32_t peer_pid) { listener_->OnChannelConnected(peer_pid); } @@ -442,11 +468,6 @@ MojoResult ChannelMojo::WriteToMessageAttachmentSet( return MOJO_RESULT_OK; } -mojo::AssociatedGroup* ChannelMojo::GetAssociatedGroup() { - DCHECK(bootstrap_); - return bootstrap_->GetAssociatedGroup(); -} - void ChannelMojo::AddGenericAssociatedInterface( const std::string& name, const GenericAssociatedInterfaceFactory& factory) { @@ -458,8 +479,14 @@ void ChannelMojo::AddGenericAssociatedInterface( void ChannelMojo::GetGenericRemoteAssociatedInterface( const std::string& name, mojo::ScopedInterfaceEndpointHandle handle) { - if (message_reader_) + if (message_reader_) { message_reader_->GetRemoteInterface(name, std::move(handle)); + } else { + // Attach the associated interface to a disconnected pipe, so that the + // associated interface pointer can be used to make calls (which are + // dropped). + mojo::GetIsolatedInterface(std::move(handle)); + } } } // namespace IPC diff --git a/chromium/ipc/ipc_channel_mojo.h b/chromium/ipc/ipc_channel_mojo.h index 215b198d60c..20135b55edf 100644 --- a/chromium/ipc/ipc_channel_mojo.h +++ b/chromium/ipc/ipc_channel_mojo.h @@ -21,11 +21,13 @@ #include "base/task_runner.h" #include "base/threading/thread_task_runner_handle.h" #include "build/build_config.h" +#include "ipc/ipc.mojom.h" #include "ipc/ipc_channel.h" #include "ipc/ipc_channel_factory.h" #include "ipc/ipc_export.h" #include "ipc/ipc_message_pipe_reader.h" #include "ipc/ipc_mojo_bootstrap.h" +#include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h" #include "mojo/public/cpp/system/core.h" namespace IPC { @@ -42,7 +44,6 @@ namespace IPC { class IPC_EXPORT ChannelMojo : public Channel, public Channel::AssociatedInterfaceSupport, - public NON_EXPORTED_BASE(MojoBootstrap::Delegate), public NON_EXPORTED_BASE(internal::MessagePipeReader::Delegate) { public: // Creates a ChannelMojo. @@ -84,10 +85,6 @@ class IPC_EXPORT ChannelMojo Message* message, base::Optional<std::vector<mojom::SerializedHandlePtr>>* handles); - // MojoBootstrapDelegate implementation - void OnPipesAvailable(mojom::ChannelAssociatedPtr sender, - mojom::ChannelAssociatedRequest receiver) override; - // MessagePipeReader::Delegate void OnPeerPidReceived(int32_t peer_pid) override; void OnMessageReceived(const Message& message) override; @@ -103,8 +100,14 @@ class IPC_EXPORT ChannelMojo Listener* listener, const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner); + void ForwardMessageFromThreadSafePtr(mojo::Message message); + void ForwardMessageWithResponderFromThreadSafePtr( + mojo::Message message, + std::unique_ptr<mojo::MessageReceiver> responder); + // Channel::AssociatedInterfaceSupport: - mojo::AssociatedGroup* GetAssociatedGroup() override; + std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>> + CreateThreadSafeChannel() override; void AddGenericAssociatedInterface( const std::string& name, const GenericAssociatedInterfaceFactory& factory) override; @@ -113,7 +116,7 @@ class IPC_EXPORT ChannelMojo mojo::ScopedInterfaceEndpointHandle handle) override; // A TaskRunner which runs tasks on the ChannelMojo's owning thread. - scoped_refptr<base::TaskRunner> task_runner_; + scoped_refptr<base::SingleThreadTaskRunner> task_runner_; const mojo::MessagePipeHandle pipe_; std::unique_ptr<MojoBootstrap> bootstrap_; diff --git a/chromium/ipc/ipc_channel_mojo_unittest.cc b/chromium/ipc/ipc_channel_mojo_unittest.cc index 5e75fbd7cfd..4dbf339c621 100644 --- a/chromium/ipc/ipc_channel_mojo_unittest.cc +++ b/chromium/ipc/ipc_channel_mojo_unittest.cc @@ -12,6 +12,7 @@ #include "base/base_paths.h" #include "base/bind.h" +#include "base/callback_helpers.h" #include "base/files/file.h" #include "base/files/scoped_temp_dir.h" #include "base/location.h" @@ -919,8 +920,7 @@ DEFINE_IPC_CHANNEL_MOJO_TEST_CLIENT_WITH_CUSTOM_FIXTURE( IPC::mojom::IndirectTestDriverAssociatedPtr driver; IPC::mojom::PingReceiverAssociatedPtr ping_receiver; proxy()->GetRemoteAssociatedInterface(&driver); - driver->GetPingReceiver( - mojo::MakeRequest(&ping_receiver, driver.associated_group())); + driver->GetPingReceiver(mojo::MakeRequest(&ping_receiver)); base::RunLoop loop; ping_receiver->Ping(loop.QuitClosure()); @@ -1264,6 +1264,53 @@ DEFINE_IPC_CHANNEL_MOJO_TEST_CLIENT_WITH_CUSTOM_FIXTURE(CreatePausedClient, DestroyProxy(); } +TEST_F(IPCChannelProxyMojoTest, AssociatedRequestClose) { + Init("DropAssociatedRequest"); + + DummyListener listener; + CreateProxy(&listener); + RunProxy(); + + IPC::mojom::AssociatedInterfaceVendorAssociatedPtr vendor; + proxy()->GetRemoteAssociatedInterface(&vendor); + IPC::mojom::SimpleTestDriverAssociatedPtr tester; + vendor->GetTestInterface(mojo::MakeRequest(&tester)); + base::RunLoop run_loop; + tester.set_connection_error_handler(run_loop.QuitClosure()); + run_loop.Run(); + + proxy()->GetRemoteAssociatedInterface(&tester); + EXPECT_TRUE(WaitForClientShutdown()); + DestroyProxy(); +} + +class AssociatedInterfaceDroppingListener : public IPC::Listener { + public: + AssociatedInterfaceDroppingListener(const base::Closure& callback) + : callback_(callback) {} + bool OnMessageReceived(const IPC::Message& message) override { return false; } + + void OnAssociatedInterfaceRequest( + const std::string& interface_name, + mojo::ScopedInterfaceEndpointHandle handle) override { + if (interface_name == IPC::mojom::SimpleTestDriver::Name_) + base::ResetAndReturn(&callback_).Run(); + } + + private: + base::Closure callback_; +}; + +DEFINE_IPC_CHANNEL_MOJO_TEST_CLIENT_WITH_CUSTOM_FIXTURE(DropAssociatedRequest, + ChannelProxyClient) { + base::RunLoop run_loop; + AssociatedInterfaceDroppingListener listener(run_loop.QuitClosure()); + CreateProxy(&listener); + RunProxy(); + run_loop.Run(); + DestroyProxy(); +} + #if defined(OS_POSIX) class ListenerThatExpectsFile : public IPC::Listener { diff --git a/chromium/ipc/ipc_channel_nacl.cc b/chromium/ipc/ipc_channel_nacl.cc index 1fa8e691459..17b680ed37a 100644 --- a/chromium/ipc/ipc_channel_nacl.cc +++ b/chromium/ipc/ipc_channel_nacl.cc @@ -207,7 +207,7 @@ bool ChannelNacl::Send(Message* message) { "ChannelNacl::Send", message->header()->flags, TRACE_EVENT_FLAG_FLOW_OUT); - output_queue_.push_back(linked_ptr<Message>(message_ptr.release())); + output_queue_.push_back(std::move(message_ptr)); if (!waiting_connect_) return ProcessOutgoingMessages(); @@ -220,9 +220,9 @@ void ChannelNacl::DidRecvMsg(std::unique_ptr<MessageContents> contents) { if (pipe_ == -1) return; - linked_ptr<std::vector<char> > data(new std::vector<char>); + auto data = base::MakeUnique<std::vector<char>>(); data->swap(contents->data); - read_queue_.push_back(data); + read_queue_.push_back(std::move(data)); input_attachments_.reserve(contents->fds.size()); for (int fd : contents->fds) { @@ -273,7 +273,7 @@ bool ChannelNacl::ProcessOutgoingMessages() { // Write out all the messages. The trusted implementation is guaranteed to not // block. See NaClIPCAdapter::Send for the implementation of imc_sendmsg. while (!output_queue_.empty()) { - linked_ptr<Message> msg = output_queue_.front(); + std::unique_ptr<Message> msg = std::move(output_queue_.front()); output_queue_.pop_front(); const size_t num_fds = msg->attachment_set()->size(); @@ -330,7 +330,7 @@ ChannelNacl::ReadState ChannelNacl::ReadData( if (read_queue_.empty()) return READ_PENDING; while (!read_queue_.empty() && *bytes_read < buffer_len) { - linked_ptr<std::vector<char> > vec(read_queue_.front()); + std::vector<char>* vec = read_queue_.front().get(); size_t bytes_to_read = buffer_len - *bytes_read; if (vec->size() <= bytes_to_read) { // We can read and discard the entire vector. @@ -386,7 +386,7 @@ std::unique_ptr<Channel> Channel::Create( const IPC::ChannelHandle& channel_handle, Mode mode, Listener* listener) { - return base::WrapUnique(new ChannelNacl(channel_handle, mode, listener)); + return base::MakeUnique<ChannelNacl>(channel_handle, mode, listener); } } // namespace IPC diff --git a/chromium/ipc/ipc_channel_nacl.h b/chromium/ipc/ipc_channel_nacl.h index 1860922c5cc..e989f128559 100644 --- a/chromium/ipc/ipc_channel_nacl.h +++ b/chromium/ipc/ipc_channel_nacl.h @@ -10,7 +10,6 @@ #include <string> #include "base/macros.h" -#include "base/memory/linked_ptr.h" #include "base/memory/weak_ptr.h" #include "base/process/process.h" #include "base/threading/simple_thread.h" @@ -97,13 +96,13 @@ class ChannelNacl : public Channel, // the trouble given that we probably want to implement 1 and // 2 above in NaCl eventually. // When ReadData is called, it pulls the bytes out of this queue in order. - std::deque<linked_ptr<std::vector<char> > > read_queue_; + std::deque<std::unique_ptr<std::vector<char>>> read_queue_; // Queue of file descriptor attachments extracted from imc_recvmsg messages. std::vector<scoped_refptr<MessageAttachment>> input_attachments_; // This queue is used when a message is sent prior to Connect having been // called. Normally after we're connected, the queue is empty. - std::deque<linked_ptr<Message> > output_queue_; + std::deque<std::unique_ptr<Message>> output_queue_; base::WeakPtrFactory<ChannelNacl> weak_ptr_factory_; diff --git a/chromium/ipc/ipc_channel_proxy.cc b/chromium/ipc/ipc_channel_proxy.cc index 91e8236e8cd..ef1c0b22725 100644 --- a/chromium/ipc/ipc_channel_proxy.cc +++ b/chromium/ipc/ipc_channel_proxy.cc @@ -67,7 +67,7 @@ void ChannelProxy::Context::CreateChannel( Channel::AssociatedInterfaceSupport* support = channel_->GetAssociatedInterfaceSupport(); if (support) { - associated_group_ = *support->GetAssociatedGroup(); + thread_safe_channel_ = support->CreateThreadSafeChannel(); base::AutoLock l(pending_filters_lock_); for (auto& entry : pending_io_thread_interfaces_) @@ -374,7 +374,6 @@ void ChannelProxy::Context::OnDispatchAssociatedInterfaceRequest( void ChannelProxy::Context::ClearChannel() { base::AutoLock l(channel_lifetime_lock_); channel_.reset(); - associated_group_ = mojo::AssociatedGroup(); } void ChannelProxy::Context::AddGenericAssociatedInterfaceForIOThread( @@ -398,19 +397,6 @@ void ChannelProxy::Context::Send(Message* message) { base::Passed(base::WrapUnique(message)))); } -// Called on the IPC::Channel thread -void ChannelProxy::Context::GetRemoteAssociatedInterface( - const std::string& name, - mojo::ScopedInterfaceEndpointHandle handle) { - if (!channel_) - return; - Channel::AssociatedInterfaceSupport* associated_interface_support = - channel_->GetAssociatedInterfaceSupport(); - DCHECK(associated_interface_support); - associated_interface_support->GetGenericRemoteAssociatedInterface( - name, std::move(handle)); -} - //----------------------------------------------------------------------------- // static @@ -572,22 +558,18 @@ void ChannelProxy::AddGenericAssociatedInterfaceForIOThread( context()->AddGenericAssociatedInterfaceForIOThread(name, factory); } -mojo::AssociatedGroup* ChannelProxy::GetAssociatedGroup() { - return context()->associated_group(); -} - void ChannelProxy::GetGenericRemoteAssociatedInterface( const std::string& name, mojo::ScopedInterfaceEndpointHandle handle) { DCHECK(did_init_); - context_->ipc_task_runner()->PostTask( - FROM_HERE, base::Bind(&Context::GetRemoteAssociatedInterface, - context_, name, base::Passed(&handle))); + mojom::GenericInterfaceAssociatedRequest request; + request.Bind(std::move(handle)); + context()->thread_safe_channel().GetAssociatedInterface(name, + std::move(request)); } void ChannelProxy::ClearIPCTaskRunner() { DCHECK(CalledOnValidThread()); - context()->ClearIPCTaskRunner(); } diff --git a/chromium/ipc/ipc_channel_proxy.h b/chromium/ipc/ipc_channel_proxy.h index bda83bb1885..0bb7260c6b4 100644 --- a/chromium/ipc/ipc_channel_proxy.h +++ b/chromium/ipc/ipc_channel_proxy.h @@ -21,9 +21,10 @@ #include "ipc/ipc_channel_handle.h" #include "ipc/ipc_listener.h" #include "ipc/ipc_sender.h" -#include "mojo/public/cpp/bindings/associated_group.h" +#include "mojo/public/cpp/bindings/associated_interface_ptr.h" #include "mojo/public/cpp/bindings/associated_interface_request.h" #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h" +#include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h" namespace base { class SingleThreadTaskRunner; @@ -185,10 +186,6 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { factory)); } - // Gets the AssociatedGroup used to create new associated endpoints on this - // ChannelProxy. - mojo::AssociatedGroup* GetAssociatedGroup(); - // Requests an associated interface from the remote endpoint. void GetGenericRemoteAssociatedInterface( const std::string& name, @@ -198,8 +195,7 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { template <typename Interface> void GetRemoteAssociatedInterface( mojo::AssociatedInterfacePtr<Interface>* proxy) { - mojo::AssociatedInterfaceRequest<Interface> request = - mojo::MakeRequest(proxy, GetAssociatedGroup()); + auto request = mojo::MakeRequest(proxy); GetGenericRemoteAssociatedInterface(Interface::Name_, request.PassHandle()); } @@ -209,17 +205,20 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { } #endif + // Creates a ThreadSafeAssociatedInterfacePtr for |Interface|. This object + // may be used to send messages on the interface from any thread and those + // messages will remain ordered with respect to other messages sent on the + // same thread over other ThreadSafeAssociatedInterfacePtrs associated with + // the same Channel. template <typename Interface> - using AssociatedInterfaceRetrievedCallback = - base::Callback<void(mojo::AssociatedInterfacePtr<Interface>)>; - // Creates an AssociatedInterfacePtr to |Interface| on the IO thread and - // passes it to |callback|, also invoked on the IO thread. - template <typename Interface> - void RetrieveAssociatedInterfaceOnIOThread( - const AssociatedInterfaceRetrievedCallback<Interface>& callback) { - context_->ipc_task_runner()->PostTask( - FROM_HERE, base::Bind(&Context::RetrieveAssociatedInterface<Interface>, - context_, callback)); + void GetThreadSafeRemoteAssociatedInterface( + scoped_refptr<mojo::ThreadSafeAssociatedInterfacePtr<Interface>>* + out_ptr) { + mojo::AssociatedInterfacePtrInfo<Interface> ptr_info; + auto request = mojo::MakeRequest(&ptr_info); + GetGenericRemoteAssociatedInterface(Interface::Name_, request.PassHandle()); + *out_ptr = mojo::ThreadSafeAssociatedInterfacePtr<Interface>::Create( + std::move(ptr_info), ipc_task_runner()); } base::SingleThreadTaskRunner* ipc_task_runner() const { @@ -251,11 +250,6 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { // Sends |message| from appropriate thread. void Send(Message* message); - // Requests a remote associated interface on the IPC thread. - void GetRemoteAssociatedInterface( - const std::string& name, - mojo::ScopedInterfaceEndpointHandle handle); - protected: friend class base::RefCountedThreadSafe<Context>; ~Context() override; @@ -299,14 +293,6 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { void OnSendMessage(std::unique_ptr<Message> message_ptr); void OnAddFilter(); void OnRemoveFilter(MessageFilter* filter); - template <typename Interface> - void RetrieveAssociatedInterface( - const AssociatedInterfaceRetrievedCallback<Interface>& callback) { - mojo::AssociatedInterfacePtr<Interface> interface_ptr; - channel_->GetAssociatedInterfaceSupport()->GetRemoteAssociatedInterface( - &interface_ptr); - callback.Run(std::move(interface_ptr)); - } // Methods called on the listener thread. void AddFilter(MessageFilter* filter); @@ -319,7 +305,9 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { void ClearChannel(); - mojo::AssociatedGroup* associated_group() { return &associated_group_; } + mojom::Channel& thread_safe_channel() { + return thread_safe_channel_->proxy(); + } void AddGenericAssociatedInterfaceForIOThread( const std::string& name, @@ -358,7 +346,10 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe { base::ProcessId peer_pid_; base::Lock peer_pid_lock_; - mojo::AssociatedGroup associated_group_; + // A thread-safe mojom::Channel interface we use to make remote interface + // requests from the proxy thread. + std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>> + thread_safe_channel_; // Holds associated interface binders added by // AddGenericAssociatedInterfaceForIOThread until the underlying channel has diff --git a/chromium/ipc/ipc_message_macros.h b/chromium/ipc/ipc_message_macros.h index 237c4e2c56d..4fcaa161d34 100644 --- a/chromium/ipc/ipc_message_macros.h +++ b/chromium/ipc/ipc_message_macros.h @@ -388,10 +388,9 @@ IPC_MESSAGE_FORWARD_WITH_PARAM_DELAY_REPLY( \ msg_class, this, _IpcMessageHandlerClass::member_func) -// TODO(jar): fix chrome frame to always supply |code| argument. #define IPC_MESSAGE_HANDLER_GENERIC(msg_class, code) \ case msg_class::ID: { \ - /* TRACK_RUN_IN_THIS_SCOPED_REGION(code); TODO(jar) */ \ + TRACK_RUN_IN_THIS_SCOPED_REGION(code); \ code; \ } \ break; diff --git a/chromium/ipc/ipc_message_pipe_reader.h b/chromium/ipc/ipc_message_pipe_reader.h index 686982a97e3..b441960e3e1 100644 --- a/chromium/ipc/ipc_message_pipe_reader.h +++ b/chromium/ipc/ipc_message_pipe_reader.h @@ -84,7 +84,7 @@ class IPC_EXPORT MessagePipeReader : public NON_EXPORTED_BASE(mojom::Channel) { void GetRemoteInterface(const std::string& name, mojo::ScopedInterfaceEndpointHandle handle); - mojom::Channel* sender() const { return sender_.get(); } + mojom::ChannelAssociatedPtr& sender() { return sender_; } protected: void OnPipeClosed(); diff --git a/chromium/ipc/ipc_message_unittest.cc b/chromium/ipc/ipc_message_unittest.cc index 8d32b45dca5..bde96a13e72 100644 --- a/chromium/ipc/ipc_message_unittest.cc +++ b/chromium/ipc/ipc_message_unittest.cc @@ -65,7 +65,7 @@ TEST(IPCMessageTest, BasicMessageTest) { TEST(IPCMessageTest, ListValue) { base::ListValue input; - input.Set(0, new base::FundamentalValue(42.42)); + input.Set(0, new base::Value(42.42)); input.Set(1, new base::StringValue("forty")); input.Set(2, base::Value::CreateNullValue()); @@ -88,16 +88,16 @@ TEST(IPCMessageTest, ListValue) { TEST(IPCMessageTest, DictionaryValue) { base::DictionaryValue input; input.Set("null", base::Value::CreateNullValue()); - input.Set("bool", new base::FundamentalValue(true)); - input.Set("int", new base::FundamentalValue(42)); - input.SetWithoutPathExpansion("int.with.dot", new base::FundamentalValue(43)); + input.Set("bool", new base::Value(true)); + input.Set("int", new base::Value(42)); + input.SetWithoutPathExpansion("int.with.dot", new base::Value(43)); std::unique_ptr<base::DictionaryValue> subdict(new base::DictionaryValue()); subdict->Set("str", new base::StringValue("forty two")); - subdict->Set("bool", new base::FundamentalValue(false)); + subdict->Set("bool", new base::Value(false)); std::unique_ptr<base::ListValue> sublist(new base::ListValue()); - sublist->Set(0, new base::FundamentalValue(42.42)); + sublist->Set(0, new base::Value(42.42)); sublist->Set(1, new base::StringValue("forty")); sublist->Set(2, new base::StringValue("two")); subdict->Set("list", sublist.release()); diff --git a/chromium/ipc/ipc_message_utils.cc b/chromium/ipc/ipc_message_utils.cc index fb45218bff3..328e804a980 100644 --- a/chromium/ipc/ipc_message_utils.cc +++ b/chromium/ipc/ipc_message_utils.cc @@ -110,9 +110,7 @@ void GetValueSize(base::PickleSizer* sizer, break; } case base::Value::Type::BINARY: { - const base::BinaryValue* binary = - static_cast<const base::BinaryValue*>(value); - sizer->AddData(static_cast<int>(binary->GetSize())); + sizer->AddData(static_cast<int>(value->GetSize())); break; } case base::Value::Type::DICTIONARY: { @@ -180,9 +178,7 @@ void WriteValue(base::Pickle* m, const base::Value* value, int recursion) { break; } case base::Value::Type::BINARY: { - const base::BinaryValue* binary = - static_cast<const base::BinaryValue*>(value); - m->WriteData(binary->GetBuffer(), static_cast<int>(binary->GetSize())); + m->WriteData(value->GetBuffer(), static_cast<int>(value->GetSize())); break; } case base::Value::Type::DICTIONARY: { @@ -272,21 +268,21 @@ bool ReadValue(const base::Pickle* m, bool val; if (!ReadParam(m, iter, &val)) return false; - *value = new base::FundamentalValue(val); + *value = new base::Value(val); break; } case base::Value::Type::INTEGER: { int val; if (!ReadParam(m, iter, &val)) return false; - *value = new base::FundamentalValue(val); + *value = new base::Value(val); break; } case base::Value::Type::DOUBLE: { double val; if (!ReadParam(m, iter, &val)) return false; - *value = new base::FundamentalValue(val); + *value = new base::Value(val); break; } case base::Value::Type::STRING: { diff --git a/chromium/ipc/ipc_message_utils.h b/chromium/ipc/ipc_message_utils.h index 58d6aa26da3..40dd6706cb0 100644 --- a/chromium/ipc/ipc_message_utils.h +++ b/chromium/ipc/ipc_message_utils.h @@ -307,6 +307,37 @@ struct IPC_EXPORT ParamTraits<double> { static void Log(const param_type& p, std::string* l); }; +template <class P, size_t Size> +struct ParamTraits<P[Size]> { + using param_type = P[Size]; + static void GetSize(base::PickleSizer* sizer, const param_type& p) { + for (const P& element : p) + GetParamSize(sizer, element); + } + static void Write(base::Pickle* m, const param_type& p) { + for (const P& element : p) + WriteParam(m, element); + } + static bool Read(const base::Pickle* m, + base::PickleIterator* iter, + param_type* r) { + for (P& element : *r) { + if (!ReadParam(m, iter, &element)) + return false; + } + return true; + } + static void Log(const param_type& p, std::string* l) { + l->append("["); + for (const P& element : p) { + if (&element != &p[0]) + l->append(" "); + LogParam(element, l); + } + l->append("]"); + } +}; + // STL ParamTraits ------------------------------------------------------------- template <> diff --git a/chromium/ipc/ipc_message_utils_unittest.cc b/chromium/ipc/ipc_message_utils_unittest.cc index e5c82bc8d9c..dae86758dbe 100644 --- a/chromium/ipc/ipc_message_utils_unittest.cc +++ b/chromium/ipc/ipc_message_utils_unittest.cc @@ -97,13 +97,13 @@ TEST(IPCMessageUtilsTest, StackVector) { // Tests that PickleSizer and Pickle agree on the size of a complex base::Value. TEST(IPCMessageUtilsTest, ValueSize) { std::unique_ptr<base::DictionaryValue> value(new base::DictionaryValue); - value->SetWithoutPathExpansion("foo", new base::FundamentalValue(42)); - value->SetWithoutPathExpansion("bar", new base::FundamentalValue(3.14)); + value->SetWithoutPathExpansion("foo", new base::Value(42)); + value->SetWithoutPathExpansion("bar", new base::Value(3.14)); value->SetWithoutPathExpansion("baz", new base::StringValue("hello")); value->SetWithoutPathExpansion("qux", base::Value::CreateNullValue()); std::unique_ptr<base::DictionaryValue> nested_dict(new base::DictionaryValue); - nested_dict->SetWithoutPathExpansion("foobar", new base::FundamentalValue(5)); + nested_dict->SetWithoutPathExpansion("foobar", new base::Value(5)); value->SetWithoutPathExpansion("nested", std::move(nested_dict)); std::unique_ptr<base::ListValue> list_value(new base::ListValue); diff --git a/chromium/ipc/ipc_mojo_bootstrap.cc b/chromium/ipc/ipc_mojo_bootstrap.cc index 3ba136619a6..7dc7484e091 100644 --- a/chromium/ipc/ipc_mojo_bootstrap.cc +++ b/chromium/ipc/ipc_mojo_bootstrap.cc @@ -70,6 +70,7 @@ class ChannelAssociatedGroupController connector_->set_connection_error_handler( base::Bind(&ChannelAssociatedGroupController::OnPipeError, base::Unretained(this))); + connector_->SetWatcherHeapProfilerTag("IPC Channel"); } void Pause() { @@ -106,12 +107,14 @@ class ChannelAssociatedGroupController Endpoint* receiver_endpoint = new Endpoint(this, receiver_id); endpoints_.insert({ sender_id, sender_endpoint }); endpoints_.insert({ receiver_id, receiver_endpoint }); + sender_endpoint->set_handle_created(); + receiver_endpoint->set_handle_created(); } mojo::ScopedInterfaceEndpointHandle sender_handle = - CreateScopedInterfaceEndpointHandle(sender_id, true); + CreateScopedInterfaceEndpointHandle(sender_id); mojo::ScopedInterfaceEndpointHandle receiver_handle = - CreateScopedInterfaceEndpointHandle(receiver_id, true); + CreateScopedInterfaceEndpointHandle(receiver_id); sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0)); receiver->Bind(std::move(receiver_handle)); @@ -125,26 +128,43 @@ class ChannelAssociatedGroupController } // mojo::AssociatedGroupController: - void CreateEndpointHandlePair( - mojo::ScopedInterfaceEndpointHandle* local_endpoint, - mojo::ScopedInterfaceEndpointHandle* remote_endpoint) override { - base::AutoLock locker(lock_); - uint32_t id = 0; - do { - if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask) - next_interface_id_ = 2; - id = next_interface_id_++; - if (set_interface_id_namespace_bit_) - id |= mojo::kInterfaceIdNamespaceMask; - } while (ContainsKey(endpoints_, id)); + mojo::InterfaceId AssociateInterface( + mojo::ScopedInterfaceEndpointHandle handle_to_send) override { + if (!handle_to_send.pending_association()) + return mojo::kInvalidInterfaceId; - Endpoint* endpoint = new Endpoint(this, id); - if (encountered_error_) - endpoint->set_peer_closed(); - endpoints_.insert({ id, endpoint }); + uint32_t id = 0; + { + base::AutoLock locker(lock_); + do { + if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask) + next_interface_id_ = 2; + id = next_interface_id_++; + if (set_interface_id_namespace_bit_) + id |= mojo::kInterfaceIdNamespaceMask; + } while (ContainsKey(endpoints_, id)); + + Endpoint* endpoint = new Endpoint(this, id); + if (encountered_error_) + endpoint->set_peer_closed(); + endpoint->set_handle_created(); + endpoints_.insert({id, endpoint}); + } + + if (!NotifyAssociation(&handle_to_send, id)) { + // The peer handle of |handle_to_send|, which is supposed to join this + // associated group, has been closed. + { + base::AutoLock locker(lock_); + Endpoint* endpoint = FindEndpoint(id); + if (endpoint) + MarkClosedAndMaybeRemove(endpoint); + } - *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); - *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); + control_message_proxy_.NotifyPeerEndpointClosed( + id, handle_to_send.disconnect_reason()); + } + return id; } mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( @@ -155,35 +175,35 @@ class ChannelAssociatedGroupController base::AutoLock locker(lock_); bool inserted = false; Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); - if (inserted && encountered_error_) - endpoint->set_peer_closed(); + if (inserted) { + DCHECK(!endpoint->handle_created()); + if (encountered_error_) + endpoint->set_peer_closed(); + } else { + if (endpoint->handle_created()) + return mojo::ScopedInterfaceEndpointHandle(); + } - return CreateScopedInterfaceEndpointHandle(id, true); + endpoint->set_handle_created(); + return CreateScopedInterfaceEndpointHandle(id); } - void CloseEndpointHandle(mojo::InterfaceId id, bool is_local) override { + void CloseEndpointHandle( + mojo::InterfaceId id, + const base::Optional<mojo::DisconnectReason>& reason) override { if (!mojo::IsValidInterfaceId(id)) return; - - base::AutoLock locker(lock_); - if (!is_local) { + { + base::AutoLock locker(lock_); DCHECK(ContainsKey(endpoints_, id)); - DCHECK(!mojo::IsMasterInterfaceId(id)); - - base::AutoUnlock unlocker(lock_); - control_message_proxy_.NotifyEndpointClosedBeforeSent(id); - return; + Endpoint* endpoint = endpoints_[id].get(); + DCHECK(!endpoint->client()); + DCHECK(!endpoint->closed()); + MarkClosedAndMaybeRemove(endpoint); } - DCHECK(ContainsKey(endpoints_, id)); - Endpoint* endpoint = endpoints_[id].get(); - DCHECK(!endpoint->client()); - DCHECK(!endpoint->closed()); - MarkClosedAndMaybeRemove(endpoint); - - base::AutoUnlock unlocker(lock_); - if (!mojo::IsMasterInterfaceId(id)) - control_message_proxy_.NotifyPeerEndpointClosed(id); + if (!mojo::IsMasterInterfaceId(id) || reason) + control_message_proxy_.NotifyPeerEndpointClosed(id, reason); } mojo::InterfaceEndpointController* AttachEndpointClient( @@ -236,6 +256,47 @@ class ChannelAssociatedGroupController friend class Endpoint; friend class ControlMessageProxyThunk; + // MessageWrapper objects are always destroyed under the controller's lock. On + // destruction, if the message it wrappers contains + // ScopedInterfaceEndpointHandles (which cannot be destructed under the + // controller's lock), the wrapper unlocks to clean them up. + class MessageWrapper { + public: + MessageWrapper() = default; + + MessageWrapper(ChannelAssociatedGroupController* controller, + mojo::Message message) + : controller_(controller), value_(std::move(message)) {} + + MessageWrapper(MessageWrapper&& other) + : controller_(other.controller_), value_(std::move(other.value_)) {} + + ~MessageWrapper() { + if (value_.associated_endpoint_handles()->empty()) + return; + + controller_->lock_.AssertAcquired(); + { + base::AutoUnlock unlocker(controller_->lock_); + value_.mutable_associated_endpoint_handles()->clear(); + } + } + + MessageWrapper& operator=(MessageWrapper&& other) { + controller_ = other.controller_; + value_ = std::move(other.value_); + return *this; + } + + mojo::Message& value() { return value_; } + + private: + ChannelAssociatedGroupController* controller_ = nullptr; + mojo::Message value_; + + DISALLOW_COPY_AND_ASSIGN(MessageWrapper); + }; + class Endpoint : public base::RefCountedThreadSafe<Endpoint>, public mojo::InterfaceEndpointController { public: @@ -264,6 +325,25 @@ class ChannelAssociatedGroupController peer_closed_ = true; } + bool handle_created() const { + controller_->lock_.AssertAcquired(); + return handle_created_; + } + + void set_handle_created() { + controller_->lock_.AssertAcquired(); + handle_created_ = true; + } + + const base::Optional<mojo::DisconnectReason>& disconnect_reason() const { + return disconnect_reason_; + } + + void set_disconnect_reason( + const base::Optional<mojo::DisconnectReason>& disconnect_reason) { + disconnect_reason_ = disconnect_reason; + } + base::SingleThreadTaskRunner* task_runner() const { return task_runner_.get(); } @@ -295,7 +375,7 @@ class ChannelAssociatedGroupController sync_watcher_.reset(); } - uint32_t EnqueueSyncMessage(mojo::Message message) { + uint32_t EnqueueSyncMessage(MessageWrapper message) { controller_->lock_.AssertAcquired(); uint32_t id = GenerateSyncMessageId(); sync_messages_.emplace(id, std::move(message)); @@ -305,15 +385,16 @@ class ChannelAssociatedGroupController void SignalSyncMessageEvent() { controller_->lock_.AssertAcquired(); - EnsureSyncMessageEventExists(); - sync_message_event_->Signal(); + + if (sync_message_event_) + sync_message_event_->Signal(); } - mojo::Message PopSyncMessage(uint32_t id) { + MessageWrapper PopSyncMessage(uint32_t id) { controller_->lock_.AssertAcquired(); if (sync_messages_.empty() || sync_messages_.front().first != id) - return mojo::Message(); - mojo::Message message = std::move(sync_messages_.front().second); + return MessageWrapper(); + MessageWrapper message = std::move(sync_messages_.front().second); sync_messages_.pop(); return message; } @@ -367,14 +448,16 @@ class ChannelAssociatedGroupController base::AutoLock locker(controller_->lock_); bool more_to_process = false; if (!sync_messages_.empty()) { - mojo::Message message = std::move(sync_messages_.front().second); + MessageWrapper message_wrapper = + std::move(sync_messages_.front().second); sync_messages_.pop(); bool dispatch_succeeded; mojo::InterfaceEndpointClient* client = client_; { base::AutoUnlock unlocker(controller_->lock_); - dispatch_succeeded = client->HandleIncomingMessage(&message); + dispatch_succeeded = + client->HandleIncomingMessage(&message_wrapper.value()); } if (!sync_messages_.empty()) @@ -407,9 +490,11 @@ class ChannelAssociatedGroupController { base::AutoLock locker(controller_->lock_); - EnsureSyncMessageEventExists(); - if (!sync_messages_.empty()) - SignalSyncMessageEvent(); + if (!sync_message_event_) { + sync_message_event_.reset(new MojoEvent); + if (peer_closed_ || !sync_messages_.empty()) + SignalSyncMessageEvent(); + } } sync_watcher_.reset(new mojo::SyncHandleWatcher( @@ -436,11 +521,13 @@ class ChannelAssociatedGroupController bool closed_ = false; bool peer_closed_ = false; + bool handle_created_ = false; + base::Optional<mojo::DisconnectReason> disconnect_reason_; mojo::InterfaceEndpointClient* client_ = nullptr; scoped_refptr<base::SingleThreadTaskRunner> task_runner_; std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; std::unique_ptr<MojoEvent> sync_message_event_; - std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_; + std::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_; uint32_t next_sync_message_id_ = 0; DISALLOW_COPY_AND_ASSIGN(Endpoint); @@ -471,8 +558,16 @@ class ChannelAssociatedGroupController Endpoint* endpoint = iter->second.get(); ++iter; - DCHECK(endpoint->closed()); - MarkPeerClosedAndMaybeRemove(endpoint); + if (!endpoint->closed()) { + // This happens when a NotifyPeerEndpointClosed message been received, + // but the interface ID hasn't been used to create local endpoint + // handle. + DCHECK(!endpoint->client()); + DCHECK(endpoint->peer_closed()); + MarkClosedAndMaybeRemove(endpoint); + } else { + MarkPeerClosedAndMaybeRemove(endpoint); + } } DCHECK(endpoints_.empty()); @@ -538,9 +633,11 @@ class ChannelAssociatedGroupController DCHECK(endpoint->task_runner() && endpoint->client()); if (endpoint->task_runner()->BelongsToCurrentThread() && !force_async) { mojo::InterfaceEndpointClient* client = endpoint->client(); + base::Optional<mojo::DisconnectReason> reason( + endpoint->disconnect_reason()); base::AutoUnlock unlocker(lock_); - client->NotifyError(); + client->NotifyError(reason); } else { endpoint->task_runner()->PostTask( FROM_HERE, @@ -582,21 +679,29 @@ class ChannelAssociatedGroupController lock_.AssertAcquired(); DCHECK(!inserted || !*inserted); - auto iter = endpoints_.find(id); - if (iter != endpoints_.end()) - return iter->second.get(); - - Endpoint* endpoint = new Endpoint(this, id); - endpoints_.insert({ id, endpoint }); - if (inserted) - *inserted = true; + Endpoint* endpoint = FindEndpoint(id); + if (!endpoint) { + endpoint = new Endpoint(this, id); + endpoints_.insert({id, endpoint}); + if (inserted) + *inserted = true; + } return endpoint; } + Endpoint* FindEndpoint(mojo::InterfaceId id) { + lock_.AssertAcquired(); + auto iter = endpoints_.find(id); + return iter != endpoints_.end() ? iter->second.get() : nullptr; + } + // mojo::MessageReceiver: bool Accept(mojo::Message* message) override { DCHECK(thread_checker_.CalledOnValidThread()); + if (!message->DeserializeAssociatedEndpointHandles(this)) + return false; + if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) return control_message_handler_.Accept(message); @@ -604,9 +709,11 @@ class ChannelAssociatedGroupController DCHECK(mojo::IsValidInterfaceId(id)); base::AutoLock locker(lock_); - Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */); - mojo::InterfaceEndpointClient* client = - endpoint ? endpoint->client() : nullptr; + Endpoint* endpoint = FindEndpoint(id); + if (!endpoint) + return true; + + mojo::InterfaceEndpointClient* client = endpoint->client(); if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { // No client has been bound yet or the client runs tasks on another // thread. We assume the other thread must always be the one on which @@ -617,12 +724,14 @@ class ChannelAssociatedGroupController DCHECK(proxy_task_runner_); if (message->has_flag(mojo::Message::kFlagIsSync)) { + MessageWrapper message_wrapper(this, std::move(*message)); // Sync messages may need to be handled by the endpoint if it's blocking // on a sync reply. We pass ownership of the message to the endpoint's // sync message queue. If the endpoint was blocking, it will dequeue the // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| // call will dequeue the message and dispatch it. - uint32_t message_id = endpoint->EnqueueSyncMessage(std::move(*message)); + uint32_t message_id = + endpoint->EnqueueSyncMessage(std::move(message_wrapper)); proxy_task_runner_->PostTask( FROM_HERE, base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, @@ -653,7 +762,7 @@ class ChannelAssociatedGroupController DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); base::AutoLock locker(lock_); - Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */); + Endpoint* endpoint = FindEndpoint(id); if (!endpoint) return; @@ -680,17 +789,16 @@ class ChannelAssociatedGroupController DCHECK(proxy_task_runner_->BelongsToCurrentThread()); base::AutoLock locker(lock_); - Endpoint* endpoint = - GetEndpointForDispatch(interface_id, false /* create */); + Endpoint* endpoint = FindEndpoint(interface_id); if (!endpoint) return; DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); - mojo::Message message = endpoint->PopSyncMessage(message_id); + MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); // The message must have already been dequeued by the endpoint waking up // from a sync wait. Nothing to do. - if (message.IsNull()) + if (message_wrapper.value().IsNull()) return; mojo::InterfaceEndpointClient* client = endpoint->client(); @@ -700,36 +808,26 @@ class ChannelAssociatedGroupController bool result = false; { base::AutoUnlock unlocker(lock_); - result = client->HandleIncomingMessage(&message); + result = client->HandleIncomingMessage(&message_wrapper.value()); } if (!result) RaiseError(); } - Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool create) { - lock_.AssertAcquired(); - auto iter = endpoints_.find(id); - if (iter != endpoints_.end()) - return iter->second.get(); - if (!create) - return nullptr; - bool inserted = false; - Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); - DCHECK(inserted); - return endpoint; - } - // mojo::PipeControlMessageHandlerDelegate: - bool OnPeerAssociatedEndpointClosed(mojo::InterfaceId id) override { + bool OnPeerAssociatedEndpointClosed( + mojo::InterfaceId id, + const base::Optional<mojo::DisconnectReason>& reason) override { DCHECK(thread_checker_.CalledOnValidThread()); - if (mojo::IsMasterInterfaceId(id)) - return false; + DCHECK(!mojo::IsMasterInterfaceId(id) || reason); scoped_refptr<ChannelAssociatedGroupController> keepalive(this); base::AutoLock locker(lock_); scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr); + if (reason) + endpoint->set_disconnect_reason(reason); if (!endpoint->peer_closed()) { if (endpoint->client()) NotifyEndpointOfError(endpoint.get(), false /* force_async */); @@ -739,23 +837,6 @@ class ChannelAssociatedGroupController return true; } - bool OnAssociatedEndpointClosedBeforeSent(mojo::InterfaceId id) override { - DCHECK(thread_checker_.CalledOnValidThread()); - - if (mojo::IsMasterInterfaceId(id)) - return false; - - { - base::AutoLock locker(lock_); - Endpoint* endpoint = FindOrInsertEndpoint(id, nullptr); - DCHECK(!endpoint->closed()); - MarkClosedAndMaybeRemove(endpoint); - } - - control_message_proxy_.NotifyPeerEndpointClosed(id); - return true; - } - // Checked in places which must be run on the master endpoint's thread. base::ThreadChecker thread_checker_; @@ -793,28 +874,20 @@ class MojoBootstrapImpl : public MojoBootstrap { public: MojoBootstrapImpl( mojo::ScopedMessagePipeHandle handle, - Delegate* delegate, const scoped_refptr<ChannelAssociatedGroupController> controller) - : controller_(controller), - handle_(std::move(handle)), - delegate_(delegate) { - associated_group_ = controller_->CreateAssociatedGroup(); - } + : controller_(controller), + associated_group_(controller), + handle_(std::move(handle)) {} ~MojoBootstrapImpl() override { controller_->ShutDown(); } private: - // MojoBootstrap: - void Connect() override { + void Connect(mojom::ChannelAssociatedPtr* sender, + mojom::ChannelAssociatedRequest* receiver) override { controller_->Bind(std::move(handle_)); - - IPC::mojom::ChannelAssociatedPtr sender; - IPC::mojom::ChannelAssociatedRequest receiver; - controller_->CreateChannelEndpoints(&sender, &receiver); - - delegate_->OnPipesAvailable(std::move(sender), std::move(receiver)); + controller_->CreateChannelEndpoints(sender, receiver); } void Pause() override { @@ -830,14 +903,13 @@ class MojoBootstrapImpl : public MojoBootstrap { } mojo::AssociatedGroup* GetAssociatedGroup() override { - return associated_group_.get(); + return &associated_group_; } scoped_refptr<ChannelAssociatedGroupController> controller_; + mojo::AssociatedGroup associated_group_; mojo::ScopedMessagePipeHandle handle_; - Delegate* delegate_; - std::unique_ptr<mojo::AssociatedGroup> associated_group_; DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl); }; @@ -848,12 +920,10 @@ class MojoBootstrapImpl : public MojoBootstrap { std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( mojo::ScopedMessagePipeHandle handle, Channel::Mode mode, - Delegate* delegate, const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { return base::MakeUnique<MojoBootstrapImpl>( - std::move(handle), delegate, - new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, - ipc_task_runner)); + std::move(handle), new ChannelAssociatedGroupController( + mode == Channel::MODE_SERVER, ipc_task_runner)); } } // namespace IPC diff --git a/chromium/ipc/ipc_mojo_bootstrap.h b/chromium/ipc/ipc_mojo_bootstrap.h index 5188cd01996..cb4bcdc0b5e 100644 --- a/chromium/ipc/ipc_mojo_bootstrap.h +++ b/chromium/ipc/ipc_mojo_bootstrap.h @@ -31,14 +31,6 @@ namespace IPC { // UI thread as Channel::Create() can be. class IPC_EXPORT MojoBootstrap { public: - class Delegate { - public: - virtual ~Delegate() {} - - virtual void OnPipesAvailable(mojom::ChannelAssociatedPtr sender, - mojom::ChannelAssociatedRequest receiver) = 0; - }; - virtual ~MojoBootstrap() {} // Create the MojoBootstrap instance, using |handle| as the message pipe, in @@ -46,11 +38,11 @@ class IPC_EXPORT MojoBootstrap { static std::unique_ptr<MojoBootstrap> Create( mojo::ScopedMessagePipeHandle handle, Channel::Mode mode, - Delegate* delegate, const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner); // Start the handshake over the underlying message pipe. - virtual void Connect() = 0; + virtual void Connect(mojom::ChannelAssociatedPtr* sender, + mojom::ChannelAssociatedRequest* receiver) = 0; // Stop transmitting messages and start queueing them instead. virtual void Pause() = 0; diff --git a/chromium/ipc/ipc_mojo_bootstrap_unittest.cc b/chromium/ipc/ipc_mojo_bootstrap_unittest.cc index b036faca575..c6cde3ec636 100644 --- a/chromium/ipc/ipc_mojo_bootstrap_unittest.cc +++ b/chromium/ipc/ipc_mojo_bootstrap_unittest.cc @@ -18,6 +18,7 @@ #include "mojo/edk/embedder/embedder.h" #include "mojo/edk/test/mojo_test_base.h" #include "mojo/edk/test/multiprocess_test_helper.h" +#include "mojo/public/cpp/bindings/associated_binding.h" #if defined(OS_POSIX) #include "base/file_descriptor_posix.h" @@ -25,47 +26,62 @@ namespace { -class IPCMojoBootstrapTest : public testing::Test { - protected: - mojo::edk::test::MultiprocessTestHelper helper_; -}; +constexpr int32_t kTestServerPid = 42; +constexpr int32_t kTestClientPid = 4242; -class TestingDelegate : public IPC::MojoBootstrap::Delegate { +class PeerPidReceiver : public IPC::mojom::Channel { public: - explicit TestingDelegate(const base::Closure& quit_callback) - : passed_(false), quit_callback_(quit_callback) {} + PeerPidReceiver(IPC::mojom::ChannelAssociatedRequest request, + const base::Closure& on_peer_pid_set) + : binding_(this, std::move(request)), on_peer_pid_set_(on_peer_pid_set) {} + ~PeerPidReceiver() override {} + + // mojom::Channel: + void SetPeerPid(int32_t pid) override { + peer_pid_ = pid; + on_peer_pid_set_.Run(); + } - void OnPipesAvailable( - IPC::mojom::ChannelAssociatedPtr sender, - IPC::mojom::ChannelAssociatedRequest receiver) override; + void Receive(const std::vector<uint8_t>& data, + base::Optional<std::vector<IPC::mojom::SerializedHandlePtr>> + handles) override {} - bool passed() const { return passed_; } + void GetAssociatedInterface( + const std::string& name, + IPC::mojom::GenericInterfaceAssociatedRequest request) override {} + + int32_t peer_pid() const { return peer_pid_; } private: - bool passed_; - const base::Closure quit_callback_; + mojo::AssociatedBinding<IPC::mojom::Channel> binding_; + const base::Closure on_peer_pid_set_; + int32_t peer_pid_ = -1; + + DISALLOW_COPY_AND_ASSIGN(PeerPidReceiver); }; -void TestingDelegate::OnPipesAvailable( - IPC::mojom::ChannelAssociatedPtr sender, - IPC::mojom::ChannelAssociatedRequest receiver) { - passed_ = true; - quit_callback_.Run(); -} +class IPCMojoBootstrapTest : public testing::Test { + protected: + mojo::edk::test::MultiprocessTestHelper helper_; +}; TEST_F(IPCMojoBootstrapTest, Connect) { base::MessageLoop message_loop; - base::RunLoop run_loop; - TestingDelegate delegate(run_loop.QuitClosure()); std::unique_ptr<IPC::MojoBootstrap> bootstrap = IPC::MojoBootstrap::Create( helper_.StartChild("IPCMojoBootstrapTestClient"), - IPC::Channel::MODE_SERVER, &delegate, - base::ThreadTaskRunnerHandle::Get()); + IPC::Channel::MODE_SERVER, base::ThreadTaskRunnerHandle::Get()); - bootstrap->Connect(); + IPC::mojom::ChannelAssociatedPtr sender; + IPC::mojom::ChannelAssociatedRequest receiver; + bootstrap->Connect(&sender, &receiver); + sender->SetPeerPid(kTestServerPid); + + base::RunLoop run_loop; + PeerPidReceiver impl(std::move(receiver), run_loop.QuitClosure()); run_loop.Run(); - EXPECT_TRUE(delegate.passed()); + EXPECT_EQ(kTestClientPid, impl.peer_pid()); + EXPECT_TRUE(helper_.WaitForChildTestShutdown()); } @@ -74,18 +90,22 @@ MULTIPROCESS_TEST_MAIN_WITH_SETUP( IPCMojoBootstrapTestClientTestChildMain, ::mojo::edk::test::MultiprocessTestHelper::ChildSetup) { base::MessageLoop message_loop; - base::RunLoop run_loop; - TestingDelegate delegate(run_loop.QuitClosure()); std::unique_ptr<IPC::MojoBootstrap> bootstrap = IPC::MojoBootstrap::Create( std::move(mojo::edk::test::MultiprocessTestHelper::primordial_pipe), - IPC::Channel::MODE_CLIENT, &delegate, - base::ThreadTaskRunnerHandle::Get()); + IPC::Channel::MODE_CLIENT, base::ThreadTaskRunnerHandle::Get()); - bootstrap->Connect(); + IPC::mojom::ChannelAssociatedPtr sender; + IPC::mojom::ChannelAssociatedRequest receiver; + bootstrap->Connect(&sender, &receiver); + sender->SetPeerPid(kTestClientPid); + base::RunLoop run_loop; + PeerPidReceiver impl(std::move(receiver), run_loop.QuitClosure()); run_loop.Run(); - return delegate.passed() ? 0 : 1; + EXPECT_EQ(kTestServerPid, impl.peer_pid()); + + return 0; } } // namespace diff --git a/chromium/ipc/ipc_sync_channel.cc b/chromium/ipc/ipc_sync_channel.cc index 979c6d3b088..f92d1edd772 100644 --- a/chromium/ipc/ipc_sync_channel.cc +++ b/chromium/ipc/ipc_sync_channel.cc @@ -525,7 +525,8 @@ SyncChannel::SyncChannel( const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, WaitableEvent* shutdown_event) : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), - sync_handle_registry_(mojo::SyncHandleRegistry::current()) { + sync_handle_registry_(mojo::SyncHandleRegistry::current()), + dispatch_watcher_(FROM_HERE) { // The current (listener) thread must be distinct from the IPC thread, or else // sending synchronous messages will deadlock. DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); @@ -641,7 +642,7 @@ void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, } void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { - mojo::Watcher send_done_watcher; + mojo::Watcher send_done_watcher(FROM_HERE); ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); DCHECK_NE(sync_msg_queue, nullptr); diff --git a/chromium/ipc/ipc_sync_message_filter.cc b/chromium/ipc/ipc_sync_message_filter.cc index 77c6a3a0c77..b9737fd2937 100644 --- a/chromium/ipc/ipc_sync_message_filter.cc +++ b/chromium/ipc/ipc_sync_message_filter.cc @@ -172,10 +172,6 @@ void SyncMessageFilter::OnFilterAdded(Channel* channel) { { base::AutoLock auto_lock(lock_); channel_ = channel; - Channel::AssociatedInterfaceSupport* support = - channel_->GetAssociatedInterfaceSupport(); - if (support) - channel_associated_group_ = *support->GetAssociatedGroup(); io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); shutdown_watcher_.StartWatching( @@ -280,8 +276,13 @@ void SyncMessageFilter::GetGenericRemoteAssociatedInterface( mojo::ScopedInterfaceEndpointHandle handle) { base::AutoLock auto_lock(lock_); DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); - if (!channel_) + if (!channel_) { + // Attach the associated interface to a disconnected pipe, so that the + // associated interface pointer can be used to make calls (which are + // dropped). + mojo::GetIsolatedInterface(std::move(handle)); return; + } Channel::AssociatedInterfaceSupport* support = channel_->GetAssociatedInterfaceSupport(); diff --git a/chromium/ipc/ipc_sync_message_filter.h b/chromium/ipc/ipc_sync_message_filter.h index f38fd4961ba..abee662db3c 100644 --- a/chromium/ipc/ipc_sync_message_filter.h +++ b/chromium/ipc/ipc_sync_message_filter.h @@ -17,7 +17,6 @@ #include "ipc/ipc_sync_message.h" #include "ipc/message_filter.h" #include "ipc/mojo_event.h" -#include "mojo/public/cpp/bindings/associated_group.h" #include "mojo/public/cpp/bindings/associated_interface_ptr.h" #include "mojo/public/cpp/bindings/associated_interface_request.h" #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h" @@ -55,8 +54,7 @@ class IPC_EXPORT SyncMessageFilter : public MessageFilter, public Sender { template <typename Interface> void GetRemoteAssociatedInterface( mojo::AssociatedInterfacePtr<Interface>* proxy) { - mojo::AssociatedInterfaceRequest<Interface> request = - mojo::MakeRequest(proxy, &channel_associated_group_); + auto request = mojo::MakeRequest(proxy); GetGenericRemoteAssociatedInterface(Interface::Name_, request.PassHandle()); } @@ -112,10 +110,6 @@ class IPC_EXPORT SyncMessageFilter : public MessageFilter, public Sender { scoped_refptr<IOMessageLoopObserver> io_message_loop_observer_; - // The AssociatedGroup for the underlying channel, used to construct new - // associated interface endpoints. - mojo::AssociatedGroup channel_associated_group_; - base::WeakPtrFactory<SyncMessageFilter> weak_factory_; DISALLOW_COPY_AND_ASSIGN(SyncMessageFilter); diff --git a/chromium/ipc/ipc_test.mojom b/chromium/ipc/ipc_test.mojom index a4edfc93a56..8af397a3a42 100644 --- a/chromium/ipc/ipc_test.mojom +++ b/chromium/ipc/ipc_test.mojom @@ -33,3 +33,7 @@ interface Reflector { Ping(string value) => (string value); Quit(); }; + +interface AssociatedInterfaceVendor { + GetTestInterface(associated SimpleTestDriver& interface_reqest); +}; |