diff options
author | erwincoumans <erwincoumans@google.com> | 2019-05-07 12:33:39 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-07 12:33:39 -0700 |
commit | a5fe998f1368c9219579ce25682cb33fecabb7a9 (patch) | |
tree | 80e9a25b55a8378b5fa396ef1a2eff36e1b92b82 | |
parent | 111d06abd0829577d91c1e715f2689ab5d6404c4 (diff) | |
parent | 1d2a1c093d406a0f2ba61d66824829dfbddaa607 (diff) | |
download | bullet3-a5fe998f1368c9219579ce25682cb33fecabb7a9.tar.gz |
Merge pull request #2243 from crewmatt/master
Enhances gRPC Plugin
-rw-r--r-- | examples/SharedMemory/PhysicsServerCommandProcessor.cpp | 53 | ||||
-rw-r--r-- | examples/SharedMemory/grpc/ConvertGRPCBullet.cpp | 21 | ||||
-rw-r--r-- | examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp | 67 |
3 files changed, 97 insertions, 44 deletions
diff --git a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp index 9164c5bd4..6a3fa1c29 100644 --- a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp +++ b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp @@ -2843,7 +2843,7 @@ void PhysicsServerCommandProcessor::deleteDynamicsWorld() bool PhysicsServerCommandProcessor::supportsJointMotor(btMultiBody* mb, int mbLinkIndex) { - bool canHaveMotor = (mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::eRevolute + bool canHaveMotor = (mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::eRevolute || mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::ePrismatic); return canHaveMotor; } @@ -3056,7 +3056,7 @@ bool PhysicsServerCommandProcessor::processImportedObjects(const char* fileName, std::string* baseName = new std::string(u2b.getLinkName(u2b.getRootLinkIndex())); m_data->m_strings.push_back(baseName); mb->setBaseName(baseName->c_str()); - + #if 0 btAlignedObjectArray<char> urdf; mb->dumpUrdf(urdf); @@ -3067,7 +3067,7 @@ bool PhysicsServerCommandProcessor::processImportedObjects(const char* fileName, fclose(f); } #endif - + } else { @@ -6315,7 +6315,7 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc //we store the state details in the shared memory block, to reduce status size SendActualStateSharedMemoryStorage* stateDetails = (SendActualStateSharedMemoryStorage*)bufferServerToClient; - + //make sure the storage fits, otherwise btAssert(sizeof(SendActualStateSharedMemoryStorage) < bufferSizeInBytes); if (sizeof(SendActualStateSharedMemoryStorage) > bufferSizeInBytes) @@ -6327,11 +6327,12 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc { btMultiBody* mb = body->m_multiBody; SharedMemoryStatus& serverCmd = serverStatusOut; - + serverStatusOut.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED; serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId; serverCmd.m_sendActualStateArgs.m_numLinks = body->m_multiBody->getNumLinks(); + serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage); serverCmd.m_sendActualStateArgs.m_stateDetails = 0; int totalDegreeOfFreedomQ = 0; int totalDegreeOfFreedomU = 0; @@ -6540,9 +6541,10 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc btRigidBody* rb = body->m_rigidBody; SharedMemoryStatus& serverCmd = serverStatusOut; serverCmd.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED; - + serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId; serverCmd.m_sendActualStateArgs.m_numLinks = 0; + serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage); serverCmd.m_sendActualStateArgs.m_stateDetails = 0; int totalDegreeOfFreedomQ = 0; @@ -6584,6 +6586,7 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc serverCmd.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED; serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId; serverCmd.m_sendActualStateArgs.m_numLinks = 0; + serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage); serverCmd.m_sendActualStateArgs.m_stateDetails = 0; serverCmd.m_sendActualStateArgs.m_rootLocalInertialFrame[0] = @@ -7197,7 +7200,7 @@ bool PhysicsServerCommandProcessor::processCreateMultiBodyCommandSingle(const st m_data->m_sdfRecentLoadedBodies.clear(); if (bodyUniqueId >= 0) { - + serverStatusOut.m_type = CMD_CREATE_MULTI_BODY_COMPLETED; if (bufferSizeInBytes>0 && serverStatusOut.m_numDataStreamBytes==0) { @@ -7205,7 +7208,7 @@ bool PhysicsServerCommandProcessor::processCreateMultiBodyCommandSingle(const st BT_PROFILE("autogenerateGraphicsObjects"); m_data->m_guiHelper->autogenerateGraphicsObjects(this->m_data->m_dynamicsWorld); } - + BT_PROFILE("createBodyInfoStream"); int streamSizeInBytes = createBodyInfoStream(bodyUniqueId, bufferServerToClient, bufferSizeInBytes); serverStatusOut.m_numDataStreamBytes = streamSizeInBytes; @@ -7667,7 +7670,7 @@ bool PhysicsServerCommandProcessor::processForwardDynamicsCommand(const struct S } btScalar deltaTimeScaled = m_data->m_physicsDeltaTime * simTimeScalingFactor; - + int numSteps = 0; if (m_data->m_numSimulationSubSteps > 0) { @@ -7694,7 +7697,7 @@ bool PhysicsServerCommandProcessor::processForwardDynamicsCommand(const struct S m_data->m_dynamicsWorld->getAnalyticsData(islandAnalyticsData); serverCmd.m_forwardDynamicsAnalyticsArgs.m_numIslands = islandAnalyticsData.size(); int numIslands = btMin(islandAnalyticsData.size(), MAX_ISLANDS_ANALYTICS); - + for (int i=0;i<numIslands;i++) { serverCmd.m_forwardDynamicsAnalyticsArgs.m_numSolverCalls = islandAnalyticsData[i].m_numSolverCalls; @@ -7786,7 +7789,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc { mb->setCanWakeup(false); } - + } if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_LINEAR_DAMPING) @@ -7892,7 +7895,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc { mb->setMaxCoordinateVelocity(clientCmd.m_changeDynamicsInfoArgs.m_maxJointVelocity); } - + } else { @@ -7980,7 +7983,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc btRigidBody* childRb = &body->m_rigidBodyJoints[linkIndex]->getRigidBodyB(); rb = childRb; } - + } } @@ -8071,13 +8074,13 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc { rb->setAnisotropicFriction(anisotropicFriction); } - + if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_CONTACT_PROCESSING_THRESHOLD) { rb->setContactProcessingThreshold(clientCmd.m_changeDynamicsInfoArgs.m_contactProcessingThreshold); } - + if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_CCD_SWEPT_SPHERE_RADIUS) { @@ -8498,7 +8501,7 @@ bool PhysicsServerCommandProcessor::processSendPhysicsParametersCommand(const st { m_data->m_dynamicsWorld->getSolverInfo().m_reportSolverAnalytics = clientCmd.m_physSimParamArgs.m_reportSolverAnalytics; } - + SharedMemoryStatus& serverCmd = serverStatusOut; serverCmd.m_type = CMD_CLIENT_COMMAND_COMPLETED; return hasStatus; @@ -8592,7 +8595,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe { int posVarCount = mb->getLink(i).m_posVarCount; bool hasPosVar = posVarCount > 0; - + for (int j = 0; j < posVarCount; j++) { if (clientCmd.m_initPoseArgs.m_hasInitialStateQ[posVarCountIndex+j] == 0) @@ -8621,7 +8624,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe mb->setJointVelMultiDof(i, vel); } } - + bool hasVel = true; for (int j = 0; j < mb->getLink(i).m_dofCount; j++) { @@ -8644,7 +8647,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe mb->setJointVelMultiDof(i, &clientCmd.m_initPoseArgs.m_initialStateQdot[uDofIndex]); } } - + posVarCountIndex += mb->getLink(i).m_posVarCount; uDofIndex += mb->getLink(i).m_dofCount; } @@ -9015,7 +9018,7 @@ bool PhysicsServerCommandProcessor::processInverseDynamicsCommand(const struct S { #ifdef STATIC_LINK_SPD_PLUGIN { - cRBDModel* rbdModel = m_data->findOrCreateRBDModel(bodyHandle->m_multiBody, + cRBDModel* rbdModel = m_data->findOrCreateRBDModel(bodyHandle->m_multiBody, clientCmd.m_calculateInverseDynamicsArguments.m_jointPositionsQ, clientCmd.m_calculateInverseDynamicsArguments.m_jointVelocitiesQdot); if (rbdModel) @@ -9032,9 +9035,9 @@ bool PhysicsServerCommandProcessor::processInverseDynamicsCommand(const struct S } serverCmd.m_inverseDynamicsResultArgs.m_bodyUniqueId = clientCmd.m_calculateInverseDynamicsArguments.m_bodyUniqueId; serverCmd.m_inverseDynamicsResultArgs.m_dofCount = dof; - + serverCmd.m_type = CMD_CALCULATED_INVERSE_DYNAMICS_COMPLETED; - + } } #endif @@ -9506,7 +9509,7 @@ bool PhysicsServerCommandProcessor::processRemoveBodyCommand(const struct Shared int numCollisionObjects = m_data->m_dynamicsWorld->getNumCollisionObjects(); m_data->m_dynamicsWorld->removeMultiBody(bodyHandle->m_multiBody); numCollisionObjects = m_data->m_dynamicsWorld->getNumCollisionObjects(); - + delete bodyHandle->m_multiBody; bodyHandle->m_multiBody = 0; serverCmd.m_type = CMD_REMOVE_BODY_COMPLETED; @@ -10933,7 +10936,7 @@ bool PhysicsServerCommandProcessor::processRemoveStateCommand(const struct Share bool hasStatus = true; SharedMemoryStatus& serverCmd = serverStatusOut; serverCmd.m_type = CMD_REMOVE_STATE_FAILED; - + if (clientCmd.m_loadStateArguments.m_stateId >= 0) { if (clientCmd.m_loadStateArguments.m_stateId < m_data->m_savedStates.size()) @@ -11273,7 +11276,7 @@ bool PhysicsServerCommandProcessor::processCommand(const struct SharedMemoryComm hasStatus = processCreateVisualShapeCommand(clientCmd, serverStatusOut, bufferServerToClient, bufferSizeInBytes); break; } - + case CMD_CREATE_MULTI_BODY: { hasStatus = processCreateMultiBodyCommand(clientCmd, serverStatusOut, bufferServerToClient, bufferSizeInBytes); diff --git a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp index f23ce858f..2596a3465 100644 --- a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp +++ b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp @@ -1413,6 +1413,22 @@ bool convertGRPCToStatus(const PyBulletStatus& grpcReply, SharedMemoryStatus& se break; } + case CMD_SDF_LOADING_COMPLETED: + { + converted = true; + const ::pybullet_grpc::SdfLoadedStatus* stat = &grpcReply.sdfstatus(); + int numBodies = stat->bodyuniqueids_size(); + if (numBodies > MAX_SDF_BODIES) + { + printf("SDF exceeds body capacity: %d > %d", numBodies, MAX_SDF_BODIES); + } + serverStatus.m_sdfLoadedArgs.m_numBodies = numBodies; + for (int i = 0; i < numBodies; i++) + { + serverStatus.m_sdfLoadedArgs.m_bodyUniqueIds[i] = stat->bodyuniqueids(i); + } + break; + } case CMD_DESIRED_STATE_RECEIVED_COMPLETED: { converted = true; @@ -1681,7 +1697,10 @@ bool convertStatusToGRPC(const SharedMemoryStatus& serverStatus, char* bufferSer case CMD_ACTUAL_STATE_UPDATE_COMPLETED: { converted = true; - b3SharedMemoryStatusHandle statusHandle = (b3SharedMemoryStatusHandle)&serverStatus; + SharedMemoryStatus* status = (SharedMemoryStatus*)&serverStatus; + status->m_sendActualStateArgs.m_stateDetails = (SendActualStateSharedMemoryStorage*)bufferServerToClient; + b3SharedMemoryStatusHandle statusHandle = (b3SharedMemoryStatusHandle)status; + int bodyUniqueId; int numLinks; diff --git a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp index 3bd0b2fc7..3193840d1 100644 --- a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp +++ b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp @@ -11,6 +11,9 @@ #include <stdio.h> +#include <mutex> +#include <thread> + #include <grpc++/grpc++.h> #include <grpc/support/log.h> #include "../../../Utils/b3Clock.h" @@ -44,6 +47,9 @@ public: if (server_) { server_->Shutdown(); + m_requestThreadCancelled = true; + m_requestThread->join(); + delete m_requestThread; // Always shutdown the completion queue after the server. cq_->Shutdown(); server_ = 0; @@ -64,6 +70,10 @@ public: server_ = m_builder.BuildAndStart(); std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl; + //Start the thread to gather the requests. + m_requestThreadCancelled = false; + m_requestThread = new std::thread(&ServerImpl::GatherRequests, this); + // Proceed to the server's main loop. InitRpcs(comProc); } @@ -72,25 +82,13 @@ public: bool HandleSingleRpc() { CallData::CallStatus status = CallData::CallStatus::CREATE; + std::lock_guard<std::mutex> guard(m_queueMutex); + if (!m_requestQueue.empty()) { + void* tag = m_requestQueue.front(); + m_requestQueue.pop_front(); + status = static_cast<CallData*>(tag)->Proceed(); + } - { - void* tag; // uniquely identifies a request. - bool ok; - - // Block waiting to read the next event from the completion queue. The - // event is uniquely identified by its tag, which in this case is the - // memory address of a CallData instance. - // The return value of Next should always be checked. This return value - // tells us whether there is any kind of event or cq_ is shutting down. - - grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC)); - if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT) - { - //GPR_ASSERT(cq_->Next(&tag, &ok)); - GPR_ASSERT(ok); - status = static_cast<CallData*>(tag)->Proceed(); - } - } return status == CallData::CallStatus::TERMINATE; } @@ -252,6 +250,39 @@ private: std::unique_ptr<ServerCompletionQueue> cq_; PyBulletAPI::AsyncService service_; std::unique_ptr<Server> server_; + + // Mutex to protect access to the request queue variables (m_requestQueue, + // m_requestThread, m_requestThreadCancelled). + std::mutex m_queueMutex; + + // List of outstanding request tags. + std::list<void*> m_requestQueue; + + // Whether or not the gathering thread is cancelled. + bool m_requestThreadCancelled; + + // Thread to gather requests from the completion queue. + std::thread* m_requestThread; + + void GatherRequests() { + void* tag; // uniquely identifies a request. + bool ok; + + while(!m_requestThreadCancelled) { + // Block waiting to read the next event from the completion queue. The + // event is uniquely identified by its tag, which in this case is the + // memory address of a CallData instance. + // The return value of Next should always be checked. This return value + // tells us whether there is any kind of event or cq_ is shutting down. + grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC)); + if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT) + { + GPR_ASSERT(ok); + std::lock_guard<std::mutex> guard(m_queueMutex); + m_requestQueue.push_back(tag); + } + } + } }; struct grpcMyClass |