diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
| commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
| tree | 1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/framing | |
| parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
| download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
| -rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/BodyHandler.cpp | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/BodyHandler.h | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FieldTable.cpp | 219 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/MethodContent.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/TransferContent.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/TransferContent.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/amqp_framing.h | 1 |
10 files changed, 211 insertions, 136 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 4f6faf4199..19675ce6ff 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -43,8 +43,7 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock ChannelId getChannel() const { return channel; } void setChannel(ChannelId c) { channel = c; } - AMQBody* getBody() { return body.get(); } - const AMQBody* getBody() const { return body.get(); } + AMQBody* getBody() const { return body.get(); } AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; } const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; } diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp deleted file mode 100644 index db302b1e4c..0000000000 --- a/cpp/src/qpid/framing/BodyHandler.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/framing/BodyHandler.h" -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeartbeatBody.h" -#include <boost/cast.hpp> -#include "qpid/framing/reply_exceptions.h" -#include "qpid/Msg.h" - -using namespace qpid::framing; -using namespace boost; - -BodyHandler::~BodyHandler() {} - -// TODO aconway 2007-08-13: Replace with visitor. -void BodyHandler::handleBody(AMQBody* body) { - switch(body->type()) - { - case METHOD_BODY: - handleMethod(polymorphic_downcast<AMQMethodBody*>(body)); - break; - case HEADER_BODY: - handleHeader(polymorphic_downcast<AMQHeaderBody*>(body)); - break; - case CONTENT_BODY: - handleContent(polymorphic_downcast<AMQContentBody*>(body)); - break; - case HEARTBEAT_BODY: - handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); - break; - default: - throw FramingErrorException( - QPID_MSG("Invalid frame type " << body->type())); - } -} - diff --git a/cpp/src/qpid/framing/BodyHandler.h b/cpp/src/qpid/framing/BodyHandler.h deleted file mode 100644 index 9ded737195..0000000000 --- a/cpp/src/qpid/framing/BodyHandler.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef _BodyHandler_ -#define _BodyHandler_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace framing { -class AMQBody; -class AMQMethodBody; -class AMQHeaderBody; -class AMQContentBody; -class AMQHeartbeatBody; - -// TODO aconway 2007-08-10: rework using Visitor pattern? - -/** - * Interface to handle incoming frame bodies. - * Derived classes provide logic for each frame type. - */ -class BodyHandler { - public: - virtual ~BodyHandler(); - virtual void handleBody(AMQBody* body); - - protected: - virtual void handleMethod(AMQMethodBody*) = 0; - virtual void handleHeader(AMQHeaderBody*) = 0; - virtual void handleContent(AMQContentBody*) = 0; - virtual void handleHeartbeat(AMQHeartbeatBody*) = 0; -}; - -}} - - -#endif diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp index f80d2f9fb1..0f7140b627 100644 --- a/cpp/src/qpid/framing/FieldTable.cpp +++ b/cpp/src/qpid/framing/FieldTable.cpp @@ -28,29 +28,93 @@ #include "qpid/Msg.h" #include <assert.h> +// The locking rationale in the FieldTable seems a little odd, but it +// maintains the concurrent guarantees and requirements that were in +// place before the cachedBytes/cachedSize were added: +// +// The FieldTable client code needs to make sure that they call no write +// operation in parallel with any other operation on the FieldTable. +// However multiple parallel read operations are safe. +// +// To this end the only code that is locked is code that can transparently +// change the state of the FieldTable during a read only operation. +// (In other words the code that required the mutable members in the class +// definition!) +// namespace qpid { + +using sys::Mutex; +using sys::ScopedLock; + namespace framing { +FieldTable::FieldTable() : + cachedSize(0), + newBytes(false) +{ +} + FieldTable::FieldTable(const FieldTable& ft) { - *this = ft; + ScopedLock<Mutex> l(ft.lock); // lock _source_ FieldTable + + cachedBytes = ft.cachedBytes; + cachedSize = ft.cachedSize; + newBytes = ft.newBytes; + + // Only copy the values if we have no raw data + // - copying the map is expensive and we can + // reconstruct it if necessary from the raw data + if (cachedBytes) { + newBytes = true; + return; + } + // In practice Encoding the source field table and only copying + // the encoded bytes is faster than copying the whole value map. + // (Because we nearly always copy a field table internally before + // encoding it to send, but don't change it after the copy) + if (!ft.values.empty()) { + // Side effect of getting encoded size will cache it in ft.cachedSize + ft.cachedBytes = boost::shared_array<uint8_t>(new uint8_t[ft.encodedSize()]); + + Buffer buffer((char*)&ft.cachedBytes[0], ft.cachedSize); + + // Cut and paste ahead... + buffer.putLong(ft.encodedSize() - 4); + buffer.putLong(ft.values.size()); + for (ValueMap::const_iterator i = ft.values.begin(); i!=ft.values.end(); ++i) { + buffer.putShortString(i->first); + i->second->encode(buffer); + } + + cachedBytes = ft.cachedBytes; + cachedSize = ft.cachedSize; + newBytes = true; + } } FieldTable& FieldTable::operator=(const FieldTable& ft) { - clear(); - values = ft.values; - return *this; + FieldTable nft(ft); + values.swap(nft.values); + cachedBytes.swap(nft.cachedBytes); + cachedSize = nft.cachedSize; + newBytes = nft.newBytes; + return (*this); } -FieldTable::~FieldTable() {} - uint32_t FieldTable::encodedSize() const { + ScopedLock<Mutex> l(lock); + + if (cachedSize != 0) { + return cachedSize; + } uint32_t len(4/*size field*/ + 4/*count field*/); for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { // shortstr_len_byte + key size + value size - len += 1 + (i->first).size() + (i->second)->encodedSize(); + len += 1 + (i->first).size() + (i->second)->encodedSize(); } + cachedSize = len; return len; } @@ -66,6 +130,7 @@ std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_ty } std::ostream& operator<<(std::ostream& out, const FieldTable& t) { + t.realDecode(); out << "{"; FieldTable::ValueMap::const_iterator i = t.begin(); if (i != t.end()) out << *i++; @@ -77,48 +142,70 @@ std::ostream& operator<<(std::ostream& out, const FieldTable& t) { } void FieldTable::set(const std::string& name, const ValuePtr& value){ + realDecode(); values[name] = value; + flushRawCache(); } void FieldTable::setString(const std::string& name, const std::string& value){ + realDecode(); values[name] = ValuePtr(new Str16Value(value)); + flushRawCache(); } void FieldTable::setInt(const std::string& name, const int value){ + realDecode(); values[name] = ValuePtr(new IntegerValue(value)); + flushRawCache(); } void FieldTable::setInt64(const std::string& name, const int64_t value){ + realDecode(); values[name] = ValuePtr(new Integer64Value(value)); + flushRawCache(); } void FieldTable::setTimestamp(const std::string& name, const uint64_t value){ + realDecode(); values[name] = ValuePtr(new TimeValue(value)); + flushRawCache(); } void FieldTable::setUInt64(const std::string& name, const uint64_t value){ + realDecode(); values[name] = ValuePtr(new Unsigned64Value(value)); + flushRawCache(); } void FieldTable::setTable(const std::string& name, const FieldTable& value) { + realDecode(); values[name] = ValuePtr(new FieldTableValue(value)); + flushRawCache(); } void FieldTable::setArray(const std::string& name, const Array& value) { + realDecode(); values[name] = ValuePtr(new ArrayValue(value)); + flushRawCache(); } void FieldTable::setFloat(const std::string& name, const float value){ + realDecode(); values[name] = ValuePtr(new FloatValue(value)); + flushRawCache(); } void FieldTable::setDouble(const std::string& name, double value){ + realDecode(); values[name] = ValuePtr(new DoubleValue(value)); + flushRawCache(); } FieldTable::ValuePtr FieldTable::get(const std::string& name) const { + // Ensure we have any values we're trying to read + realDecode(); ValuePtr value; ValueMap::const_iterator i = values.find(name); if ( i!=values.end() ) @@ -188,37 +275,82 @@ bool FieldTable::getDouble(const std::string& name, double& value) const { //} void FieldTable::encode(Buffer& buffer) const { - buffer.putLong(encodedSize() - 4); - buffer.putLong(values.size()); - for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) { - buffer.putShortString(i->first); - i->second->encode(buffer); + // If we've still got the input field table + // we can just copy it directly to the output + if (cachedBytes) { + ScopedLock<Mutex> l(lock); + buffer.putRawData(&cachedBytes[0], cachedSize); + } else { + buffer.putLong(encodedSize() - 4); + buffer.putLong(values.size()); + for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) { + buffer.putShortString(i->first); + i->second->encode(buffer); + } } } +// Decode lazily - just record the raw bytes until we need them void FieldTable::decode(Buffer& buffer){ - clear(); if (buffer.available() < 4) throw IllegalArgumentException(QPID_MSG("Not enough data for field table.")); + uint32_t p = buffer.getPosition(); uint32_t len = buffer.getLong(); if (len) { uint32_t available = buffer.available(); if ((available < len) || (available < 4)) throw IllegalArgumentException(QPID_MSG("Not enough data for field table.")); + } + ScopedLock<Mutex> l(lock); + // Throw away previous stored values + values.clear(); + // Copy data into our buffer + cachedBytes = boost::shared_array<uint8_t>(new uint8_t[len + 4]); + cachedSize = len + 4; + newBytes = true; + buffer.setPosition(p); + buffer.getRawData(&cachedBytes[0], cachedSize); +} + +void FieldTable::realDecode() const +{ + ScopedLock<Mutex> l(lock); + + // If we've got no raw data stored up then nothing to do + if (!newBytes) + return; + + Buffer buffer((char*)&cachedBytes[0], cachedSize); + uint32_t len = buffer.getLong(); + if (len) { + uint32_t available = buffer.available(); uint32_t count = buffer.getLong(); uint32_t leftover = available - len; while(buffer.available() > leftover && count--){ std::string name; ValuePtr value(new FieldValue); - + buffer.getShortString(name); value->decode(buffer); values[name] = ValuePtr(value); - } + } } + newBytes = false; +} + +void FieldTable::flushRawCache() +{ + ScopedLock<Mutex> l(lock); + // We can only flush the cache if there are no cached bytes to decode + assert(newBytes==false); + // Avoid recreating shared array unless we actually have one. + if (cachedBytes) cachedBytes.reset(); + cachedSize = 0; } bool FieldTable::operator==(const FieldTable& x) const { + realDecode(); + x.realDecode(); if (values.size() != x.values.size()) return false; for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) { ValueMap::const_iterator j = x.values.find(i->first); @@ -230,20 +362,73 @@ bool FieldTable::operator==(const FieldTable& x) const { void FieldTable::erase(const std::string& name) { - if (values.find(name) != values.end()) + realDecode(); + if (values.find(name) != values.end()) { values.erase(name); + flushRawCache(); + } +} + +void FieldTable::clear() +{ + values.clear(); + newBytes = false; + flushRawCache(); +} + +// Map-like interface. +FieldTable::ValueMap::const_iterator FieldTable::begin() const +{ + realDecode(); + return values.begin(); +} + +FieldTable::ValueMap::const_iterator FieldTable::end() const +{ + realDecode(); + return values.end(); +} + +FieldTable::ValueMap::const_iterator FieldTable::find(const std::string& s) const +{ + realDecode(); + return values.find(s); +} + +FieldTable::ValueMap::iterator FieldTable::begin() +{ + realDecode(); + flushRawCache(); + return values.begin(); +} + +FieldTable::ValueMap::iterator FieldTable::end() +{ + realDecode(); + flushRawCache(); + return values.end(); +} + +FieldTable::ValueMap::iterator FieldTable::find(const std::string& s) +{ + realDecode(); + flushRawCache(); + return values.find(s); } std::pair<FieldTable::ValueMap::iterator, bool> FieldTable::insert(const ValueMap::value_type& value) { + realDecode(); + flushRawCache(); return values.insert(value); } FieldTable::ValueMap::iterator FieldTable::insert(ValueMap::iterator position, const ValueMap::value_type& value) { + realDecode(); + flushRawCache(); return values.insert(position, value); } - } } diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index 255aaf6e6b..9aee7b98b9 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -26,7 +26,6 @@ #include "qpid/framing/TypeFilter.h" using namespace qpid::framing; -using namespace boost; FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true) { } FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true) @@ -103,3 +102,7 @@ std::string FrameSet::getContent() const { getContent(out); return out; } + +bool FrameSet::hasContent() const { + return parts.size() >= 3; +} diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index cae75e5ec8..3b9f60950b 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -54,6 +54,7 @@ public: QPID_COMMON_EXTERN void getContent(std::string&) const; QPID_COMMON_EXTERN std::string getContent() const; + QPID_COMMON_EXTERN bool hasContent() const; bool isContentBearing() const; diff --git a/cpp/src/qpid/framing/MethodContent.h b/cpp/src/qpid/framing/MethodContent.h index b290a0c140..58c9143cfa 100644 --- a/cpp/src/qpid/framing/MethodContent.h +++ b/cpp/src/qpid/framing/MethodContent.h @@ -32,7 +32,7 @@ class MethodContent public: virtual ~MethodContent() {} //TODO: rethink this interface - virtual AMQHeaderBody getHeader() const = 0; + virtual const AMQHeaderBody& getHeader() const = 0; virtual const std::string& getData() const = 0; }; diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index 837d7d346a..d997b24304 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -30,7 +30,7 @@ TransferContent::TransferContent(const std::string& data, const std::string& key } -AMQHeaderBody TransferContent::getHeader() const +const AMQHeaderBody& TransferContent::getHeader() const { return header; } diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index 9a698a1823..32663d7020 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -40,7 +40,7 @@ public: QPID_COMMON_EXTERN TransferContent(const std::string& data = std::string(), const std::string& key=std::string()); ///@internal - QPID_COMMON_EXTERN AMQHeaderBody getHeader() const; + QPID_COMMON_EXTERN const AMQHeaderBody& getHeader() const; QPID_COMMON_EXTERN void setData(const std::string&); QPID_COMMON_EXTERN const std::string& getData() const; diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index 3a8b39afb5..2e58922364 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -21,7 +21,6 @@ #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQBody.h" -#include "qpid/framing/BodyHandler.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQContentBody.h" |
