summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-06-25 14:47:51 +0000
committerGordon Sim <gsim@apache.org>2014-06-25 14:47:51 +0000
commita1a22c139817675a924dc3947a84cdda83e60e61 (patch)
treecf7ebfc4293d5634b3127650f5787b57817b3c30 /qpid/cpp/src
parent8a56b5981d2a988cd7145b3447a94dbc01c24857 (diff)
downloadqpid-python-a1a22c139817675a924dc3947a84cdda83e60e61.tar.gz
QPID-5828: more consistent behaviour os send() when disconnected
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1605429 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h1
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp50
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h3
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/exceptions.cpp1
6 files changed, 42 insertions, 19 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 02a8a27a4e..ef8a82d2ea 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -392,5 +392,9 @@ bool ConnectionImpl::getAutoDecode() const
{
return !disableAutoDecode;
}
+bool ConnectionImpl::getAutoReconnect() const
+{
+ return autoReconnect;
+}
}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index ae839dc690..bf8a759107 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -57,6 +57,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
void reconnect();
std::string getUrl() const;
bool getAutoDecode() const;
+ bool getAutoReconnect() const;
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 9d862e79a4..49d65226bc 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -30,8 +30,8 @@ namespace client {
namespace amqp0_10 {
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
- const qpid::messaging::Address& _address) :
- parent(&_parent), name(_name), address(_address), state(UNRESOLVED),
+ const qpid::messaging::Address& _address, bool _autoReconnect) :
+ parent(&_parent), autoReconnect(_autoReconnect), name(_name), address(_address), state(UNRESOLVED),
capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
qpid::messaging::Address SenderImpl::getAddress() const
@@ -100,21 +100,37 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
void SenderImpl::waitForCapacity()
{
sys::Mutex::ScopedLock l(lock);
- //TODO: add option to throw exception rather than blocking?
- if (!unreliable && capacity <=
- (flushed ? checkPendingSends(false, l) : outgoing.size()))
- {
- //Initial implementation is very basic. As outgoing is
- //currently only reduced on receiving completions and we are
- //blocking anyway we may as well sync(). If successful that
- //should clear all outstanding sends.
- session.sync();
- checkPendingSends(false, l);
- }
- //flush periodically and check for conmpleted sends
- if (++window > (capacity / 4)) {//TODO: make this configurable?
- checkPendingSends(true, l);
- window = 0;
+ try {
+ //TODO: add option to throw exception rather than blocking?
+ if (!unreliable && capacity <=
+ (flushed ? checkPendingSends(false, l) : outgoing.size()))
+ {
+ //Initial implementation is very basic. As outgoing is
+ //currently only reduced on receiving completions and we are
+ //blocking anyway we may as well sync(). If successful that
+ //should clear all outstanding sends.
+ session.sync();
+ checkPendingSends(false, l);
+ }
+ //flush periodically and check for conmpleted sends
+ if (++window > (capacity / 4)) {//TODO: make this configurable?
+ checkPendingSends(true, l);
+ window = 0;
+ }
+ } catch (const qpid::TransportFailure&) {
+ //Disconnection prevents flushing or syncing. If we have any
+ //capacity we will return anyway (the subsequent attempt to
+ //send will fail, but message will be on replay buffer).
+ if (capacity > outgoing.size()) return;
+ //If we are out of capacity, but autoreconnect is on, then
+ //rethrow the transport failure to trigger reconnect which
+ //will have the effect of blocking until connected and
+ //capacity is freed up
+ if (autoReconnect) throw;
+ //Otherwise, in order to clearly signal to the application
+ //that the message was not pushed to replay buffer, throw an
+ //out of capacity error
+ throw qpid::messaging::OutOfCapacity(name);
}
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index ee250af2d4..3ed3b457ba 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -47,7 +47,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
enum State {UNRESOLVED, ACTIVE, CANCELLED};
SenderImpl(SessionImpl& parent, const std::string& name,
- const qpid::messaging::Address& address);
+ const qpid::messaging::Address& address, bool autoReconnect);
void send(const qpid::messaging::Message&, bool sync);
void close();
void setCapacity(uint32_t);
@@ -61,6 +61,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
private:
mutable sys::Mutex lock;
boost::intrusive_ptr<SessionImpl> parent;
+ const bool autoReconnect;
const std::string name;
const qpid::messaging::Address address;
State state;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 057e752c90..d99407112b 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -233,7 +233,7 @@ Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
ScopedLock l(lock);
std::string name = address.getName();
getFreeKey(name, senders);
- Sender sender(new SenderImpl(*this, name, address));
+ Sender sender(new SenderImpl(*this, name, address, connection->getAutoReconnect()));
getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
senders[name] = sender;
return sender;
diff --git a/qpid/cpp/src/qpid/messaging/exceptions.cpp b/qpid/cpp/src/qpid/messaging/exceptions.cpp
index f7bf4aaee0..11b0eb33f7 100644
--- a/qpid/cpp/src/qpid/messaging/exceptions.cpp
+++ b/qpid/cpp/src/qpid/messaging/exceptions.cpp
@@ -46,6 +46,7 @@ SenderError::SenderError(const std::string& msg) : LinkError(msg) {}
SendError::SendError(const std::string& msg) : SenderError(msg) {}
MessageRejected::MessageRejected(const std::string& msg) : SendError(msg) {}
TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : SendError(msg) {}
+OutOfCapacity::OutOfCapacity(const std::string& msg) : SendError(msg) {}
SessionError::SessionError(const std::string& msg) : MessagingException(msg) {}
SessionClosed::SessionClosed() : SessionError("Session Closed") {}