summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/framing
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h3
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp56
-rw-r--r--cpp/src/qpid/framing/BodyHandler.h56
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp219
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp5
-rw-r--r--cpp/src/qpid/framing/FrameSet.h1
-rw-r--r--cpp/src/qpid/framing/MethodContent.h2
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp2
-rw-r--r--cpp/src/qpid/framing/TransferContent.h2
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h1
10 files changed, 211 insertions, 136 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 4f6faf4199..19675ce6ff 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -43,8 +43,7 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock
ChannelId getChannel() const { return channel; }
void setChannel(ChannelId c) { channel = c; }
- AMQBody* getBody() { return body.get(); }
- const AMQBody* getBody() const { return body.get(); }
+ AMQBody* getBody() const { return body.get(); }
AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; }
const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; }
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
deleted file mode 100644
index db302b1e4c..0000000000
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/framing/BodyHandler.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeartbeatBody.h"
-#include <boost/cast.hpp>
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/Msg.h"
-
-using namespace qpid::framing;
-using namespace boost;
-
-BodyHandler::~BodyHandler() {}
-
-// TODO aconway 2007-08-13: Replace with visitor.
-void BodyHandler::handleBody(AMQBody* body) {
- switch(body->type())
- {
- case METHOD_BODY:
- handleMethod(polymorphic_downcast<AMQMethodBody*>(body));
- break;
- case HEADER_BODY:
- handleHeader(polymorphic_downcast<AMQHeaderBody*>(body));
- break;
- case CONTENT_BODY:
- handleContent(polymorphic_downcast<AMQContentBody*>(body));
- break;
- case HEARTBEAT_BODY:
- handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
- break;
- default:
- throw FramingErrorException(
- QPID_MSG("Invalid frame type " << body->type()));
- }
-}
-
diff --git a/cpp/src/qpid/framing/BodyHandler.h b/cpp/src/qpid/framing/BodyHandler.h
deleted file mode 100644
index 9ded737195..0000000000
--- a/cpp/src/qpid/framing/BodyHandler.h
+++ /dev/null
@@ -1,56 +0,0 @@
-#ifndef _BodyHandler_
-#define _BodyHandler_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace framing {
-class AMQBody;
-class AMQMethodBody;
-class AMQHeaderBody;
-class AMQContentBody;
-class AMQHeartbeatBody;
-
-// TODO aconway 2007-08-10: rework using Visitor pattern?
-
-/**
- * Interface to handle incoming frame bodies.
- * Derived classes provide logic for each frame type.
- */
-class BodyHandler {
- public:
- virtual ~BodyHandler();
- virtual void handleBody(AMQBody* body);
-
- protected:
- virtual void handleMethod(AMQMethodBody*) = 0;
- virtual void handleHeader(AMQHeaderBody*) = 0;
- virtual void handleContent(AMQContentBody*) = 0;
- virtual void handleHeartbeat(AMQHeartbeatBody*) = 0;
-};
-
-}}
-
-
-#endif
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index f80d2f9fb1..0f7140b627 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -28,29 +28,93 @@
#include "qpid/Msg.h"
#include <assert.h>
+// The locking rationale in the FieldTable seems a little odd, but it
+// maintains the concurrent guarantees and requirements that were in
+// place before the cachedBytes/cachedSize were added:
+//
+// The FieldTable client code needs to make sure that they call no write
+// operation in parallel with any other operation on the FieldTable.
+// However multiple parallel read operations are safe.
+//
+// To this end the only code that is locked is code that can transparently
+// change the state of the FieldTable during a read only operation.
+// (In other words the code that required the mutable members in the class
+// definition!)
+//
namespace qpid {
+
+using sys::Mutex;
+using sys::ScopedLock;
+
namespace framing {
+FieldTable::FieldTable() :
+ cachedSize(0),
+ newBytes(false)
+{
+}
+
FieldTable::FieldTable(const FieldTable& ft)
{
- *this = ft;
+ ScopedLock<Mutex> l(ft.lock); // lock _source_ FieldTable
+
+ cachedBytes = ft.cachedBytes;
+ cachedSize = ft.cachedSize;
+ newBytes = ft.newBytes;
+
+ // Only copy the values if we have no raw data
+ // - copying the map is expensive and we can
+ // reconstruct it if necessary from the raw data
+ if (cachedBytes) {
+ newBytes = true;
+ return;
+ }
+ // In practice Encoding the source field table and only copying
+ // the encoded bytes is faster than copying the whole value map.
+ // (Because we nearly always copy a field table internally before
+ // encoding it to send, but don't change it after the copy)
+ if (!ft.values.empty()) {
+ // Side effect of getting encoded size will cache it in ft.cachedSize
+ ft.cachedBytes = boost::shared_array<uint8_t>(new uint8_t[ft.encodedSize()]);
+
+ Buffer buffer((char*)&ft.cachedBytes[0], ft.cachedSize);
+
+ // Cut and paste ahead...
+ buffer.putLong(ft.encodedSize() - 4);
+ buffer.putLong(ft.values.size());
+ for (ValueMap::const_iterator i = ft.values.begin(); i!=ft.values.end(); ++i) {
+ buffer.putShortString(i->first);
+ i->second->encode(buffer);
+ }
+
+ cachedBytes = ft.cachedBytes;
+ cachedSize = ft.cachedSize;
+ newBytes = true;
+ }
}
FieldTable& FieldTable::operator=(const FieldTable& ft)
{
- clear();
- values = ft.values;
- return *this;
+ FieldTable nft(ft);
+ values.swap(nft.values);
+ cachedBytes.swap(nft.cachedBytes);
+ cachedSize = nft.cachedSize;
+ newBytes = nft.newBytes;
+ return (*this);
}
-FieldTable::~FieldTable() {}
-
uint32_t FieldTable::encodedSize() const {
+ ScopedLock<Mutex> l(lock);
+
+ if (cachedSize != 0) {
+ return cachedSize;
+ }
uint32_t len(4/*size field*/ + 4/*count field*/);
for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
// shortstr_len_byte + key size + value size
- len += 1 + (i->first).size() + (i->second)->encodedSize();
+ len += 1 + (i->first).size() + (i->second)->encodedSize();
}
+ cachedSize = len;
return len;
}
@@ -66,6 +130,7 @@ std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_ty
}
std::ostream& operator<<(std::ostream& out, const FieldTable& t) {
+ t.realDecode();
out << "{";
FieldTable::ValueMap::const_iterator i = t.begin();
if (i != t.end()) out << *i++;
@@ -77,48 +142,70 @@ std::ostream& operator<<(std::ostream& out, const FieldTable& t) {
}
void FieldTable::set(const std::string& name, const ValuePtr& value){
+ realDecode();
values[name] = value;
+ flushRawCache();
}
void FieldTable::setString(const std::string& name, const std::string& value){
+ realDecode();
values[name] = ValuePtr(new Str16Value(value));
+ flushRawCache();
}
void FieldTable::setInt(const std::string& name, const int value){
+ realDecode();
values[name] = ValuePtr(new IntegerValue(value));
+ flushRawCache();
}
void FieldTable::setInt64(const std::string& name, const int64_t value){
+ realDecode();
values[name] = ValuePtr(new Integer64Value(value));
+ flushRawCache();
}
void FieldTable::setTimestamp(const std::string& name, const uint64_t value){
+ realDecode();
values[name] = ValuePtr(new TimeValue(value));
+ flushRawCache();
}
void FieldTable::setUInt64(const std::string& name, const uint64_t value){
+ realDecode();
values[name] = ValuePtr(new Unsigned64Value(value));
+ flushRawCache();
}
void FieldTable::setTable(const std::string& name, const FieldTable& value)
{
+ realDecode();
values[name] = ValuePtr(new FieldTableValue(value));
+ flushRawCache();
}
void FieldTable::setArray(const std::string& name, const Array& value)
{
+ realDecode();
values[name] = ValuePtr(new ArrayValue(value));
+ flushRawCache();
}
void FieldTable::setFloat(const std::string& name, const float value){
+ realDecode();
values[name] = ValuePtr(new FloatValue(value));
+ flushRawCache();
}
void FieldTable::setDouble(const std::string& name, double value){
+ realDecode();
values[name] = ValuePtr(new DoubleValue(value));
+ flushRawCache();
}
FieldTable::ValuePtr FieldTable::get(const std::string& name) const
{
+ // Ensure we have any values we're trying to read
+ realDecode();
ValuePtr value;
ValueMap::const_iterator i = values.find(name);
if ( i!=values.end() )
@@ -188,37 +275,82 @@ bool FieldTable::getDouble(const std::string& name, double& value) const {
//}
void FieldTable::encode(Buffer& buffer) const {
- buffer.putLong(encodedSize() - 4);
- buffer.putLong(values.size());
- for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
- buffer.putShortString(i->first);
- i->second->encode(buffer);
+ // If we've still got the input field table
+ // we can just copy it directly to the output
+ if (cachedBytes) {
+ ScopedLock<Mutex> l(lock);
+ buffer.putRawData(&cachedBytes[0], cachedSize);
+ } else {
+ buffer.putLong(encodedSize() - 4);
+ buffer.putLong(values.size());
+ for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
+ buffer.putShortString(i->first);
+ i->second->encode(buffer);
+ }
}
}
+// Decode lazily - just record the raw bytes until we need them
void FieldTable::decode(Buffer& buffer){
- clear();
if (buffer.available() < 4)
throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
+ uint32_t p = buffer.getPosition();
uint32_t len = buffer.getLong();
if (len) {
uint32_t available = buffer.available();
if ((available < len) || (available < 4))
throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
+ }
+ ScopedLock<Mutex> l(lock);
+ // Throw away previous stored values
+ values.clear();
+ // Copy data into our buffer
+ cachedBytes = boost::shared_array<uint8_t>(new uint8_t[len + 4]);
+ cachedSize = len + 4;
+ newBytes = true;
+ buffer.setPosition(p);
+ buffer.getRawData(&cachedBytes[0], cachedSize);
+}
+
+void FieldTable::realDecode() const
+{
+ ScopedLock<Mutex> l(lock);
+
+ // If we've got no raw data stored up then nothing to do
+ if (!newBytes)
+ return;
+
+ Buffer buffer((char*)&cachedBytes[0], cachedSize);
+ uint32_t len = buffer.getLong();
+ if (len) {
+ uint32_t available = buffer.available();
uint32_t count = buffer.getLong();
uint32_t leftover = available - len;
while(buffer.available() > leftover && count--){
std::string name;
ValuePtr value(new FieldValue);
-
+
buffer.getShortString(name);
value->decode(buffer);
values[name] = ValuePtr(value);
- }
+ }
}
+ newBytes = false;
+}
+
+void FieldTable::flushRawCache()
+{
+ ScopedLock<Mutex> l(lock);
+ // We can only flush the cache if there are no cached bytes to decode
+ assert(newBytes==false);
+ // Avoid recreating shared array unless we actually have one.
+ if (cachedBytes) cachedBytes.reset();
+ cachedSize = 0;
}
bool FieldTable::operator==(const FieldTable& x) const {
+ realDecode();
+ x.realDecode();
if (values.size() != x.values.size()) return false;
for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
ValueMap::const_iterator j = x.values.find(i->first);
@@ -230,20 +362,73 @@ bool FieldTable::operator==(const FieldTable& x) const {
void FieldTable::erase(const std::string& name)
{
- if (values.find(name) != values.end())
+ realDecode();
+ if (values.find(name) != values.end()) {
values.erase(name);
+ flushRawCache();
+ }
+}
+
+void FieldTable::clear()
+{
+ values.clear();
+ newBytes = false;
+ flushRawCache();
+}
+
+// Map-like interface.
+FieldTable::ValueMap::const_iterator FieldTable::begin() const
+{
+ realDecode();
+ return values.begin();
+}
+
+FieldTable::ValueMap::const_iterator FieldTable::end() const
+{
+ realDecode();
+ return values.end();
+}
+
+FieldTable::ValueMap::const_iterator FieldTable::find(const std::string& s) const
+{
+ realDecode();
+ return values.find(s);
+}
+
+FieldTable::ValueMap::iterator FieldTable::begin()
+{
+ realDecode();
+ flushRawCache();
+ return values.begin();
+}
+
+FieldTable::ValueMap::iterator FieldTable::end()
+{
+ realDecode();
+ flushRawCache();
+ return values.end();
+}
+
+FieldTable::ValueMap::iterator FieldTable::find(const std::string& s)
+{
+ realDecode();
+ flushRawCache();
+ return values.find(s);
}
std::pair<FieldTable::ValueMap::iterator, bool> FieldTable::insert(const ValueMap::value_type& value)
{
+ realDecode();
+ flushRawCache();
return values.insert(value);
}
FieldTable::ValueMap::iterator FieldTable::insert(ValueMap::iterator position, const ValueMap::value_type& value)
{
+ realDecode();
+ flushRawCache();
return values.insert(position, value);
}
-
}
}
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
index 255aaf6e6b..9aee7b98b9 100644
--- a/cpp/src/qpid/framing/FrameSet.cpp
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -26,7 +26,6 @@
#include "qpid/framing/TypeFilter.h"
using namespace qpid::framing;
-using namespace boost;
FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true) { }
FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true)
@@ -103,3 +102,7 @@ std::string FrameSet::getContent() const {
getContent(out);
return out;
}
+
+bool FrameSet::hasContent() const {
+ return parts.size() >= 3;
+}
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
index cae75e5ec8..3b9f60950b 100644
--- a/cpp/src/qpid/framing/FrameSet.h
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -54,6 +54,7 @@ public:
QPID_COMMON_EXTERN void getContent(std::string&) const;
QPID_COMMON_EXTERN std::string getContent() const;
+ QPID_COMMON_EXTERN bool hasContent() const;
bool isContentBearing() const;
diff --git a/cpp/src/qpid/framing/MethodContent.h b/cpp/src/qpid/framing/MethodContent.h
index b290a0c140..58c9143cfa 100644
--- a/cpp/src/qpid/framing/MethodContent.h
+++ b/cpp/src/qpid/framing/MethodContent.h
@@ -32,7 +32,7 @@ class MethodContent
public:
virtual ~MethodContent() {}
//TODO: rethink this interface
- virtual AMQHeaderBody getHeader() const = 0;
+ virtual const AMQHeaderBody& getHeader() const = 0;
virtual const std::string& getData() const = 0;
};
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
index 837d7d346a..d997b24304 100644
--- a/cpp/src/qpid/framing/TransferContent.cpp
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -30,7 +30,7 @@ TransferContent::TransferContent(const std::string& data, const std::string& key
}
-AMQHeaderBody TransferContent::getHeader() const
+const AMQHeaderBody& TransferContent::getHeader() const
{
return header;
}
diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h
index 9a698a1823..32663d7020 100644
--- a/cpp/src/qpid/framing/TransferContent.h
+++ b/cpp/src/qpid/framing/TransferContent.h
@@ -40,7 +40,7 @@ public:
QPID_COMMON_EXTERN TransferContent(const std::string& data = std::string(), const std::string& key=std::string());
///@internal
- QPID_COMMON_EXTERN AMQHeaderBody getHeader() const;
+ QPID_COMMON_EXTERN const AMQHeaderBody& getHeader() const;
QPID_COMMON_EXTERN void setData(const std::string&);
QPID_COMMON_EXTERN const std::string& getData() const;
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index 3a8b39afb5..2e58922364 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -21,7 +21,6 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQBody.h"
-#include "qpid/framing/BodyHandler.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQContentBody.h"