diff options
| author | Gordon Sim <gsim@apache.org> | 2014-06-25 14:47:51 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-06-25 14:47:51 +0000 |
| commit | a1a22c139817675a924dc3947a84cdda83e60e61 (patch) | |
| tree | cf7ebfc4293d5634b3127650f5787b57817b3c30 /qpid/cpp | |
| parent | 8a56b5981d2a988cd7145b3447a94dbc01c24857 (diff) | |
| download | qpid-python-a1a22c139817675a924dc3947a84cdda83e60e61.tar.gz | |
QPID-5828: more consistent behaviour os send() when disconnected
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1605429 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/include/qpid/messaging/exceptions.h | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 50 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/exceptions.cpp | 1 |
7 files changed, 52 insertions, 19 deletions
diff --git a/qpid/cpp/include/qpid/messaging/exceptions.h b/qpid/cpp/include/qpid/messaging/exceptions.h index 1ad8014940..f1d79b2535 100644 --- a/qpid/cpp/include/qpid/messaging/exceptions.h +++ b/qpid/cpp/include/qpid/messaging/exceptions.h @@ -155,6 +155,16 @@ struct QPID_MESSAGING_CLASS_EXTERN TargetCapacityExceeded : public SendError QPID_MESSAGING_EXTERN TargetCapacityExceeded(const std::string&); }; +/** + * Thrown to indicate that the locally configured sender capacity has + * been reached, and thus no further messages can be put on the replay + * buffer. + */ +struct QPID_MESSAGING_CLASS_EXTERN OutOfCapacity : public SendError +{ + QPID_MESSAGING_EXTERN OutOfCapacity(const std::string&); +}; + struct QPID_MESSAGING_CLASS_EXTERN SessionError : public MessagingException { QPID_MESSAGING_EXTERN SessionError(const std::string&); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 02a8a27a4e..ef8a82d2ea 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -392,5 +392,9 @@ bool ConnectionImpl::getAutoDecode() const { return !disableAutoDecode; } +bool ConnectionImpl::getAutoReconnect() const +{ + return autoReconnect; +} }}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index ae839dc690..bf8a759107 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -57,6 +57,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl void reconnect(); std::string getUrl() const; bool getAutoDecode() const; + bool getAutoReconnect() const; private: typedef std::map<std::string, qpid::messaging::Session> Sessions; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 9d862e79a4..49d65226bc 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -30,8 +30,8 @@ namespace client { namespace amqp0_10 { SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, - const qpid::messaging::Address& _address) : - parent(&_parent), name(_name), address(_address), state(UNRESOLVED), + const qpid::messaging::Address& _address, bool _autoReconnect) : + parent(&_parent), autoReconnect(_autoReconnect), name(_name), address(_address), state(UNRESOLVED), capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {} qpid::messaging::Address SenderImpl::getAddress() const @@ -100,21 +100,37 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) void SenderImpl::waitForCapacity() { sys::Mutex::ScopedLock l(lock); - //TODO: add option to throw exception rather than blocking? - if (!unreliable && capacity <= - (flushed ? checkPendingSends(false, l) : outgoing.size())) - { - //Initial implementation is very basic. As outgoing is - //currently only reduced on receiving completions and we are - //blocking anyway we may as well sync(). If successful that - //should clear all outstanding sends. - session.sync(); - checkPendingSends(false, l); - } - //flush periodically and check for conmpleted sends - if (++window > (capacity / 4)) {//TODO: make this configurable? - checkPendingSends(true, l); - window = 0; + try { + //TODO: add option to throw exception rather than blocking? + if (!unreliable && capacity <= + (flushed ? checkPendingSends(false, l) : outgoing.size())) + { + //Initial implementation is very basic. As outgoing is + //currently only reduced on receiving completions and we are + //blocking anyway we may as well sync(). If successful that + //should clear all outstanding sends. + session.sync(); + checkPendingSends(false, l); + } + //flush periodically and check for conmpleted sends + if (++window > (capacity / 4)) {//TODO: make this configurable? + checkPendingSends(true, l); + window = 0; + } + } catch (const qpid::TransportFailure&) { + //Disconnection prevents flushing or syncing. If we have any + //capacity we will return anyway (the subsequent attempt to + //send will fail, but message will be on replay buffer). + if (capacity > outgoing.size()) return; + //If we are out of capacity, but autoreconnect is on, then + //rethrow the transport failure to trigger reconnect which + //will have the effect of blocking until connected and + //capacity is freed up + if (autoReconnect) throw; + //Otherwise, in order to clearly signal to the application + //that the message was not pushed to replay buffer, throw an + //out of capacity error + throw qpid::messaging::OutOfCapacity(name); } } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h index ee250af2d4..3ed3b457ba 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -47,7 +47,7 @@ class SenderImpl : public qpid::messaging::SenderImpl enum State {UNRESOLVED, ACTIVE, CANCELLED}; SenderImpl(SessionImpl& parent, const std::string& name, - const qpid::messaging::Address& address); + const qpid::messaging::Address& address, bool autoReconnect); void send(const qpid::messaging::Message&, bool sync); void close(); void setCapacity(uint32_t); @@ -61,6 +61,7 @@ class SenderImpl : public qpid::messaging::SenderImpl private: mutable sys::Mutex lock; boost::intrusive_ptr<SessionImpl> parent; + const bool autoReconnect; const std::string name; const qpid::messaging::Address address; State state; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 057e752c90..d99407112b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -233,7 +233,7 @@ Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address) ScopedLock l(lock); std::string name = address.getName(); getFreeKey(name, senders); - Sender sender(new SenderImpl(*this, name, address)); + Sender sender(new SenderImpl(*this, name, address, connection->getAutoReconnect())); getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver); senders[name] = sender; return sender; diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp index f7bf4aaee0..11b0eb33f7 100644 --- a/qpid/cpp/src/qpid/messaging/exceptions.cpp +++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp @@ -46,6 +46,7 @@ SenderError::SenderError(const std::string& msg) : LinkError(msg) {} SendError::SendError(const std::string& msg) : SenderError(msg) {} MessageRejected::MessageRejected(const std::string& msg) : SendError(msg) {} TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : SendError(msg) {} +OutOfCapacity::OutOfCapacity(const std::string& msg) : SendError(msg) {} SessionError::SessionError(const std::string& msg) : MessagingException(msg) {} SessionClosed::SessionClosed() : SessionError("Session Closed") {} |
