diff options
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/SenderContext.cpp')
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SenderContext.cpp | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp new file mode 100644 index 0000000000..96c4437b89 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -0,0 +1,363 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/SenderContext.h" +#include "qpid/messaging/amqp/EncodedMessage.h" +#include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/amqp/MessageEncoder.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} +#include <boost/shared_ptr.hpp> +#include <string.h> + +namespace qpid { +namespace messaging { +namespace amqp { +//TODO: proper conversion to wide string for address +SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) + : name(n), + address(a), + sender(pn_sender(session, n.c_str())), capacity(1000) {} + +SenderContext::~SenderContext() +{ + pn_link_free(sender); +} + +void SenderContext::close() +{ + +} + +void SenderContext::setCapacity(uint32_t c) +{ + if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!"); + capacity = c; +} + +uint32_t SenderContext::getCapacity() +{ + return capacity; +} + +uint32_t SenderContext::getUnsettled() +{ + return processUnsettled(); +} + +const std::string& SenderContext::getName() const +{ + return name; +} + +const std::string& SenderContext::getTarget() const +{ + return address.getName(); +} + +SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) +{ + if (processUnsettled() < capacity && pn_link_credit(sender)) { + deliveries.push_back(Delivery(nextId++)); + Delivery& delivery = deliveries.back(); + delivery.encode(MessageImplAccess::get(message), address); + delivery.send(sender); + return &delivery; + } else { + return 0; + } +} + +uint32_t SenderContext::processUnsettled() +{ + //remove accepted messages from front of deque + while (!deliveries.empty() && deliveries.front().accepted()) { + deliveries.front().settle(); + deliveries.pop_front(); + } + return deliveries.size(); +} +namespace { +class HeaderAdapter : public qpid::amqp::MessageEncoder::Header +{ + public: + HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {} + virtual bool isDurable() const + { + return msg.isDurable(); + } + virtual uint8_t getPriority() const + { + return msg.getPriority(); + } + virtual bool hasTtl() const + { + return msg.getTtl(); + } + virtual uint32_t getTtl() const + { + return msg.getTtl(); + } + virtual bool isFirstAcquirer() const + { + return false; + } + virtual uint32_t getDeliveryCount() const + { + return msg.isRedelivered() ? 1 : 0; + } + private: + const qpid::messaging::MessageImpl& msg; +}; +const std::string EMPTY; + +class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties +{ + public: + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {} + bool hasMessageId() const + { + return getMessageId().size(); + } + std::string getMessageId() const + { + return msg.getMessageId(); + } + + bool hasUserId() const + { + return getUserId().size(); + } + + std::string getUserId() const + { + return msg.getUserId(); + } + + bool hasTo() const + { + return false;//not yet supported + } + + std::string getTo() const + { + return EMPTY;//not yet supported + } + + bool hasSubject() const + { + return subject.size() || getSubject().size(); + } + + std::string getSubject() const + { + return subject.size() ? subject : msg.getSubject(); + } + + bool hasReplyTo() const + { + return msg.getReplyTo(); + } + + std::string getReplyTo() const + { + return msg.getReplyTo().str(); + } + + bool hasCorrelationId() const + { + return getCorrelationId().size(); + } + + std::string getCorrelationId() const + { + return msg.getCorrelationId(); + } + + bool hasContentType() const + { + return getContentType().size(); + } + + std::string getContentType() const + { + return msg.getContentType(); + } + + bool hasContentEncoding() const + { + return false;//not yet supported + } + + std::string getContentEncoding() const + { + return EMPTY;//not yet supported + } + + bool hasAbsoluteExpiryTime() const + { + return false;//not yet supported + } + + int64_t getAbsoluteExpiryTime() const + { + return 0;//not yet supported + } + + bool hasCreationTime() const + { + return false;//not yet supported + } + + int64_t getCreationTime() const + { + return 0;//not yet supported + } + + bool hasGroupId() const + { + return false;//not yet supported + } + + std::string getGroupId() const + { + return EMPTY;//not yet supported + } + + bool hasGroupSequence() const + { + return false;//not yet supported + } + + uint32_t getGroupSequence() const + { + return 0;//not yet supported + } + + bool hasReplyToGroupId() const + { + return false;//not yet supported + } + + std::string getReplyToGroupId() const + { + return EMPTY;//not yet supported + } + private: + const qpid::messaging::MessageImpl& msg; + const std::string subject; +}; + +bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + return address.getSubject().size() && address.getSubject() != msg.getSubject(); +} + +} + +SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {} + +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); + + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered + //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? + if (original->hasHeaderChanged(msg)) { + //since as yet have no annotations, just write the revised header then the rest of the message as received + encoded.resize(16/*max header size*/ + original->getBareMessage().size); + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + HeaderAdapter header(msg); + encoder.writeHeader(header); + ::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size); + } else { + //since as yet have no annotations, if the header hasn't + //changed and we still have the original bare message, can + //send the entire content as is + encoded.resize(original->getSize()); + ::memcpy(encoded.getData(), original->getData(), original->getSize()); + } + } else { + HeaderAdapter header(msg); + PropertiesAdapter properties(msg, address.getSubject()); + //compute size: + encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes())); + QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") + qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize()); + //write header: + encoder.writeHeader(header); + //write delivery-annotations, write message-annotations (none yet supported) + //write properties + encoder.writeProperties(properties); + //write application-properties + encoder.writeApplicationProperties(msg.getHeaders()); + //write body + if (msg.getBytes().size()) encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported + if (encoder.getPosition() < encoded.getSize()) { + QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition()); + encoded.trim(encoder.getPosition()); + } + //write footer (no annotations yet supported) + } +} +void SenderContext::Delivery::send(pn_link_t* sender) +{ + pn_delivery_tag_t tag; + tag.size = sizeof(id); + tag.bytes = reinterpret_cast<const char*>(&id); + token = pn_delivery(sender, tag); + pn_link_send(sender, encoded.getData(), encoded.getSize()); + pn_link_advance(sender); +} + +bool SenderContext::Delivery::accepted() +{ + return pn_delivery_remote_state(token) == PN_ACCEPTED; +} +void SenderContext::Delivery::settle() +{ + pn_delivery_settle(token); +} +void SenderContext::configure() const +{ + configure(pn_link_target(sender)); +} +void SenderContext::configure(pn_terminus_t* target) const +{ + pn_terminus_set_address(target, address.getName().c_str()); + //dynamic create: + AddressHelper helper(address); + if (helper.createEnabled(AddressHelper::FOR_SENDER)) { + helper.setNodeProperties(target); + } +} + +bool SenderContext::settled() +{ + return processUnsettled() == 0; +} + +}}} // namespace qpid::messaging::amqp |