diff options
author | Alan Conway <aconway@apache.org> | 2008-10-08 00:36:42 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-08 00:36:42 +0000 |
commit | 1896a5d32c87555877edd1dafc1bd34e3fcf5683 (patch) | |
tree | 1b241ae2e857fa44170748075b07d40a973b18b4 /cpp/src | |
parent | 9d199b74aee76859480a7ee92d95c6db42028b43 (diff) | |
download | qpid-python-1896a5d32c87555877edd1dafc1bd34e3fcf5683.tar.gz |
rubygen/framing.0-10/constants.rb: create functions for all 3 exception subclasses.
client: added session suspend/resume functions, resume not implemented yet.
ClientSessionTest: enabled compilation of suspend/resume tests with expected failures.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/Results.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Results.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 10 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 64 |
8 files changed, 74 insertions, 50 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index cf5b09b255..a1575dd524 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -169,6 +169,7 @@ class Connection friend class ConnectionAccess; ///<@internal + friend class SessionBase_0_10; ///<@internal }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Results.cpp b/cpp/src/qpid/client/Results.cpp index c605af2878..7a2d0b6f71 100644 --- a/cpp/src/qpid/client/Results.cpp +++ b/cpp/src/qpid/client/Results.cpp @@ -31,6 +31,10 @@ namespace client { Results::Results() {} +Results::~Results() { + try { close(); } catch (const std::exception& e) { assert(0); } +} + void Results::close() { for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { diff --git a/cpp/src/qpid/client/Results.h b/cpp/src/qpid/client/Results.h index 667f35089c..4c49f6b05b 100644 --- a/cpp/src/qpid/client/Results.h +++ b/cpp/src/qpid/client/Results.h @@ -38,6 +38,7 @@ public: typedef boost::shared_ptr<FutureResult> FutureResultPtr; Results(); + ~Results(); void completed(const framing::SequenceSet& set); void received(const framing::SequenceNumber& id, const std::string& result); FutureResultPtr listenForResult(const framing::SequenceNumber& point); diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp index 974acbfcf6..50cfb4b09d 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -19,10 +19,12 @@ * */ #include "SessionBase_0_10.h" +#include "Connection.h" #include "qpid/framing/all_method_bodies.h" namespace qpid { namespace client { + using namespace framing; SessionBase_0_10::SessionBase_0_10() {} @@ -57,6 +59,10 @@ void SessionBase_0_10::sendCompletion() impl->sendCompletion(); } +void SessionBase_0_10::suspend() { impl->suspend(); } +void SessionBase_0_10::resume(Connection c) { impl->resume(c.impl); } +uint32_t SessionBase_0_10::timeout(uint32_t seconds) { return impl->setTimeout(seconds); } + SessionId SessionBase_0_10::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase_0_10::get() { return impl->get(); } diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h index 8634164dd1..429f684424 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.h +++ b/cpp/src/qpid/client/SessionBase_0_10.h @@ -38,6 +38,8 @@ namespace qpid { namespace client { +class Connection; + using std::string; using framing::Content; using framing::FieldTable; @@ -91,6 +93,12 @@ class SessionBase_0_10 { /** Set the timeout for this session. */ uint32_t timeout(uint32_t seconds); + /** Suspend the session - detach it from its connection */ + void suspend(); + + /** Resume a suspended session with a new connection */ + void resume(Connection); + Execution& getExecution(); void flush(); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 3e1ea8b724..5c61248b5a 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -105,7 +105,7 @@ void SessionImpl::open(uint32_t timeout) // user thread waitFor(ATTACHED); //TODO: timeout will not be set locally until get response to //confirm, should we wait for that? - proxy.requestTimeout(timeout); + setTimeout(timeout); proxy.commandPoint(nextOut, 0); } else { throw Exception("Open already called for this session"); @@ -115,11 +115,7 @@ void SessionImpl::open(uint32_t timeout) // user thread void SessionImpl::close() //user thread { Lock l(state); - if (detachedLifetime) { - proxy.requestTimeout(0); - //should we wait for the timeout response? - detachedLifetime = 0; - } + if (detachedLifetime) setTimeout(0); detach(); waitFor(DETACHED); } @@ -613,11 +609,8 @@ void SessionImpl::exception(uint16_t errorCode, error = EXCEPTION; code = errorCode; text = description; - if (detachedLifetime) { - proxy.requestTimeout(0); - //should we wait for the timeout response? - detachedLifetime = 0; - } + if (detachedLifetime) + setTimeout(0); } @@ -639,10 +632,10 @@ inline void SessionImpl::waitFor(State s) //call with lock held void SessionImpl::check() const //call with lock held. { switch (error) { - case OK: break; - case CONNECTION_CLOSE: throw ConnectionException(code, text); - case SESSION_DETACH: throw ChannelException(code, text); - case EXCEPTION: throwExecutionException(code, text); + case OK: break; + case CONNECTION_CLOSE: throw ConnectionException(code, text); + case SESSION_DETACH: throw ChannelException(code, text); + case EXCEPTION: createSessionException(code, text).raise(); } } @@ -668,4 +661,11 @@ void SessionImpl::handleClosed() results.close(); } +uint32_t SessionImpl::setTimeout(uint32_t seconds) { + proxy.requestTimeout(seconds); + // FIXME aconway 2008-10-07: wait for timeout response from broker + // and use value retured by broker. + detachedLifetime = seconds; + return detachedLifetime; +} }} diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index c63774a23a..989294b99e 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -95,6 +95,12 @@ public: void connectionClosed(uint16_t code, const std::string& text); void connectionBroke(uint16_t code, const std::string& text); + /** Set timeout in seconds, returns actual timeout allowed by broker */ + uint32_t setTimeout(uint32_t requestedSeconds); + + /** Get timeout in seconds. */ + uint32_t getTimeout() const; + private: enum ErrorType { OK, @@ -131,7 +137,6 @@ private: Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); void sendContent(const framing::MethodContent&); void waitForCompletionImpl(const framing::SequenceNumber& id); - void requestTimeout(uint32_t timeout); void sendCompletionImpl(); @@ -140,7 +145,8 @@ private: void attach(const std::string& name, bool force); void attached(const std::string& name); void detach(const std::string& name); - void detached(const std::string& name, uint8_t detachCode); + void detached(const std::string& name, uint8_t detachCode); + void requestTimeout(uint32_t timeout); void timeout(uint32_t timeout); void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); void expected(const framing::SequenceSet& commands, const framing::Array& fragments); diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 0b46d39047..85497ace5d 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -162,41 +162,39 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); } -// FIXME aconway 2008-05-26: Re-enable with final resume implementation. -// -// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) -// { -// ClientSessionFixture fix; -// fix.session.suspend(); // session has 0 timeout. -// try { -// fix.connection.resume(fix.session); -// BOOST_FAIL("Expected InvalidArgumentException."); -// } catch(const InternalErrorException&) {} -// } +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspend0Timeout, 1) +{ + ClientSessionFixture fix; + fix.session.suspend(); // session has 0 timeout. + try { + fix.connection.resume(fix.session); + BOOST_FAIL("Expected InvalidArgumentException."); + } catch(const InternalErrorException&) {} +} -// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUseSuspendedError, 1) -// { -// ClientSessionFixture fix; -// fix.session =fix.session.timeout(60); -// fix.session.suspend(); -// try { -// fix.session.exchangeQuery(name="amq.fanout"); -// BOOST_FAIL("Expected session suspended exception"); -// } catch(const CommandInvalidException&) {} -// } +QPID_AUTO_TEST_CASE(testUseSuspendedError) +{ + ClientSessionFixture fix; + fix.session.timeout(60); + fix.session.suspend(); + try { + fix.session.exchangeQuery(arg::exchange="amq.fanout"); + BOOST_FAIL("Expected session suspended exception"); + } catch(const NotAttachedException&) {} +} -// QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) -// { -// ClientSessionFixture fix; -// fix.session.timeout(60); -// fix.declareSubscribe(); -// fix.session.suspend(); -// // Make sure we are still subscribed after resume. -// fix.connection.resume(fix.session); -// fix.session.messageTransfer(content=TransferContent("my-message", "my-queue")); -// FrameSet::shared_ptr msg = fix.session.get(); -// BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); -// } +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) +{ + ClientSessionFixture fix; + fix.session.timeout(60); + fix.declareSubscribe(); + fix.session.suspend(); + // Make sure we are still subscribed after resume. + fix.connection.resume(fix.session); + fix.session.messageTransfer(arg::content=TransferContent("my-message", "my-queue")); + FrameSet::shared_ptr msg = fix.session.get(); + BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); +} QPID_AUTO_TEST_CASE(testSendToSelf) { |