summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorerwincoumans <erwincoumans@google.com>2019-05-07 12:33:39 -0700
committerGitHub <noreply@github.com>2019-05-07 12:33:39 -0700
commita5fe998f1368c9219579ce25682cb33fecabb7a9 (patch)
tree80e9a25b55a8378b5fa396ef1a2eff36e1b92b82
parent111d06abd0829577d91c1e715f2689ab5d6404c4 (diff)
parent1d2a1c093d406a0f2ba61d66824829dfbddaa607 (diff)
downloadbullet3-a5fe998f1368c9219579ce25682cb33fecabb7a9.tar.gz
Merge pull request #2243 from crewmatt/master
Enhances gRPC Plugin
-rw-r--r--examples/SharedMemory/PhysicsServerCommandProcessor.cpp53
-rw-r--r--examples/SharedMemory/grpc/ConvertGRPCBullet.cpp21
-rw-r--r--examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp67
3 files changed, 97 insertions, 44 deletions
diff --git a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp
index 9164c5bd4..6a3fa1c29 100644
--- a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp
+++ b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp
@@ -2843,7 +2843,7 @@ void PhysicsServerCommandProcessor::deleteDynamicsWorld()
bool PhysicsServerCommandProcessor::supportsJointMotor(btMultiBody* mb, int mbLinkIndex)
{
- bool canHaveMotor = (mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::eRevolute
+ bool canHaveMotor = (mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::eRevolute
|| mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::ePrismatic);
return canHaveMotor;
}
@@ -3056,7 +3056,7 @@ bool PhysicsServerCommandProcessor::processImportedObjects(const char* fileName,
std::string* baseName = new std::string(u2b.getLinkName(u2b.getRootLinkIndex()));
m_data->m_strings.push_back(baseName);
mb->setBaseName(baseName->c_str());
-
+
#if 0
btAlignedObjectArray<char> urdf;
mb->dumpUrdf(urdf);
@@ -3067,7 +3067,7 @@ bool PhysicsServerCommandProcessor::processImportedObjects(const char* fileName,
fclose(f);
}
#endif
-
+
}
else
{
@@ -6315,7 +6315,7 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc
//we store the state details in the shared memory block, to reduce status size
SendActualStateSharedMemoryStorage* stateDetails = (SendActualStateSharedMemoryStorage*)bufferServerToClient;
-
+
//make sure the storage fits, otherwise
btAssert(sizeof(SendActualStateSharedMemoryStorage) < bufferSizeInBytes);
if (sizeof(SendActualStateSharedMemoryStorage) > bufferSizeInBytes)
@@ -6327,11 +6327,12 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc
{
btMultiBody* mb = body->m_multiBody;
SharedMemoryStatus& serverCmd = serverStatusOut;
-
+
serverStatusOut.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED;
serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId;
serverCmd.m_sendActualStateArgs.m_numLinks = body->m_multiBody->getNumLinks();
+ serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage);
serverCmd.m_sendActualStateArgs.m_stateDetails = 0;
int totalDegreeOfFreedomQ = 0;
int totalDegreeOfFreedomU = 0;
@@ -6540,9 +6541,10 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc
btRigidBody* rb = body->m_rigidBody;
SharedMemoryStatus& serverCmd = serverStatusOut;
serverCmd.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED;
-
+
serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId;
serverCmd.m_sendActualStateArgs.m_numLinks = 0;
+ serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage);
serverCmd.m_sendActualStateArgs.m_stateDetails = 0;
int totalDegreeOfFreedomQ = 0;
@@ -6584,6 +6586,7 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc
serverCmd.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED;
serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId;
serverCmd.m_sendActualStateArgs.m_numLinks = 0;
+ serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage);
serverCmd.m_sendActualStateArgs.m_stateDetails = 0;
serverCmd.m_sendActualStateArgs.m_rootLocalInertialFrame[0] =
@@ -7197,7 +7200,7 @@ bool PhysicsServerCommandProcessor::processCreateMultiBodyCommandSingle(const st
m_data->m_sdfRecentLoadedBodies.clear();
if (bodyUniqueId >= 0)
{
-
+
serverStatusOut.m_type = CMD_CREATE_MULTI_BODY_COMPLETED;
if (bufferSizeInBytes>0 && serverStatusOut.m_numDataStreamBytes==0)
{
@@ -7205,7 +7208,7 @@ bool PhysicsServerCommandProcessor::processCreateMultiBodyCommandSingle(const st
BT_PROFILE("autogenerateGraphicsObjects");
m_data->m_guiHelper->autogenerateGraphicsObjects(this->m_data->m_dynamicsWorld);
}
-
+
BT_PROFILE("createBodyInfoStream");
int streamSizeInBytes = createBodyInfoStream(bodyUniqueId, bufferServerToClient, bufferSizeInBytes);
serverStatusOut.m_numDataStreamBytes = streamSizeInBytes;
@@ -7667,7 +7670,7 @@ bool PhysicsServerCommandProcessor::processForwardDynamicsCommand(const struct S
}
btScalar deltaTimeScaled = m_data->m_physicsDeltaTime * simTimeScalingFactor;
-
+
int numSteps = 0;
if (m_data->m_numSimulationSubSteps > 0)
{
@@ -7694,7 +7697,7 @@ bool PhysicsServerCommandProcessor::processForwardDynamicsCommand(const struct S
m_data->m_dynamicsWorld->getAnalyticsData(islandAnalyticsData);
serverCmd.m_forwardDynamicsAnalyticsArgs.m_numIslands = islandAnalyticsData.size();
int numIslands = btMin(islandAnalyticsData.size(), MAX_ISLANDS_ANALYTICS);
-
+
for (int i=0;i<numIslands;i++)
{
serverCmd.m_forwardDynamicsAnalyticsArgs.m_numSolverCalls = islandAnalyticsData[i].m_numSolverCalls;
@@ -7786,7 +7789,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc
{
mb->setCanWakeup(false);
}
-
+
}
if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_LINEAR_DAMPING)
@@ -7892,7 +7895,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc
{
mb->setMaxCoordinateVelocity(clientCmd.m_changeDynamicsInfoArgs.m_maxJointVelocity);
}
-
+
}
else
{
@@ -7980,7 +7983,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc
btRigidBody* childRb = &body->m_rigidBodyJoints[linkIndex]->getRigidBodyB();
rb = childRb;
}
-
+
}
}
@@ -8071,13 +8074,13 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc
{
rb->setAnisotropicFriction(anisotropicFriction);
}
-
+
if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_CONTACT_PROCESSING_THRESHOLD)
{
rb->setContactProcessingThreshold(clientCmd.m_changeDynamicsInfoArgs.m_contactProcessingThreshold);
}
-
+
if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_CCD_SWEPT_SPHERE_RADIUS)
{
@@ -8498,7 +8501,7 @@ bool PhysicsServerCommandProcessor::processSendPhysicsParametersCommand(const st
{
m_data->m_dynamicsWorld->getSolverInfo().m_reportSolverAnalytics = clientCmd.m_physSimParamArgs.m_reportSolverAnalytics;
}
-
+
SharedMemoryStatus& serverCmd = serverStatusOut;
serverCmd.m_type = CMD_CLIENT_COMMAND_COMPLETED;
return hasStatus;
@@ -8592,7 +8595,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe
{
int posVarCount = mb->getLink(i).m_posVarCount;
bool hasPosVar = posVarCount > 0;
-
+
for (int j = 0; j < posVarCount; j++)
{
if (clientCmd.m_initPoseArgs.m_hasInitialStateQ[posVarCountIndex+j] == 0)
@@ -8621,7 +8624,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe
mb->setJointVelMultiDof(i, vel);
}
}
-
+
bool hasVel = true;
for (int j = 0; j < mb->getLink(i).m_dofCount; j++)
{
@@ -8644,7 +8647,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe
mb->setJointVelMultiDof(i, &clientCmd.m_initPoseArgs.m_initialStateQdot[uDofIndex]);
}
}
-
+
posVarCountIndex += mb->getLink(i).m_posVarCount;
uDofIndex += mb->getLink(i).m_dofCount;
}
@@ -9015,7 +9018,7 @@ bool PhysicsServerCommandProcessor::processInverseDynamicsCommand(const struct S
{
#ifdef STATIC_LINK_SPD_PLUGIN
{
- cRBDModel* rbdModel = m_data->findOrCreateRBDModel(bodyHandle->m_multiBody,
+ cRBDModel* rbdModel = m_data->findOrCreateRBDModel(bodyHandle->m_multiBody,
clientCmd.m_calculateInverseDynamicsArguments.m_jointPositionsQ,
clientCmd.m_calculateInverseDynamicsArguments.m_jointVelocitiesQdot);
if (rbdModel)
@@ -9032,9 +9035,9 @@ bool PhysicsServerCommandProcessor::processInverseDynamicsCommand(const struct S
}
serverCmd.m_inverseDynamicsResultArgs.m_bodyUniqueId = clientCmd.m_calculateInverseDynamicsArguments.m_bodyUniqueId;
serverCmd.m_inverseDynamicsResultArgs.m_dofCount = dof;
-
+
serverCmd.m_type = CMD_CALCULATED_INVERSE_DYNAMICS_COMPLETED;
-
+
}
}
#endif
@@ -9506,7 +9509,7 @@ bool PhysicsServerCommandProcessor::processRemoveBodyCommand(const struct Shared
int numCollisionObjects = m_data->m_dynamicsWorld->getNumCollisionObjects();
m_data->m_dynamicsWorld->removeMultiBody(bodyHandle->m_multiBody);
numCollisionObjects = m_data->m_dynamicsWorld->getNumCollisionObjects();
-
+
delete bodyHandle->m_multiBody;
bodyHandle->m_multiBody = 0;
serverCmd.m_type = CMD_REMOVE_BODY_COMPLETED;
@@ -10933,7 +10936,7 @@ bool PhysicsServerCommandProcessor::processRemoveStateCommand(const struct Share
bool hasStatus = true;
SharedMemoryStatus& serverCmd = serverStatusOut;
serverCmd.m_type = CMD_REMOVE_STATE_FAILED;
-
+
if (clientCmd.m_loadStateArguments.m_stateId >= 0)
{
if (clientCmd.m_loadStateArguments.m_stateId < m_data->m_savedStates.size())
@@ -11273,7 +11276,7 @@ bool PhysicsServerCommandProcessor::processCommand(const struct SharedMemoryComm
hasStatus = processCreateVisualShapeCommand(clientCmd, serverStatusOut, bufferServerToClient, bufferSizeInBytes);
break;
}
-
+
case CMD_CREATE_MULTI_BODY:
{
hasStatus = processCreateMultiBodyCommand(clientCmd, serverStatusOut, bufferServerToClient, bufferSizeInBytes);
diff --git a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp
index f23ce858f..2596a3465 100644
--- a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp
+++ b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp
@@ -1413,6 +1413,22 @@ bool convertGRPCToStatus(const PyBulletStatus& grpcReply, SharedMemoryStatus& se
break;
}
+ case CMD_SDF_LOADING_COMPLETED:
+ {
+ converted = true;
+ const ::pybullet_grpc::SdfLoadedStatus* stat = &grpcReply.sdfstatus();
+ int numBodies = stat->bodyuniqueids_size();
+ if (numBodies > MAX_SDF_BODIES)
+ {
+ printf("SDF exceeds body capacity: %d > %d", numBodies, MAX_SDF_BODIES);
+ }
+ serverStatus.m_sdfLoadedArgs.m_numBodies = numBodies;
+ for (int i = 0; i < numBodies; i++)
+ {
+ serverStatus.m_sdfLoadedArgs.m_bodyUniqueIds[i] = stat->bodyuniqueids(i);
+ }
+ break;
+ }
case CMD_DESIRED_STATE_RECEIVED_COMPLETED:
{
converted = true;
@@ -1681,7 +1697,10 @@ bool convertStatusToGRPC(const SharedMemoryStatus& serverStatus, char* bufferSer
case CMD_ACTUAL_STATE_UPDATE_COMPLETED:
{
converted = true;
- b3SharedMemoryStatusHandle statusHandle = (b3SharedMemoryStatusHandle)&serverStatus;
+ SharedMemoryStatus* status = (SharedMemoryStatus*)&serverStatus;
+ status->m_sendActualStateArgs.m_stateDetails = (SendActualStateSharedMemoryStorage*)bufferServerToClient;
+ b3SharedMemoryStatusHandle statusHandle = (b3SharedMemoryStatusHandle)status;
+
int bodyUniqueId;
int numLinks;
diff --git a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
index 3bd0b2fc7..3193840d1 100644
--- a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
+++ b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
@@ -11,6 +11,9 @@
#include <stdio.h>
+#include <mutex>
+#include <thread>
+
#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include "../../../Utils/b3Clock.h"
@@ -44,6 +47,9 @@ public:
if (server_)
{
server_->Shutdown();
+ m_requestThreadCancelled = true;
+ m_requestThread->join();
+ delete m_requestThread;
// Always shutdown the completion queue after the server.
cq_->Shutdown();
server_ = 0;
@@ -64,6 +70,10 @@ public:
server_ = m_builder.BuildAndStart();
std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl;
+ //Start the thread to gather the requests.
+ m_requestThreadCancelled = false;
+ m_requestThread = new std::thread(&ServerImpl::GatherRequests, this);
+
// Proceed to the server's main loop.
InitRpcs(comProc);
}
@@ -72,25 +82,13 @@ public:
bool HandleSingleRpc()
{
CallData::CallStatus status = CallData::CallStatus::CREATE;
+ std::lock_guard<std::mutex> guard(m_queueMutex);
+ if (!m_requestQueue.empty()) {
+ void* tag = m_requestQueue.front();
+ m_requestQueue.pop_front();
+ status = static_cast<CallData*>(tag)->Proceed();
+ }
- {
- void* tag; // uniquely identifies a request.
- bool ok;
-
- // Block waiting to read the next event from the completion queue. The
- // event is uniquely identified by its tag, which in this case is the
- // memory address of a CallData instance.
- // The return value of Next should always be checked. This return value
- // tells us whether there is any kind of event or cq_ is shutting down.
-
- grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
- if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
- {
- //GPR_ASSERT(cq_->Next(&tag, &ok));
- GPR_ASSERT(ok);
- status = static_cast<CallData*>(tag)->Proceed();
- }
- }
return status == CallData::CallStatus::TERMINATE;
}
@@ -252,6 +250,39 @@ private:
std::unique_ptr<ServerCompletionQueue> cq_;
PyBulletAPI::AsyncService service_;
std::unique_ptr<Server> server_;
+
+ // Mutex to protect access to the request queue variables (m_requestQueue,
+ // m_requestThread, m_requestThreadCancelled).
+ std::mutex m_queueMutex;
+
+ // List of outstanding request tags.
+ std::list<void*> m_requestQueue;
+
+ // Whether or not the gathering thread is cancelled.
+ bool m_requestThreadCancelled;
+
+ // Thread to gather requests from the completion queue.
+ std::thread* m_requestThread;
+
+ void GatherRequests() {
+ void* tag; // uniquely identifies a request.
+ bool ok;
+
+ while(!m_requestThreadCancelled) {
+ // Block waiting to read the next event from the completion queue. The
+ // event is uniquely identified by its tag, which in this case is the
+ // memory address of a CallData instance.
+ // The return value of Next should always be checked. This return value
+ // tells us whether there is any kind of event or cq_ is shutting down.
+ grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
+ if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
+ {
+ GPR_ASSERT(ok);
+ std::lock_guard<std::mutex> guard(m_queueMutex);
+ m_requestQueue.push_back(tag);
+ }
+ }
+ }
};
struct grpcMyClass