summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/amqp/Translation.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/amqp/Translation.cpp')
-rw-r--r--cpp/src/qpid/broker/amqp/Translation.cpp241
1 files changed, 241 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/amqp/Translation.cpp b/cpp/src/qpid/broker/amqp/Translation.cpp
new file mode 100644
index 0000000000..ca2094b965
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Translation.h"
+#include "qpid/broker/amqp/Outgoing.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/amqp/Decoder.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/MessageEncoder.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/Variant.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+
+const std::string EMPTY;
+const std::string FORWARD_SLASH("/");
+
+std::string translate(const qpid::framing::ReplyTo r)
+{
+ if (r.hasExchange()) {
+ if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey();
+ else return r.getExchange();
+ } else return r.getRoutingKey();
+}
+std::string translate(const qpid::amqp::CharSequence& chars)
+{
+ if (chars.data && chars.size) return std::string(chars.data, chars.size);
+ else return EMPTY;
+}
+bool setMessageId(qpid::framing::MessageProperties& m, const qpid::amqp::CharSequence& chars)
+{
+ if (chars.data && chars.size) {
+ if (chars.size == 16) {
+ m.setMessageId(qpid::framing::Uuid(chars.data));
+ return true;
+ } else {
+ std::istringstream in(translate(chars));
+ qpid::framing::Uuid uuid;
+ in >> uuid;
+ if (!in.fail()) {
+ m.setMessageId(uuid);
+ return true;
+ }
+ }
+ }
+ return false;
+}
+class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
+{
+ public:
+ bool hasMessageId() const { return messageProperties && messageProperties->hasMessageId(); }
+ std::string getMessageId() const { return messageProperties ? messageProperties->getMessageId().str() : EMPTY; }
+ bool hasUserId() const { return messageProperties && messageProperties->hasUserId(); }
+ std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
+ bool hasTo() const { return getDestination().size() || hasSubject(); }
+ std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); }
+ bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
+ std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
+ bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
+ std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
+ bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
+ std::string getCorrelationId() const { return messageProperties ? messageProperties->getCorrelationId() : EMPTY; }
+ bool hasContentType() const { return messageProperties && messageProperties->hasContentType(); }
+ std::string getContentType() const { return messageProperties ? messageProperties->getContentType() : EMPTY; }
+ bool hasContentEncoding() const { return messageProperties && messageProperties->hasContentEncoding(); }
+ std::string getContentEncoding() const { return messageProperties ? messageProperties->getContentEncoding() : EMPTY; }
+ bool hasAbsoluteExpiryTime() const { return deliveryProperties && deliveryProperties->hasExpiration(); }
+ int64_t getAbsoluteExpiryTime() const { return deliveryProperties ? deliveryProperties->getExpiration() : 0; }
+ bool hasCreationTime() const { return false; }
+ int64_t getCreationTime() const { return 0; }
+ bool hasGroupId() const {return false; }
+ std::string getGroupId() const { return EMPTY; }
+ bool hasGroupSequence() const { return false; }
+ uint32_t getGroupSequence() const { return 0; }
+ bool hasReplyToGroupId() const { return false; }
+ std::string getReplyToGroupId() const { return EMPTY; }
+
+ const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
+ Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
+ messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
+ deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
+ {}
+ private:
+ const qpid::broker::amqp_0_10::MessageTransfer& transfer;
+ const qpid::framing::MessageProperties* messageProperties;
+ const qpid::framing::DeliveryProperties* deliveryProperties;
+
+ std::string getDestination() const
+ {
+ return transfer.getMethod<qpid::framing::MessageTransferBody>()->getDestination();
+ }
+};
+}
+
+Translation::Translation(const qpid::broker::Message& m) : original(m) {}
+
+
+boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
+{
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()));
+ if (t) {
+ return t;//no translation required
+ } else {
+ const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
+ if (message) {
+ //translate 1.0 message into 0-10
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0)));
+ qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody()));
+ qpid::framing::AMQFrame content((qpid::framing::AMQContentBody()));
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
+
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
+
+ qpid::amqp::CharSequence body = message->getBody();
+ content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
+ transfer->getFrames().append(content);
+
+ qpid::framing::MessageProperties* props =
+ transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
+ props->setContentLength(body.size);
+
+ qpid::amqp::MessageId mid = message->getMessageId();
+ qpid::framing::Uuid uuid;
+ switch (mid.type) {
+ case qpid::amqp::MessageId::UUID:
+ case qpid::amqp::MessageId::BYTES:
+ if (mid.value.bytes.size == 0) break;
+ if (setMessageId(*props, mid.value.bytes)) break;
+ case qpid::amqp::MessageId::ULONG:
+ QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID");
+ break;
+ }
+
+ qpid::amqp::MessageId cid = message->getCorrelationId();
+ switch (cid.type) {
+ case qpid::amqp::MessageId::UUID:
+ assert(cid.value.bytes.size = 16);
+ props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str());
+ break;
+ case qpid::amqp::MessageId::BYTES:
+ if (cid.value.bytes.size) {
+ props->setCorrelationId(translate(cid.value.bytes));
+ }
+ break;
+ case qpid::amqp::MessageId::ULONG:
+ props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
+ break;
+ }
+ // TODO: ReplyTo - there is no way to reliably determine
+ // the type of the node from just its name, unless we
+ // query the brokers registries
+
+ if (message->getContentType()) props->setContentType(translate(message->getContentType()));
+ if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
+ props->setUserId(message->getUserId());
+ // TODO: FieldTable applicationHeaders;
+ qpid::amqp::CharSequence ap = message->getApplicationProperties();
+ if (ap) {
+ qpid::amqp::Decoder d(ap.data, ap.size);
+ qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
+ }
+
+ qpid::framing::DeliveryProperties* dp =
+ transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
+ dp->setPriority(message->getPriority());
+ if (message->isPersistent()) dp->setDeliveryMode(2);
+ if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
+
+ return transfer.get();
+ } else {
+ throw qpid::Exception("Could not write message data in AMQP 0-10 format");
+ }
+ }
+}
+
+void Translation::write(Outgoing& out)
+{
+ const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
+ if (message) {
+ //write annotations
+ //TODO: merge in any newly added annotations
+ qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations();
+ qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations();
+ if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size);
+ if (messageAnnotations.size) out.write(messageAnnotations.data, messageAnnotations.size);
+ //write bare message
+ qpid::amqp::CharSequence bareMessage = message->getBareMessage();
+ if (bareMessage.size) out.write(bareMessage.data, bareMessage.size);
+ //write footer:
+ qpid::amqp::CharSequence footer = message->getFooter();
+ if (footer.size) out.write(footer.data, footer.size);
+ } else {
+ const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding());
+ if (transfer) {
+ Properties_0_10 properties(*transfer);
+ qpid::types::Variant::Map applicationProperties;
+ qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
+ std::string content = transfer->getContent();
+ size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
+ std::vector<char> buffer(size);
+ qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+ encoder.writeProperties(properties);
+ encoder.writeApplicationProperties(applicationProperties);
+ encoder.writeBinary(content, &qpid::amqp::message::DATA);
+ out.write(&buffer[0], encoder.getPosition());
+ } else {
+ QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
+ }
+ }
+}
+
+}}} // namespace qpid::broker::amqp