diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-13 15:06:54 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-13 15:06:54 +0000 |
| commit | d30253ae61bb81090ba43b055094dbe5a6d7c98d (patch) | |
| tree | b63d2b8f0277a8297937e6cac6a35b9e49fc9738 | |
| parent | 144f3c698bdddf22509691a4f285305e9fd83291 (diff) | |
| download | qpid-python-d30253ae61bb81090ba43b055094dbe5a6d7c98d.tar.gz | |
QPID-4711: translate between structured content in AMQP 0-10 and 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1513537 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/bindings/qpid/python/python.i | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.cpp | 109 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/__init__.py | 13 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py | 3 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_1_0/general.py | 10 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py | 10 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_1_0/selector.py | 10 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_1_0/translation.py | 87 |
8 files changed, 203 insertions, 46 deletions
diff --git a/qpid/cpp/bindings/qpid/python/python.i b/qpid/cpp/bindings/qpid/python/python.i index c10ea46000..9158836a2b 100644 --- a/qpid/cpp/bindings/qpid/python/python.i +++ b/qpid/cpp/bindings/qpid/python/python.i @@ -351,6 +351,9 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) self.setProperty(k, v) def _get_content(self) : + obj = self.getContentObject() + if obj: + return obj if self.content_type == "amqp/list" : return decodeList(self) if self.content_type == "amqp/map" : @@ -365,9 +368,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) elif isinstance(content, list) or isinstance(content, dict) : encode(content, self) else : - # Not a type we can handle. Try setting it anyway, - # although this will probably lead to a swig error - self.setContent(str(content)) + self.setContentObject(content) __swig_getmethods__["content"] = _get_content __swig_setmethods__["content"] = _set_content if _newclass: content = property(_get_content, _set_content) diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index e04d44d2c8..188738287e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -27,6 +27,7 @@ #include "qpid/amqp/MessageEncoder.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/types/Variant.h" +#include "qpid/types/encodings.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include <boost/lexical_cast.hpp> @@ -38,6 +39,8 @@ namespace { const std::string EMPTY; const std::string FORWARD_SLASH("/"); +const std::string TEXT_PLAIN("text/plain"); +const std::string SUBJECT_KEY("qpid.subject"); qpid::framing::ReplyTo translate(const std::string address, Broker* broker) { @@ -98,8 +101,25 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; } bool hasTo() const { return getDestination().size() || hasSubject(); } std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); } - bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); } - std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; } + bool hasSubject() const + { + if (getDestination().empty()) { + return getApplicationProperties().isSet(SUBJECT_KEY); + } else { + return deliveryProperties && deliveryProperties->hasRoutingKey(); + } + } + std::string getSubject() const + { + if (getDestination().empty()) { + //message was sent to default exchange, routing key is the queue name + return getApplicationProperties().getAsString(SUBJECT_KEY); + } else if (deliveryProperties) { + return deliveryProperties->getRoutingKey(); + } else { + return EMPTY; + } + } bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); } std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; } bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); } @@ -119,7 +139,7 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties bool hasReplyToGroupId() const { return false; } std::string getReplyToGroupId() const { return EMPTY; } - const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); } + const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); } Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t), messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()), deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>()) @@ -138,7 +158,6 @@ class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {} - boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer() { boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t = @@ -161,13 +180,38 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation transfer->getFrames().append(method); transfer->getFrames().append(header); - qpid::amqp::CharSequence body = message->getBody(); - content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size); - transfer->getFrames().append(content); - qpid::framing::MessageProperties* props = transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true); - props->setContentLength(body.size); + + if (message->isTypedBody()) { + qpid::types::Variant body = message->getTypedBody(); + std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData(); + if (body.getType() == qpid::types::VAR_MAP) { + qpid::amqp_0_10::MapCodec::encode(body.asMap(), data); + props->setContentType(qpid::amqp_0_10::MapCodec::contentType); + } else if (body.getType() == qpid::types::VAR_LIST) { + qpid::amqp_0_10::ListCodec::encode(body.asList(), data); + props->setContentType(qpid::amqp_0_10::ListCodec::contentType); + } else if (body.getType() == qpid::types::VAR_STRING) { + data = body.getString(); + if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) { + props->setContentType(TEXT_PLAIN); + } + } else { + qpid::types::Variant::List container; + container.push_back(body); + qpid::amqp_0_10::ListCodec::encode(container, data); + props->setContentType(qpid::amqp_0_10::ListCodec::contentType); + } + transfer->getFrames().append(content); + props->setContentLength(data.size()); + } else { + qpid::amqp::CharSequence body = message->getBody(); + content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size); + transfer->getFrames().append(content); + + props->setContentLength(body.size); + } qpid::amqp::MessageId mid = message->getMessageId(); qpid::framing::Uuid uuid; @@ -215,7 +259,10 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true); dp->setPriority(message->getPriority()); if (message->isPersistent()) dp->setDeliveryMode(2); - if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey()); + if (message->getRoutingKey().size()) { + dp->setRoutingKey(message->getRoutingKey()); + props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey()); + } return transfer.get(); } else { @@ -246,14 +293,40 @@ void Translation::write(OutgoingFromQueue& out) Properties_0_10 properties(*transfer); qpid::types::Variant::Map applicationProperties; qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties); - std::string content = transfer->getContent(); - size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); - std::vector<char> buffer(size); - qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); - encoder.writeProperties(properties); - encoder.writeApplicationProperties(applicationProperties); - if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA); - out.write(&buffer[0], encoder.getPosition()); + if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) { + qpid::types::Variant::Map content; + qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); + size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ + size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE); + out.write(&buffer[0], encoder.getPosition()); + } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) { + qpid::types::Variant::List content; + qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); + size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ + size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE); + out.write(&buffer[0], encoder.getPosition()); + } else { + std::string content = transfer->getContent(); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA); + out.write(&buffer[0], encoder.getPosition()); + } } else { QPID_LOG(error, "Could not write message data in AMQP 1.0 format"); } diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py index 5c9cdf2f27..38a5b066d6 100644 --- a/qpid/python/qpid/tests/messaging/__init__.py +++ b/qpid/python/qpid/tests/messaging/__init__.py @@ -188,4 +188,17 @@ class Base(Test): return {"reconnect": self.reconnect(), "transport": self.transport()} +class VersionTest (Base): + def create_connection(self, version="amqp1.0", force=False): + opts = self.connection_options() + if force or not 'protocol' in opts: + opts['protocol'] = version; + return Connection.establish(self.broker, **opts) + + def setup_connection(self): + return self.create_connection() + + def setup_session(self): + return self.conn.session() + import address, endpoints, message diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py index 8088ddd95f..5ebbb4c651 100644 --- a/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py @@ -19,6 +19,7 @@ # under the License. # +from general import * from legacy_exchanges import * from selector import * -from general import * +from translation import * diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py index 085acf8405..b3ff1df5a1 100644 --- a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py @@ -18,18 +18,12 @@ # from qpid.tests.messaging.implementation import * -from qpid.tests.messaging import Base +from qpid.tests.messaging import VersionTest -class GeneralTests (Base): +class GeneralTests (VersionTest): """ Miscellaneous tests for core AMQP 1.0 messaging behaviour. """ - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - def test_request_response(self): snd_request = self.ssn.sender("#") rcv_response = self.ssn.receiver("#") diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py b/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py index d2b8c643bd..024a1da689 100644 --- a/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py @@ -18,19 +18,13 @@ # from qpid.tests.messaging.implementation import * -from qpid.tests.messaging import Base +from qpid.tests.messaging import VersionTest -class LegacyExchangeTests (Base): +class LegacyExchangeTests (VersionTest): """ Tests for the legacy (i.e. pre 1.0) AMQP exchanges and the filters defined for them and registered for AMQP 1.0. """ - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - def test_fanout(self): msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']] diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py index 696fbf01c3..ac2bbd8db3 100644 --- a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py @@ -18,19 +18,13 @@ # from qpid.tests.messaging.implementation import * -from qpid.tests.messaging import Base +from qpid.tests.messaging import VersionTest -class SelectorTests (Base): +class SelectorTests (VersionTest): """ Tests for the selector filter registered for AMQP 1.0 under the apache namespace. """ - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - def basic_selection_test(self, node): properties = [(1, 'red','dog'), (2, 'black', 'cat'), (3, 'red', 'squirrel'), (4, 'grey', 'squirrel')] msgs = [Message(content="%s.%s" % (colour, creature), properties={'sequence':sequence,'colour':colour}) for sequence, colour, creature in properties] diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py b/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py new file mode 100644 index 0000000000..a6394fb8c5 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.tests.messaging.implementation import * +from qpid.tests.messaging import VersionTest + +class TranslationTests (VersionTest): + """ + Testing translation of messages between 1.0 and 0-10 + """ + def send_receive_messages(self, msgs, send_version, receive_version, address): + rcon = self.create_connection(receive_version, True) + rcv = rcon.session().receiver(address) + + scon = self.create_connection(send_version, True) + snd = scon.session().sender(rcv.source) + + for m in msgs: snd.send(m) + + for expected in msgs: + msg = rcv.fetch() + assert msg.content == expected.content, (msg.content, expected.content) + assert msg.subject == expected.subject, (msg.subject, expected.subject) + self.ssn.acknowledge(msg) + scon.close() + rcon.close() + + def send_receive(self, send_version, receive_version, address): + self.send_receive_messages([Message(content=s, subject = s) for s in ['a','b','c','d']], send_version, receive_version, address) + + def send_receive_map(self, send_version, receive_version, address): + self.send_receive_messages([Message(content={'s':'abc','i':10})], send_version, receive_version, address) + + def send_receive_list(self, send_version, receive_version, address): + self.send_receive_messages([Message(content=['a', 1, 'c'])], send_version, receive_version, address) + + def test_translation_queue_1(self): + self.send_receive("amqp0-10", "amqp1.0", '#') + + def test_translation_queue_2(self): + self.send_receive("amqp1.0", "amqp0-10", '#') + + def test_translation_exchange_1(self): + self.send_receive("amqp0-10", "amqp1.0", 'amq.fanout') + + def test_translation_exchange_2(self): + self.send_receive("amqp1.0", "amqp0-10", 'amq.fanout') + + def test_send_receive_queue_1(self): + self.send_receive("amqp1.0", "amqp1.0", '#') + + def test_send_receive_queue_2(self): + self.send_receive("amqp0-10", "amqp0-10", '#') + + def test_send_receive_exchange_1(self): + self.send_receive("amqp1.0", "amqp1.0", 'amq.fanout') + + def test_send_receive_exchange_2(self): + self.send_receive("amqp0-10", "amqp0-10", 'amq.fanout') + + def test_translate_map_1(self): + self.send_receive_map("amqp0-10", "amqp1.0", '#') + + def test_translate_map_2(self): + self.send_receive_map("amqp1.0", "amqp0-10", '#') + + def test_translate_list_1(self): + self.send_receive_list("amqp0-10", "amqp1.0", '#') + + def test_translate_list_2(self): + self.send_receive_list("amqp1.0", "amqp0-10", '#') |
