summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-13 15:06:54 +0000
committerGordon Sim <gsim@apache.org>2013-08-13 15:06:54 +0000
commitd30253ae61bb81090ba43b055094dbe5a6d7c98d (patch)
treeb63d2b8f0277a8297937e6cac6a35b9e49fc9738
parent144f3c698bdddf22509691a4f285305e9fd83291 (diff)
downloadqpid-python-d30253ae61bb81090ba43b055094dbe5a6d7c98d.tar.gz
QPID-4711: translate between structured content in AMQP 0-10 and 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1513537 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/bindings/qpid/python/python.i7
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp109
-rw-r--r--qpid/python/qpid/tests/messaging/__init__.py13
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py3
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/general.py10
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py10
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/selector.py10
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/translation.py87
8 files changed, 203 insertions, 46 deletions
diff --git a/qpid/cpp/bindings/qpid/python/python.i b/qpid/cpp/bindings/qpid/python/python.i
index c10ea46000..9158836a2b 100644
--- a/qpid/cpp/bindings/qpid/python/python.i
+++ b/qpid/cpp/bindings/qpid/python/python.i
@@ -351,6 +351,9 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
self.setProperty(k, v)
def _get_content(self) :
+ obj = self.getContentObject()
+ if obj:
+ return obj
if self.content_type == "amqp/list" :
return decodeList(self)
if self.content_type == "amqp/map" :
@@ -365,9 +368,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
elif isinstance(content, list) or isinstance(content, dict) :
encode(content, self)
else :
- # Not a type we can handle. Try setting it anyway,
- # although this will probably lead to a swig error
- self.setContent(str(content))
+ self.setContentObject(content)
__swig_getmethods__["content"] = _get_content
__swig_setmethods__["content"] = _set_content
if _newclass: content = property(_get_content, _set_content)
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index e04d44d2c8..188738287e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -27,6 +27,7 @@
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/types/Variant.h"
+#include "qpid/types/encodings.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <boost/lexical_cast.hpp>
@@ -38,6 +39,8 @@ namespace {
const std::string EMPTY;
const std::string FORWARD_SLASH("/");
+const std::string TEXT_PLAIN("text/plain");
+const std::string SUBJECT_KEY("qpid.subject");
qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
{
@@ -98,8 +101,25 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
bool hasTo() const { return getDestination().size() || hasSubject(); }
std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); }
- bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
- std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
+ bool hasSubject() const
+ {
+ if (getDestination().empty()) {
+ return getApplicationProperties().isSet(SUBJECT_KEY);
+ } else {
+ return deliveryProperties && deliveryProperties->hasRoutingKey();
+ }
+ }
+ std::string getSubject() const
+ {
+ if (getDestination().empty()) {
+ //message was sent to default exchange, routing key is the queue name
+ return getApplicationProperties().getAsString(SUBJECT_KEY);
+ } else if (deliveryProperties) {
+ return deliveryProperties->getRoutingKey();
+ } else {
+ return EMPTY;
+ }
+ }
bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
@@ -119,7 +139,7 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
bool hasReplyToGroupId() const { return false; }
std::string getReplyToGroupId() const { return EMPTY; }
- const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
+ const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); }
Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
@@ -138,7 +158,6 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {}
-
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
{
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
@@ -161,13 +180,38 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
transfer->getFrames().append(method);
transfer->getFrames().append(header);
- qpid::amqp::CharSequence body = message->getBody();
- content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
- transfer->getFrames().append(content);
-
qpid::framing::MessageProperties* props =
transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
- props->setContentLength(body.size);
+
+ if (message->isTypedBody()) {
+ qpid::types::Variant body = message->getTypedBody();
+ std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData();
+ if (body.getType() == qpid::types::VAR_MAP) {
+ qpid::amqp_0_10::MapCodec::encode(body.asMap(), data);
+ props->setContentType(qpid::amqp_0_10::MapCodec::contentType);
+ } else if (body.getType() == qpid::types::VAR_LIST) {
+ qpid::amqp_0_10::ListCodec::encode(body.asList(), data);
+ props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ } else if (body.getType() == qpid::types::VAR_STRING) {
+ data = body.getString();
+ if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) {
+ props->setContentType(TEXT_PLAIN);
+ }
+ } else {
+ qpid::types::Variant::List container;
+ container.push_back(body);
+ qpid::amqp_0_10::ListCodec::encode(container, data);
+ props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+ }
+ transfer->getFrames().append(content);
+ props->setContentLength(data.size());
+ } else {
+ qpid::amqp::CharSequence body = message->getBody();
+ content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
+ transfer->getFrames().append(content);
+
+ props->setContentLength(body.size);
+ }
qpid::amqp::MessageId mid = message->getMessageId();
qpid::framing::Uuid uuid;
@@ -215,7 +259,10 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
dp->setPriority(message->getPriority());
if (message->isPersistent()) dp->setDeliveryMode(2);
- if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
+ if (message->getRoutingKey().size()) {
+ dp->setRoutingKey(message->getRoutingKey());
+ props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey());
+ }
return transfer.get();
} else {
@@ -246,14 +293,40 @@ void Translation::write(OutgoingFromQueue& out)
Properties_0_10 properties(*transfer);
qpid::types::Variant::Map applicationProperties;
qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
- std::string content = transfer->getContent();
- size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
- std::vector<char> buffer(size);
- qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
- encoder.writeProperties(properties);
- encoder.writeApplicationProperties(applicationProperties);
- if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
- out.write(&buffer[0], encoder.getPosition());
+ if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+ qpid::types::Variant::Map content;
+ qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content);
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+ size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE);
+ out.write(&buffer[0], encoder.getPosition());
+ } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+ qpid::types::Variant::List content;
+ qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content);
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+ size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+ size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE);
+ out.write(&buffer[0], encoder.getPosition());
+ } else {
+ std::string content = transfer->getContent();
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
+ out.write(&buffer[0], encoder.getPosition());
+ }
} else {
QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
}
diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py
index 5c9cdf2f27..38a5b066d6 100644
--- a/qpid/python/qpid/tests/messaging/__init__.py
+++ b/qpid/python/qpid/tests/messaging/__init__.py
@@ -188,4 +188,17 @@ class Base(Test):
return {"reconnect": self.reconnect(),
"transport": self.transport()}
+class VersionTest (Base):
+ def create_connection(self, version="amqp1.0", force=False):
+ opts = self.connection_options()
+ if force or not 'protocol' in opts:
+ opts['protocol'] = version;
+ return Connection.establish(self.broker, **opts)
+
+ def setup_connection(self):
+ return self.create_connection()
+
+ def setup_session(self):
+ return self.conn.session()
+
import address, endpoints, message
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
index 8088ddd95f..5ebbb4c651 100644
--- a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
@@ -19,6 +19,7 @@
# under the License.
#
+from general import *
from legacy_exchanges import *
from selector import *
-from general import *
+from translation import *
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
index 085acf8405..b3ff1df5a1 100644
--- a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
@@ -18,18 +18,12 @@
#
from qpid.tests.messaging.implementation import *
-from qpid.tests.messaging import Base
+from qpid.tests.messaging import VersionTest
-class GeneralTests (Base):
+class GeneralTests (VersionTest):
"""
Miscellaneous tests for core AMQP 1.0 messaging behaviour.
"""
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
def test_request_response(self):
snd_request = self.ssn.sender("#")
rcv_response = self.ssn.receiver("#")
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py b/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py
index d2b8c643bd..024a1da689 100644
--- a/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py
@@ -18,19 +18,13 @@
#
from qpid.tests.messaging.implementation import *
-from qpid.tests.messaging import Base
+from qpid.tests.messaging import VersionTest
-class LegacyExchangeTests (Base):
+class LegacyExchangeTests (VersionTest):
"""
Tests for the legacy (i.e. pre 1.0) AMQP exchanges and the filters
defined for them and registered for AMQP 1.0.
"""
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
def test_fanout(self):
msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
index 696fbf01c3..ac2bbd8db3 100644
--- a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
@@ -18,19 +18,13 @@
#
from qpid.tests.messaging.implementation import *
-from qpid.tests.messaging import Base
+from qpid.tests.messaging import VersionTest
-class SelectorTests (Base):
+class SelectorTests (VersionTest):
"""
Tests for the selector filter registered for AMQP 1.0 under the
apache namespace.
"""
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
def basic_selection_test(self, node):
properties = [(1, 'red','dog'), (2, 'black', 'cat'), (3, 'red', 'squirrel'), (4, 'grey', 'squirrel')]
msgs = [Message(content="%s.%s" % (colour, creature), properties={'sequence':sequence,'colour':colour}) for sequence, colour, creature in properties]
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py b/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py
new file mode 100644
index 0000000000..a6394fb8c5
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.tests.messaging.implementation import *
+from qpid.tests.messaging import VersionTest
+
+class TranslationTests (VersionTest):
+ """
+ Testing translation of messages between 1.0 and 0-10
+ """
+ def send_receive_messages(self, msgs, send_version, receive_version, address):
+ rcon = self.create_connection(receive_version, True)
+ rcv = rcon.session().receiver(address)
+
+ scon = self.create_connection(send_version, True)
+ snd = scon.session().sender(rcv.source)
+
+ for m in msgs: snd.send(m)
+
+ for expected in msgs:
+ msg = rcv.fetch()
+ assert msg.content == expected.content, (msg.content, expected.content)
+ assert msg.subject == expected.subject, (msg.subject, expected.subject)
+ self.ssn.acknowledge(msg)
+ scon.close()
+ rcon.close()
+
+ def send_receive(self, send_version, receive_version, address):
+ self.send_receive_messages([Message(content=s, subject = s) for s in ['a','b','c','d']], send_version, receive_version, address)
+
+ def send_receive_map(self, send_version, receive_version, address):
+ self.send_receive_messages([Message(content={'s':'abc','i':10})], send_version, receive_version, address)
+
+ def send_receive_list(self, send_version, receive_version, address):
+ self.send_receive_messages([Message(content=['a', 1, 'c'])], send_version, receive_version, address)
+
+ def test_translation_queue_1(self):
+ self.send_receive("amqp0-10", "amqp1.0", '#')
+
+ def test_translation_queue_2(self):
+ self.send_receive("amqp1.0", "amqp0-10", '#')
+
+ def test_translation_exchange_1(self):
+ self.send_receive("amqp0-10", "amqp1.0", 'amq.fanout')
+
+ def test_translation_exchange_2(self):
+ self.send_receive("amqp1.0", "amqp0-10", 'amq.fanout')
+
+ def test_send_receive_queue_1(self):
+ self.send_receive("amqp1.0", "amqp1.0", '#')
+
+ def test_send_receive_queue_2(self):
+ self.send_receive("amqp0-10", "amqp0-10", '#')
+
+ def test_send_receive_exchange_1(self):
+ self.send_receive("amqp1.0", "amqp1.0", 'amq.fanout')
+
+ def test_send_receive_exchange_2(self):
+ self.send_receive("amqp0-10", "amqp0-10", 'amq.fanout')
+
+ def test_translate_map_1(self):
+ self.send_receive_map("amqp0-10", "amqp1.0", '#')
+
+ def test_translate_map_2(self):
+ self.send_receive_map("amqp1.0", "amqp0-10", '#')
+
+ def test_translate_list_1(self):
+ self.send_receive_list("amqp0-10", "amqp1.0", '#')
+
+ def test_translate_list_2(self):
+ self.send_receive_list("amqp1.0", "amqp0-10", '#')