diff options
Diffstat (limited to 'examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp')
-rw-r--r-- | examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp | 67 |
1 files changed, 49 insertions, 18 deletions
diff --git a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp index 3bd0b2fc7..3193840d1 100644 --- a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp +++ b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp @@ -11,6 +11,9 @@ #include <stdio.h> +#include <mutex> +#include <thread> + #include <grpc++/grpc++.h> #include <grpc/support/log.h> #include "../../../Utils/b3Clock.h" @@ -44,6 +47,9 @@ public: if (server_) { server_->Shutdown(); + m_requestThreadCancelled = true; + m_requestThread->join(); + delete m_requestThread; // Always shutdown the completion queue after the server. cq_->Shutdown(); server_ = 0; @@ -64,6 +70,10 @@ public: server_ = m_builder.BuildAndStart(); std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl; + //Start the thread to gather the requests. + m_requestThreadCancelled = false; + m_requestThread = new std::thread(&ServerImpl::GatherRequests, this); + // Proceed to the server's main loop. InitRpcs(comProc); } @@ -72,25 +82,13 @@ public: bool HandleSingleRpc() { CallData::CallStatus status = CallData::CallStatus::CREATE; + std::lock_guard<std::mutex> guard(m_queueMutex); + if (!m_requestQueue.empty()) { + void* tag = m_requestQueue.front(); + m_requestQueue.pop_front(); + status = static_cast<CallData*>(tag)->Proceed(); + } - { - void* tag; // uniquely identifies a request. - bool ok; - - // Block waiting to read the next event from the completion queue. The - // event is uniquely identified by its tag, which in this case is the - // memory address of a CallData instance. - // The return value of Next should always be checked. This return value - // tells us whether there is any kind of event or cq_ is shutting down. - - grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC)); - if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT) - { - //GPR_ASSERT(cq_->Next(&tag, &ok)); - GPR_ASSERT(ok); - status = static_cast<CallData*>(tag)->Proceed(); - } - } return status == CallData::CallStatus::TERMINATE; } @@ -252,6 +250,39 @@ private: std::unique_ptr<ServerCompletionQueue> cq_; PyBulletAPI::AsyncService service_; std::unique_ptr<Server> server_; + + // Mutex to protect access to the request queue variables (m_requestQueue, + // m_requestThread, m_requestThreadCancelled). + std::mutex m_queueMutex; + + // List of outstanding request tags. + std::list<void*> m_requestQueue; + + // Whether or not the gathering thread is cancelled. + bool m_requestThreadCancelled; + + // Thread to gather requests from the completion queue. + std::thread* m_requestThread; + + void GatherRequests() { + void* tag; // uniquely identifies a request. + bool ok; + + while(!m_requestThreadCancelled) { + // Block waiting to read the next event from the completion queue. The + // event is uniquely identified by its tag, which in this case is the + // memory address of a CallData instance. + // The return value of Next should always be checked. This return value + // tells us whether there is any kind of event or cq_ is shutting down. + grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC)); + if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT) + { + GPR_ASSERT(ok); + std::lock_guard<std::mutex> guard(m_queueMutex); + m_requestQueue.push_back(tag); + } + } + } }; struct grpcMyClass |