summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-08 00:36:42 +0000
committerAlan Conway <aconway@apache.org>2008-10-08 00:36:42 +0000
commit1896a5d32c87555877edd1dafc1bd34e3fcf5683 (patch)
tree1b241ae2e857fa44170748075b07d40a973b18b4 /cpp/src
parent9d199b74aee76859480a7ee92d95c6db42028b43 (diff)
downloadqpid-python-1896a5d32c87555877edd1dafc1bd34e3fcf5683.tar.gz
rubygen/framing.0-10/constants.rb: create functions for all 3 exception subclasses.
client: added session suspend/resume functions, resume not implemented yet. ClientSessionTest: enabled compilation of suspend/resume tests with expected failures. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/Results.cpp4
-rw-r--r--cpp/src/qpid/client/Results.h1
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.cpp6
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.h8
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp30
-rw-r--r--cpp/src/qpid/client/SessionImpl.h10
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp64
8 files changed, 74 insertions, 50 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index cf5b09b255..a1575dd524 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -169,6 +169,7 @@ class Connection
friend class ConnectionAccess; ///<@internal
+ friend class SessionBase_0_10; ///<@internal
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Results.cpp b/cpp/src/qpid/client/Results.cpp
index c605af2878..7a2d0b6f71 100644
--- a/cpp/src/qpid/client/Results.cpp
+++ b/cpp/src/qpid/client/Results.cpp
@@ -31,6 +31,10 @@ namespace client {
Results::Results() {}
+Results::~Results() {
+ try { close(); } catch (const std::exception& e) { assert(0); }
+}
+
void Results::close()
{
for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) {
diff --git a/cpp/src/qpid/client/Results.h b/cpp/src/qpid/client/Results.h
index 667f35089c..4c49f6b05b 100644
--- a/cpp/src/qpid/client/Results.h
+++ b/cpp/src/qpid/client/Results.h
@@ -38,6 +38,7 @@ public:
typedef boost::shared_ptr<FutureResult> FutureResultPtr;
Results();
+ ~Results();
void completed(const framing::SequenceSet& set);
void received(const framing::SequenceNumber& id, const std::string& result);
FutureResultPtr listenForResult(const framing::SequenceNumber& point);
diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp
index 974acbfcf6..50cfb4b09d 100644
--- a/cpp/src/qpid/client/SessionBase_0_10.cpp
+++ b/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -19,10 +19,12 @@
*
*/
#include "SessionBase_0_10.h"
+#include "Connection.h"
#include "qpid/framing/all_method_bodies.h"
namespace qpid {
namespace client {
+
using namespace framing;
SessionBase_0_10::SessionBase_0_10() {}
@@ -57,6 +59,10 @@ void SessionBase_0_10::sendCompletion()
impl->sendCompletion();
}
+void SessionBase_0_10::suspend() { impl->suspend(); }
+void SessionBase_0_10::resume(Connection c) { impl->resume(c.impl); }
+uint32_t SessionBase_0_10::timeout(uint32_t seconds) { return impl->setTimeout(seconds); }
+
SessionId SessionBase_0_10::getId() const { return impl->getId(); }
framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); }
diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h
index 8634164dd1..429f684424 100644
--- a/cpp/src/qpid/client/SessionBase_0_10.h
+++ b/cpp/src/qpid/client/SessionBase_0_10.h
@@ -38,6 +38,8 @@
namespace qpid {
namespace client {
+class Connection;
+
using std::string;
using framing::Content;
using framing::FieldTable;
@@ -91,6 +93,12 @@ class SessionBase_0_10 {
/** Set the timeout for this session. */
uint32_t timeout(uint32_t seconds);
+ /** Suspend the session - detach it from its connection */
+ void suspend();
+
+ /** Resume a suspended session with a new connection */
+ void resume(Connection);
+
Execution& getExecution();
void flush();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 3e1ea8b724..5c61248b5a 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -105,7 +105,7 @@ void SessionImpl::open(uint32_t timeout) // user thread
waitFor(ATTACHED);
//TODO: timeout will not be set locally until get response to
//confirm, should we wait for that?
- proxy.requestTimeout(timeout);
+ setTimeout(timeout);
proxy.commandPoint(nextOut, 0);
} else {
throw Exception("Open already called for this session");
@@ -115,11 +115,7 @@ void SessionImpl::open(uint32_t timeout) // user thread
void SessionImpl::close() //user thread
{
Lock l(state);
- if (detachedLifetime) {
- proxy.requestTimeout(0);
- //should we wait for the timeout response?
- detachedLifetime = 0;
- }
+ if (detachedLifetime) setTimeout(0);
detach();
waitFor(DETACHED);
}
@@ -613,11 +609,8 @@ void SessionImpl::exception(uint16_t errorCode,
error = EXCEPTION;
code = errorCode;
text = description;
- if (detachedLifetime) {
- proxy.requestTimeout(0);
- //should we wait for the timeout response?
- detachedLifetime = 0;
- }
+ if (detachedLifetime)
+ setTimeout(0);
}
@@ -639,10 +632,10 @@ inline void SessionImpl::waitFor(State s) //call with lock held
void SessionImpl::check() const //call with lock held.
{
switch (error) {
- case OK: break;
- case CONNECTION_CLOSE: throw ConnectionException(code, text);
- case SESSION_DETACH: throw ChannelException(code, text);
- case EXCEPTION: throwExecutionException(code, text);
+ case OK: break;
+ case CONNECTION_CLOSE: throw ConnectionException(code, text);
+ case SESSION_DETACH: throw ChannelException(code, text);
+ case EXCEPTION: createSessionException(code, text).raise();
}
}
@@ -668,4 +661,11 @@ void SessionImpl::handleClosed()
results.close();
}
+uint32_t SessionImpl::setTimeout(uint32_t seconds) {
+ proxy.requestTimeout(seconds);
+ // FIXME aconway 2008-10-07: wait for timeout response from broker
+ // and use value retured by broker.
+ detachedLifetime = seconds;
+ return detachedLifetime;
+}
}}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index c63774a23a..989294b99e 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -95,6 +95,12 @@ public:
void connectionClosed(uint16_t code, const std::string& text);
void connectionBroke(uint16_t code, const std::string& text);
+ /** Set timeout in seconds, returns actual timeout allowed by broker */
+ uint32_t setTimeout(uint32_t requestedSeconds);
+
+ /** Get timeout in seconds. */
+ uint32_t getTimeout() const;
+
private:
enum ErrorType {
OK,
@@ -131,7 +137,6 @@ private:
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
void sendContent(const framing::MethodContent&);
void waitForCompletionImpl(const framing::SequenceNumber& id);
- void requestTimeout(uint32_t timeout);
void sendCompletionImpl();
@@ -140,7 +145,8 @@ private:
void attach(const std::string& name, bool force);
void attached(const std::string& name);
void detach(const std::string& name);
- void detached(const std::string& name, uint8_t detachCode);
+ void detached(const std::string& name, uint8_t detachCode);
+ void requestTimeout(uint32_t timeout);
void timeout(uint32_t timeout);
void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 0b46d39047..85497ace5d 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -162,41 +162,39 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData());
}
-// FIXME aconway 2008-05-26: Re-enable with final resume implementation.
-//
-// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
-// {
-// ClientSessionFixture fix;
-// fix.session.suspend(); // session has 0 timeout.
-// try {
-// fix.connection.resume(fix.session);
-// BOOST_FAIL("Expected InvalidArgumentException.");
-// } catch(const InternalErrorException&) {}
-// }
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1)
+{
+ ClientSessionFixture fix;
+ fix.session.suspend(); // session has 0 timeout.
+ try {
+ fix.connection.resume(fix.session);
+ BOOST_FAIL("Expected InvalidArgumentException.");
+ } catch(const InternalErrorException&) {}
+}
-// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1)
-// {
-// ClientSessionFixture fix;
-// fix.session =fix.session.timeout(60);
-// fix.session.suspend();
-// try {
-// fix.session.exchangeQuery(name="amq.fanout");
-// BOOST_FAIL("Expected session suspended exception");
-// } catch(const CommandInvalidException&) {}
-// }
+QPID_AUTO_TEST_CASE(testUseSuspendedError)
+{
+ ClientSessionFixture fix;
+ fix.session.timeout(60);
+ fix.session.suspend();
+ try {
+ fix.session.exchangeQuery(arg::exchange="amq.fanout");
+ BOOST_FAIL("Expected session suspended exception");
+ } catch(const NotAttachedException&) {}
+}
-// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
-// {
-// ClientSessionFixture fix;
-// fix.session.timeout(60);
-// fix.declareSubscribe();
-// fix.session.suspend();
-// // Make sure we are still subscribed after resume.
-// fix.connection.resume(fix.session);
-// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue"));
-// FrameSet::shared_ptr msg = fix.session.get();
-// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
-// }
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1)
+{
+ ClientSessionFixture fix;
+ fix.session.timeout(60);
+ fix.declareSubscribe();
+ fix.session.suspend();
+ // Make sure we are still subscribed after resume.
+ fix.connection.resume(fix.session);
+ fix.session.messageTransfer(arg::content=TransferContent("my-message", "my-queue"));
+ FrameSet::shared_ptr msg = fix.session.get();
+ BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+}
QPID_AUTO_TEST_CASE(testSendToSelf) {