summaryrefslogtreecommitdiff
path: root/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp')
-rw-r--r--examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp67
1 files changed, 49 insertions, 18 deletions
diff --git a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
index 3bd0b2fc7..3193840d1 100644
--- a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
+++ b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
@@ -11,6 +11,9 @@
#include <stdio.h>
+#include <mutex>
+#include <thread>
+
#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include "../../../Utils/b3Clock.h"
@@ -44,6 +47,9 @@ public:
if (server_)
{
server_->Shutdown();
+ m_requestThreadCancelled = true;
+ m_requestThread->join();
+ delete m_requestThread;
// Always shutdown the completion queue after the server.
cq_->Shutdown();
server_ = 0;
@@ -64,6 +70,10 @@ public:
server_ = m_builder.BuildAndStart();
std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl;
+ //Start the thread to gather the requests.
+ m_requestThreadCancelled = false;
+ m_requestThread = new std::thread(&ServerImpl::GatherRequests, this);
+
// Proceed to the server's main loop.
InitRpcs(comProc);
}
@@ -72,25 +82,13 @@ public:
bool HandleSingleRpc()
{
CallData::CallStatus status = CallData::CallStatus::CREATE;
+ std::lock_guard<std::mutex> guard(m_queueMutex);
+ if (!m_requestQueue.empty()) {
+ void* tag = m_requestQueue.front();
+ m_requestQueue.pop_front();
+ status = static_cast<CallData*>(tag)->Proceed();
+ }
- {
- void* tag; // uniquely identifies a request.
- bool ok;
-
- // Block waiting to read the next event from the completion queue. The
- // event is uniquely identified by its tag, which in this case is the
- // memory address of a CallData instance.
- // The return value of Next should always be checked. This return value
- // tells us whether there is any kind of event or cq_ is shutting down.
-
- grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
- if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
- {
- //GPR_ASSERT(cq_->Next(&tag, &ok));
- GPR_ASSERT(ok);
- status = static_cast<CallData*>(tag)->Proceed();
- }
- }
return status == CallData::CallStatus::TERMINATE;
}
@@ -252,6 +250,39 @@ private:
std::unique_ptr<ServerCompletionQueue> cq_;
PyBulletAPI::AsyncService service_;
std::unique_ptr<Server> server_;
+
+ // Mutex to protect access to the request queue variables (m_requestQueue,
+ // m_requestThread, m_requestThreadCancelled).
+ std::mutex m_queueMutex;
+
+ // List of outstanding request tags.
+ std::list<void*> m_requestQueue;
+
+ // Whether or not the gathering thread is cancelled.
+ bool m_requestThreadCancelled;
+
+ // Thread to gather requests from the completion queue.
+ std::thread* m_requestThread;
+
+ void GatherRequests() {
+ void* tag; // uniquely identifies a request.
+ bool ok;
+
+ while(!m_requestThreadCancelled) {
+ // Block waiting to read the next event from the completion queue. The
+ // event is uniquely identified by its tag, which in this case is the
+ // memory address of a CallData instance.
+ // The return value of Next should always be checked. This return value
+ // tells us whether there is any kind of event or cq_ is shutting down.
+ grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
+ if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
+ {
+ GPR_ASSERT(ok);
+ std::lock_guard<std::mutex> guard(m_queueMutex);
+ m_requestQueue.push_back(tag);
+ }
+ }
+ }
};
struct grpcMyClass