From cf16c5c51fc1387b131a605ff18ae98200d7644a Mon Sep 17 00:00:00 2001 From: mbennice Date: Tue, 7 May 2019 11:11:01 -0700 Subject: Reduces wait in the gRPC plugin, fix state bug. Updates the gRPC plugin to gather messages in its own thread, and corrects the state details not being correctly added to shared memory. --- .../SharedMemory/PhysicsServerCommandProcessor.cpp | 53 +++++++++-------- .../SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp | 67 ++++++++++++++++------ 2 files changed, 77 insertions(+), 43 deletions(-) diff --git a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp index 9164c5bd4..6a3fa1c29 100644 --- a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp +++ b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp @@ -2843,7 +2843,7 @@ void PhysicsServerCommandProcessor::deleteDynamicsWorld() bool PhysicsServerCommandProcessor::supportsJointMotor(btMultiBody* mb, int mbLinkIndex) { - bool canHaveMotor = (mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::eRevolute + bool canHaveMotor = (mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::eRevolute || mb->getLink(mbLinkIndex).m_jointType == btMultibodyLink::ePrismatic); return canHaveMotor; } @@ -3056,7 +3056,7 @@ bool PhysicsServerCommandProcessor::processImportedObjects(const char* fileName, std::string* baseName = new std::string(u2b.getLinkName(u2b.getRootLinkIndex())); m_data->m_strings.push_back(baseName); mb->setBaseName(baseName->c_str()); - + #if 0 btAlignedObjectArray urdf; mb->dumpUrdf(urdf); @@ -3067,7 +3067,7 @@ bool PhysicsServerCommandProcessor::processImportedObjects(const char* fileName, fclose(f); } #endif - + } else { @@ -6315,7 +6315,7 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc //we store the state details in the shared memory block, to reduce status size SendActualStateSharedMemoryStorage* stateDetails = (SendActualStateSharedMemoryStorage*)bufferServerToClient; - + //make sure the storage fits, otherwise btAssert(sizeof(SendActualStateSharedMemoryStorage) < bufferSizeInBytes); if (sizeof(SendActualStateSharedMemoryStorage) > bufferSizeInBytes) @@ -6327,11 +6327,12 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc { btMultiBody* mb = body->m_multiBody; SharedMemoryStatus& serverCmd = serverStatusOut; - + serverStatusOut.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED; serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId; serverCmd.m_sendActualStateArgs.m_numLinks = body->m_multiBody->getNumLinks(); + serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage); serverCmd.m_sendActualStateArgs.m_stateDetails = 0; int totalDegreeOfFreedomQ = 0; int totalDegreeOfFreedomU = 0; @@ -6540,9 +6541,10 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc btRigidBody* rb = body->m_rigidBody; SharedMemoryStatus& serverCmd = serverStatusOut; serverCmd.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED; - + serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId; serverCmd.m_sendActualStateArgs.m_numLinks = 0; + serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage); serverCmd.m_sendActualStateArgs.m_stateDetails = 0; int totalDegreeOfFreedomQ = 0; @@ -6584,6 +6586,7 @@ bool PhysicsServerCommandProcessor::processRequestActualStateCommand(const struc serverCmd.m_type = CMD_ACTUAL_STATE_UPDATE_COMPLETED; serverCmd.m_sendActualStateArgs.m_bodyUniqueId = bodyUniqueId; serverCmd.m_sendActualStateArgs.m_numLinks = 0; + serverCmd.m_numDataStreamBytes = sizeof(SendActualStateSharedMemoryStorage); serverCmd.m_sendActualStateArgs.m_stateDetails = 0; serverCmd.m_sendActualStateArgs.m_rootLocalInertialFrame[0] = @@ -7197,7 +7200,7 @@ bool PhysicsServerCommandProcessor::processCreateMultiBodyCommandSingle(const st m_data->m_sdfRecentLoadedBodies.clear(); if (bodyUniqueId >= 0) { - + serverStatusOut.m_type = CMD_CREATE_MULTI_BODY_COMPLETED; if (bufferSizeInBytes>0 && serverStatusOut.m_numDataStreamBytes==0) { @@ -7205,7 +7208,7 @@ bool PhysicsServerCommandProcessor::processCreateMultiBodyCommandSingle(const st BT_PROFILE("autogenerateGraphicsObjects"); m_data->m_guiHelper->autogenerateGraphicsObjects(this->m_data->m_dynamicsWorld); } - + BT_PROFILE("createBodyInfoStream"); int streamSizeInBytes = createBodyInfoStream(bodyUniqueId, bufferServerToClient, bufferSizeInBytes); serverStatusOut.m_numDataStreamBytes = streamSizeInBytes; @@ -7667,7 +7670,7 @@ bool PhysicsServerCommandProcessor::processForwardDynamicsCommand(const struct S } btScalar deltaTimeScaled = m_data->m_physicsDeltaTime * simTimeScalingFactor; - + int numSteps = 0; if (m_data->m_numSimulationSubSteps > 0) { @@ -7694,7 +7697,7 @@ bool PhysicsServerCommandProcessor::processForwardDynamicsCommand(const struct S m_data->m_dynamicsWorld->getAnalyticsData(islandAnalyticsData); serverCmd.m_forwardDynamicsAnalyticsArgs.m_numIslands = islandAnalyticsData.size(); int numIslands = btMin(islandAnalyticsData.size(), MAX_ISLANDS_ANALYTICS); - + for (int i=0;isetCanWakeup(false); } - + } if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_LINEAR_DAMPING) @@ -7892,7 +7895,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc { mb->setMaxCoordinateVelocity(clientCmd.m_changeDynamicsInfoArgs.m_maxJointVelocity); } - + } else { @@ -7980,7 +7983,7 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc btRigidBody* childRb = &body->m_rigidBodyJoints[linkIndex]->getRigidBodyB(); rb = childRb; } - + } } @@ -8071,13 +8074,13 @@ bool PhysicsServerCommandProcessor::processChangeDynamicsInfoCommand(const struc { rb->setAnisotropicFriction(anisotropicFriction); } - + if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_CONTACT_PROCESSING_THRESHOLD) { rb->setContactProcessingThreshold(clientCmd.m_changeDynamicsInfoArgs.m_contactProcessingThreshold); } - + if (clientCmd.m_updateFlags & CHANGE_DYNAMICS_INFO_SET_CCD_SWEPT_SPHERE_RADIUS) { @@ -8498,7 +8501,7 @@ bool PhysicsServerCommandProcessor::processSendPhysicsParametersCommand(const st { m_data->m_dynamicsWorld->getSolverInfo().m_reportSolverAnalytics = clientCmd.m_physSimParamArgs.m_reportSolverAnalytics; } - + SharedMemoryStatus& serverCmd = serverStatusOut; serverCmd.m_type = CMD_CLIENT_COMMAND_COMPLETED; return hasStatus; @@ -8592,7 +8595,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe { int posVarCount = mb->getLink(i).m_posVarCount; bool hasPosVar = posVarCount > 0; - + for (int j = 0; j < posVarCount; j++) { if (clientCmd.m_initPoseArgs.m_hasInitialStateQ[posVarCountIndex+j] == 0) @@ -8621,7 +8624,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe mb->setJointVelMultiDof(i, vel); } } - + bool hasVel = true; for (int j = 0; j < mb->getLink(i).m_dofCount; j++) { @@ -8644,7 +8647,7 @@ bool PhysicsServerCommandProcessor::processInitPoseCommand(const struct SharedMe mb->setJointVelMultiDof(i, &clientCmd.m_initPoseArgs.m_initialStateQdot[uDofIndex]); } } - + posVarCountIndex += mb->getLink(i).m_posVarCount; uDofIndex += mb->getLink(i).m_dofCount; } @@ -9015,7 +9018,7 @@ bool PhysicsServerCommandProcessor::processInverseDynamicsCommand(const struct S { #ifdef STATIC_LINK_SPD_PLUGIN { - cRBDModel* rbdModel = m_data->findOrCreateRBDModel(bodyHandle->m_multiBody, + cRBDModel* rbdModel = m_data->findOrCreateRBDModel(bodyHandle->m_multiBody, clientCmd.m_calculateInverseDynamicsArguments.m_jointPositionsQ, clientCmd.m_calculateInverseDynamicsArguments.m_jointVelocitiesQdot); if (rbdModel) @@ -9032,9 +9035,9 @@ bool PhysicsServerCommandProcessor::processInverseDynamicsCommand(const struct S } serverCmd.m_inverseDynamicsResultArgs.m_bodyUniqueId = clientCmd.m_calculateInverseDynamicsArguments.m_bodyUniqueId; serverCmd.m_inverseDynamicsResultArgs.m_dofCount = dof; - + serverCmd.m_type = CMD_CALCULATED_INVERSE_DYNAMICS_COMPLETED; - + } } #endif @@ -9506,7 +9509,7 @@ bool PhysicsServerCommandProcessor::processRemoveBodyCommand(const struct Shared int numCollisionObjects = m_data->m_dynamicsWorld->getNumCollisionObjects(); m_data->m_dynamicsWorld->removeMultiBody(bodyHandle->m_multiBody); numCollisionObjects = m_data->m_dynamicsWorld->getNumCollisionObjects(); - + delete bodyHandle->m_multiBody; bodyHandle->m_multiBody = 0; serverCmd.m_type = CMD_REMOVE_BODY_COMPLETED; @@ -10933,7 +10936,7 @@ bool PhysicsServerCommandProcessor::processRemoveStateCommand(const struct Share bool hasStatus = true; SharedMemoryStatus& serverCmd = serverStatusOut; serverCmd.m_type = CMD_REMOVE_STATE_FAILED; - + if (clientCmd.m_loadStateArguments.m_stateId >= 0) { if (clientCmd.m_loadStateArguments.m_stateId < m_data->m_savedStates.size()) @@ -11273,7 +11276,7 @@ bool PhysicsServerCommandProcessor::processCommand(const struct SharedMemoryComm hasStatus = processCreateVisualShapeCommand(clientCmd, serverStatusOut, bufferServerToClient, bufferSizeInBytes); break; } - + case CMD_CREATE_MULTI_BODY: { hasStatus = processCreateMultiBodyCommand(clientCmd, serverStatusOut, bufferServerToClient, bufferSizeInBytes); diff --git a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp index 3bd0b2fc7..3193840d1 100644 --- a/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp +++ b/examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp @@ -11,6 +11,9 @@ #include +#include +#include + #include #include #include "../../../Utils/b3Clock.h" @@ -44,6 +47,9 @@ public: if (server_) { server_->Shutdown(); + m_requestThreadCancelled = true; + m_requestThread->join(); + delete m_requestThread; // Always shutdown the completion queue after the server. cq_->Shutdown(); server_ = 0; @@ -64,6 +70,10 @@ public: server_ = m_builder.BuildAndStart(); std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl; + //Start the thread to gather the requests. + m_requestThreadCancelled = false; + m_requestThread = new std::thread(&ServerImpl::GatherRequests, this); + // Proceed to the server's main loop. InitRpcs(comProc); } @@ -72,25 +82,13 @@ public: bool HandleSingleRpc() { CallData::CallStatus status = CallData::CallStatus::CREATE; + std::lock_guard guard(m_queueMutex); + if (!m_requestQueue.empty()) { + void* tag = m_requestQueue.front(); + m_requestQueue.pop_front(); + status = static_cast(tag)->Proceed(); + } - { - void* tag; // uniquely identifies a request. - bool ok; - - // Block waiting to read the next event from the completion queue. The - // event is uniquely identified by its tag, which in this case is the - // memory address of a CallData instance. - // The return value of Next should always be checked. This return value - // tells us whether there is any kind of event or cq_ is shutting down. - - grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC)); - if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT) - { - //GPR_ASSERT(cq_->Next(&tag, &ok)); - GPR_ASSERT(ok); - status = static_cast(tag)->Proceed(); - } - } return status == CallData::CallStatus::TERMINATE; } @@ -252,6 +250,39 @@ private: std::unique_ptr cq_; PyBulletAPI::AsyncService service_; std::unique_ptr server_; + + // Mutex to protect access to the request queue variables (m_requestQueue, + // m_requestThread, m_requestThreadCancelled). + std::mutex m_queueMutex; + + // List of outstanding request tags. + std::list m_requestQueue; + + // Whether or not the gathering thread is cancelled. + bool m_requestThreadCancelled; + + // Thread to gather requests from the completion queue. + std::thread* m_requestThread; + + void GatherRequests() { + void* tag; // uniquely identifies a request. + bool ok; + + while(!m_requestThreadCancelled) { + // Block waiting to read the next event from the completion queue. The + // event is uniquely identified by its tag, which in this case is the + // memory address of a CallData instance. + // The return value of Next should always be checked. This return value + // tells us whether there is any kind of event or cq_ is shutting down. + grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC)); + if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT) + { + GPR_ASSERT(ok); + std::lock_guard guard(m_queueMutex); + m_requestQueue.push_back(tag); + } + } + } }; struct grpcMyClass -- cgit v1.2.1 From 1d2a1c093d406a0f2ba61d66824829dfbddaa607 Mon Sep 17 00:00:00 2001 From: mbennice Date: Tue, 7 May 2019 11:18:11 -0700 Subject: Update GRPC conversion of SDF and State Update --- examples/SharedMemory/grpc/ConvertGRPCBullet.cpp | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp index f23ce858f..2596a3465 100644 --- a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp +++ b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp @@ -1413,6 +1413,22 @@ bool convertGRPCToStatus(const PyBulletStatus& grpcReply, SharedMemoryStatus& se break; } + case CMD_SDF_LOADING_COMPLETED: + { + converted = true; + const ::pybullet_grpc::SdfLoadedStatus* stat = &grpcReply.sdfstatus(); + int numBodies = stat->bodyuniqueids_size(); + if (numBodies > MAX_SDF_BODIES) + { + printf("SDF exceeds body capacity: %d > %d", numBodies, MAX_SDF_BODIES); + } + serverStatus.m_sdfLoadedArgs.m_numBodies = numBodies; + for (int i = 0; i < numBodies; i++) + { + serverStatus.m_sdfLoadedArgs.m_bodyUniqueIds[i] = stat->bodyuniqueids(i); + } + break; + } case CMD_DESIRED_STATE_RECEIVED_COMPLETED: { converted = true; @@ -1681,7 +1697,10 @@ bool convertStatusToGRPC(const SharedMemoryStatus& serverStatus, char* bufferSer case CMD_ACTUAL_STATE_UPDATE_COMPLETED: { converted = true; - b3SharedMemoryStatusHandle statusHandle = (b3SharedMemoryStatusHandle)&serverStatus; + SharedMemoryStatus* status = (SharedMemoryStatus*)&serverStatus; + status->m_sendActualStateArgs.m_stateDetails = (SendActualStateSharedMemoryStorage*)bufferServerToClient; + b3SharedMemoryStatusHandle statusHandle = (b3SharedMemoryStatusHandle)status; + int bodyUniqueId; int numLinks; -- cgit v1.2.1