/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include "framing/AMQHeaderBody.h" #include "framing/AMQContentBody.h" #include "BasicGetOkBody.h" #include "BasicReturnBody.h" #include "BasicDeliverBody.h" #include #include namespace qpid { namespace client { using namespace sys; using namespace framing; struct IncomingMessage::Guard: public Mutex::ScopedLock { Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { im->shutdownError.throwIf(); } }; IncomingMessage::IncomingMessage() { reset(); } void IncomingMessage::reset() { state = &IncomingMessage::expectRequest; endFn= &IncomingMessage::endRequest; buildMessage = Message(); } void IncomingMessage::startGet() { Guard g(this); if (state != &IncomingMessage::expectRequest) { endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); } else { state = &IncomingMessage::expectGetOk; endFn = &IncomingMessage::endGet; getError.reset(); getState = GETTING; } } bool IncomingMessage::waitGet(Message& msg) { Guard g(this); while (getState == GETTING && !shutdownError && !getError) getReady.wait(lock); shutdownError.throwIf(); getError.throwIf(); msg = getMessage; return getState==GOT; } Message IncomingMessage::waitDispatch() { Guard g(this); while(dispatchQueue.empty() && !shutdownError) dispatchReady.wait(lock); shutdownError.throwIf(); Message msg(dispatchQueue.front()); dispatchQueue.pop(); return msg; } void IncomingMessage::add(BodyPtr body) { Guard g(this); shutdownError.throwIf(); // Call the current state function. (this->*state)(body); } void IncomingMessage::shutdown() { Mutex::ScopedLock l(lock); shutdownError.reset(new ShutdownException()); getReady.notify(); dispatchReady.notify(); } bool IncomingMessage::isShutdown() const { Mutex::ScopedLock l(lock); return shutdownError; } // Common check for all the expect functions. Called in network thread. template boost::shared_ptr IncomingMessage::expectCheck(BodyPtr body) { boost::shared_ptr ptr = boost::dynamic_pointer_cast(body); if (!ptr) throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); return ptr; } void IncomingMessage::expectGetOk(BodyPtr body) { if (dynamic_cast(body.get())) state = &IncomingMessage::expectHeader; else if (dynamic_cast(body.get())) { getState = EMPTY; endGet(); } else throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); } void IncomingMessage::expectHeader(BodyPtr body) { AMQHeaderBody::shared_ptr header = expectCheck(body); buildMessage.header = header; state = &IncomingMessage::expectContent; checkComplete(); } void IncomingMessage::expectContent(BodyPtr body) { AMQContentBody::shared_ptr content = expectCheck(body); buildMessage.setData(buildMessage.getData() + content->getData()); checkComplete(); } void IncomingMessage::checkComplete() { size_t declaredSize = buildMessage.header->getContentSize(); size_t currentSize = buildMessage.getData().size(); if (declaredSize == currentSize) (this->*endFn)(0); else if (declaredSize < currentSize) (this->*endFn)(new QPID_ERROR( PROTOCOL_ERROR, "Message content exceeds declared size.")); } void IncomingMessage::expectRequest(BodyPtr body) { AMQMethodBody::shared_ptr method = expectCheck(body); buildMessage.setMethod(method); state = &IncomingMessage::expectHeader; } void IncomingMessage::endGet(Exception* ex) { getError.reset(ex); if (getState == GETTING) { getMessage = buildMessage; getState = GOT; } reset(); getReady.notify(); } void IncomingMessage::endRequest(Exception* ex) { ExceptionHolder eh(ex); if (!eh) { dispatchQueue.push(buildMessage); reset(); dispatchReady.notify(); } eh.throwIf(); } }} // namespace qpid::client