diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-19 21:33:27 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-19 21:33:27 +0000 |
| commit | e861284318186f8d9cd64a7ddcc28b8d20b98721 (patch) | |
| tree | 6dac612d65297dc5f104350884fc01385c69ecda /cpp/lib/common | |
| parent | 226be67c91b25a5ba8efdd9ba88566033ec97718 (diff) | |
| download | qpid-python-e861284318186f8d9cd64a7ddcc28b8d20b98721.tar.gz | |
Last big refactoring for 0-9 framing. Still need additional tests &
debugging but the overall structure is all in place.
* configure.ac: Added -Wno_virtual_overload warning
* ChannelTest.cpp, MessageBuilderTest.cpp: Fixed virtual overload warnings.
* ChannelAdapter.cpp: Common base for client/broker adapters.
Creates invocation context, handles request/resposne IDs.
* CppGenerator.java:
- Proxies send methods using MethodContext.
* Various .h files: removed unnecessary #includes, added to requred .cpp files.
* ConnectionContext: renamed from SessionContext.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497963 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common')
19 files changed, 234 insertions, 68 deletions
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index 813c49135e..eefff79d6f 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -65,6 +65,7 @@ libqpidcommon_la_SOURCES = \ $(framing)/AMQMethodBody.cpp \ $(framing)/BasicHeaderProperties.cpp \ $(framing)/BodyHandler.cpp \ + $(framing)/ChannelAdapter.cpp \ $(framing)/Buffer.cpp \ $(framing)/FieldTable.cpp \ $(framing)/FramingContent.cpp \ @@ -96,6 +97,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/AMQMethodBody.h \ $(framing)/BasicHeaderProperties.h \ $(framing)/BodyHandler.h \ + $(framing)/ChannelAdapter.h \ $(framing)/Buffer.h \ $(framing)/FieldTable.h \ $(framing)/FramingContent.h \ @@ -119,7 +121,7 @@ nobase_pkginclude_HEADERS = \ sys/Monitor.h \ sys/Mutex.h \ sys/Runnable.h \ - sys/SessionContext.h \ + sys/ConnectionOutputHandler.h \ sys/ConnectionInputHandler.h \ sys/ConnectionInputHandlerFactory.h \ sys/ShutdownHandler.h \ diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 8ac5199c45..c6837af668 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -28,15 +28,15 @@ using namespace qpid::framing; AMQP_MethodVersionMap AMQFrame::versionMap; -AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version): +AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version): version(_version) {} -AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) : +AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) : version(_version), channel(_channel), body(_body) {} -AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : +AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : version(_version), channel(_channel), body(_body) {} diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index c27de70e5a..f3c3232d56 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -41,9 +41,9 @@ namespace framing { class AMQFrame : virtual public AMQDataBlock { public: - AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion); - AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); - AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body); + AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion); + AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); + AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body); virtual ~AMQFrame(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp index 0c77a1c64a..73b729b945 100644 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ b/cpp/lib/common/framing/AMQMethodBody.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include <AMQFrame.h> #include <AMQMethodBody.h> #include <QpidError.h> #include "AMQP_MethodVersionMap.h" @@ -59,5 +60,8 @@ void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/) { decodeContent(buffer); } +void AMQMethodBody::send(const MethodContext& context) { + context.out->send(new AMQFrame(version, context.channelId, this)); +} }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index 9f859046f8..ff09ee60e1 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -53,6 +53,13 @@ class AMQMethodBody : public AMQBody virtual void invoke(AMQP_ServerOperations&, const MethodContext&); bool match(AMQMethodBody* other) const; + + /** + * Wrap this method in a frame and send using the current context. + * Note the frame takes ownership of the body, it will be deleted. + */ + virtual void send(const MethodContext& context); + protected: static u_int32_t baseSize() { return 4; } diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp index c64b1325d6..dffbb62aca 100644 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -16,6 +16,7 @@ * */ +#include "AMQFrame.h" #include "AMQResponseBody.h" #include "AMQP_MethodVersionMap.h" @@ -61,5 +62,11 @@ void AMQResponseBody::printPrefix(std::ostream& out) const { << ",batch=" << data.batchOffset << "): "; } +void AMQResponseBody::send(const MethodContext& context) { + setRequestId(context.requestId); + assert(context.out); + context.out->send( + new AMQFrame(version, context.channelId, this)); +} }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index 6528613a12..2520a481f2 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -65,6 +65,11 @@ class AMQResponseBody : public AMQMethodBody ResponseId getResponseId() { return data.responseId; } RequestId getRequestId() { return data.requestId; } BatchOffset getBatchOffset() { return data.batchOffset; } + void setResponseId(ResponseId id) { data.responseId = id; } + void setRequestId(RequestId id) { data.requestId = id; } + void setBatchOffset(BatchOffset id) { data.batchOffset = id; } + + virtual void send(const MethodContext& context); protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp index 72ba82468f..5dd0c0c23d 100644 --- a/cpp/lib/common/framing/BodyHandler.cpp +++ b/cpp/lib/common/framing/BodyHandler.cpp @@ -58,22 +58,3 @@ void BodyHandler::handleBody(shared_ptr<AMQBody> body) { } } -void BodyHandler::handleRequest(AMQRequestBody::shared_ptr request) { - responder.received(request->getData()); - handleMethod(request); -} - -void BodyHandler::handleResponse(AMQResponseBody::shared_ptr response) { - handleMethod(response); - requester.processed(response->getData()); -} - -void BodyHandler::assertChannelZero(u_int16_t id) { - if (id != 0) - throw ConnectionException(504, "Invalid channel id, not 0"); -} - -void BodyHandler::assertChannelNonZero(u_int16_t id) { - if (id == 0) - throw ConnectionException(504, "Invalid channel id 0"); -} diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h index c9c74e2b3f..cb3f0997b0 100644 --- a/cpp/lib/common/framing/BodyHandler.h +++ b/cpp/lib/common/framing/BodyHandler.h @@ -38,35 +38,21 @@ class AMQContentBody; class AMQHeartbeatBody; /** - * Base class for client and broker channel handlers. - * - * Handles request/response id management common to client and broker. - * Derived classes provide remaining client/broker specific handling. + * Interface to handle incoming frame bodies. + * Derived classes provide logic for each frame type. */ class BodyHandler { public: virtual ~BodyHandler(); - - void handleBody(boost::shared_ptr<AMQBody> body); + virtual void handleBody(boost::shared_ptr<AMQBody> body); protected: - virtual void handleRequest(boost::shared_ptr<AMQRequestBody>); - virtual void handleResponse(boost::shared_ptr<AMQResponseBody>); - + virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0; + virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0; virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0; virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0; virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0; virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0; - - protected: - /** Throw protocol exception if this is not channel 0. */ - static void assertChannelZero(u_int16_t id); - /** Throw protocol exception if this is channel 0. */ - static void assertChannelNonZero(u_int16_t id); - - private: - Requester requester; - Responder responder; }; }} diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp new file mode 100644 index 0000000000..cf6fea1455 --- /dev/null +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -0,0 +1,70 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "ChannelAdapter.h" +#include "AMQFrame.h" + +namespace qpid { +namespace framing { + +void ChannelAdapter::send(AMQFrame* frame) { + AMQBody::shared_ptr body = frame->getBody(); + switch (body->type()) { + case REQUEST_BODY: { + AMQRequestBody::shared_ptr request = + boost::shared_polymorphic_downcast<AMQRequestBody>(body); + requester.sending(request->getData()); + break; + } + case RESPONSE_BODY: { + AMQResponseBody::shared_ptr response = + boost::shared_polymorphic_downcast<AMQResponseBody>(body); + responder.sending(response->getData()); + break; + } + } + out.send(frame); +} + +void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { + responder.received(request->getData()); + MethodContext context(id, &out, request->getRequestId()); + handleMethodInContext(request, context); +} + +void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { + handleMethod(response); + requester.processed(response->getData()); +} + +void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { + MethodContext context(id, this); + handleMethodInContext(method, context); +} + +void ChannelAdapter::assertChannelZero(u_int16_t id) { + if (id != 0) + throw ConnectionException(504, "Invalid channel id, not 0"); +} + +void ChannelAdapter::assertChannelNonZero(u_int16_t id) { + if (id == 0) + throw ConnectionException(504, "Invalid channel id 0"); +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h new file mode 100644 index 0000000000..0652cc41bb --- /dev/null +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -0,0 +1,90 @@ +#ifndef _ChannelAdapter_ +#define _ChannelAdapter_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +#include "BodyHandler.h" +#include "Requester.h" +#include "Responder.h" +#include "OutputHandler.h" + +namespace qpid { +namespace framing { + +class MethodContext; + +/** + * Base class for client and broker channel adapters. + * + * As BodyHandler: + * - Creates MethodContext and dispatches methods+context to derived class. + * - Updates request/response ID data. + * + * As OutputHandler: + * - Updates request/resposne ID data. + * + */ +class ChannelAdapter : public BodyHandler, public OutputHandler { + public: + /** + *@param output Processed frames are forwarded to this handler. + */ + ChannelAdapter(OutputHandler& output, ChannelId channelId) + : id(channelId), out(output) {} + + ChannelId getId() { return id; } + + /** + * Do request/response-id processing and then forward to + * handler provided to constructor. Response frames should + * have their request-id set before calling send. + */ + void send(AMQFrame* frame); + + void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); + void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); + void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>); + + protected: + /** Throw protocol exception if this is not channel 0. */ + static void assertChannelZero(u_int16_t id); + /** Throw protocol exception if this is channel 0. */ + static void assertChannelNonZero(u_int16_t id); + + virtual void handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context) = 0; + + ChannelId id; + + private: + Requester requester; + Responder responder; + OutputHandler& out; +}; + +}} + + +#endif diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 13d5f658ca..1aa4be8f1e 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -19,6 +19,9 @@ * */ +#include "OutputHandler.h" +#include "ProtocolVersion.h" + namespace qpid { namespace framing { @@ -26,11 +29,14 @@ class BodyHandler; /** * Invocation context for an AMQP method. + * Some of the context information is related to the channel, some + * to the specific invocation - e.g. requestId. + * * All generated proxy and handler functions take a MethodContext parameter. * - * The user calling on a broker proxy can simply pass an integer - * channel ID, it will implicitly be converted to an appropriate context. - * + * The user does not need to create MethodContext objects explicitly, + * the constructor will implicitly create one from a channel ID. + * * Other context members are for internal use. */ struct MethodContext @@ -39,13 +45,21 @@ struct MethodContext * Passing a integer channel-id in place of a MethodContext * will automatically construct the MethodContext. */ - MethodContext(ChannelId channel, RequestId request=0) - : channelId(channel), requestId(request) {} + MethodContext( + ChannelId channel, OutputHandler* output=0, RequestId request=0) + : channelId(channel), out(output), requestId(request){} + + /** \internal Channel on which the method is sent. */ + const ChannelId channelId; + + /** Output handler for responses in this context */ + OutputHandler* out; + + /** \internal If we are in the context of processing an incoming request, + * this is the ID. Otherwise it is 0. + */ + const RequestId requestId; - /** Channel on which the method is sent. */ - ChannelId channelId; - /** \internal For proxy response: the original request or 0. */ - RequestId requestId; }; }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h index 2e01e34df2..9ffd4227d8 100644 --- a/cpp/lib/common/framing/OutputHandler.h +++ b/cpp/lib/common/framing/OutputHandler.h @@ -22,10 +22,10 @@ * */ #include <boost/noncopyable.hpp> -#include <AMQFrame.h> namespace qpid { namespace framing { +class AMQFrame; class OutputHandler : private boost::noncopyable { public: diff --git a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h index 5bb5e17704..af7d411928 100644 --- a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h +++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h @@ -26,7 +26,7 @@ namespace qpid { namespace sys { -class SessionContext; +class ConnectionOutputHandler; class ConnectionInputHandler; /** @@ -36,7 +36,7 @@ class ConnectionInputHandler; class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0; + virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0; virtual ~ConnectionInputHandlerFactory(){} }; diff --git a/cpp/lib/common/sys/SessionContext.h b/cpp/lib/common/sys/ConnectionOutputHandler.h index 671e00774f..91849e1dfb 100644 --- a/cpp/lib/common/sys/SessionContext.h +++ b/cpp/lib/common/sys/ConnectionOutputHandler.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _SessionContext_ -#define _SessionContext_ +#ifndef _ConnectionOutputHandler_ +#define _ConnectionOutputHandler_ #include <OutputHandler.h> @@ -29,7 +29,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class SessionContext : public virtual qpid::framing::OutputHandler +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler { public: virtual void close() = 0; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index 8cf50b87ba..81cfc0efda 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -30,7 +30,7 @@ #include <AMQFrame.h> #include <Buffer.h> #include <sys/Monitor.h> -#include <sys/SessionContext.h> +#include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include "APRSocket.h" @@ -40,7 +40,7 @@ namespace qpid { namespace sys { -class LFSessionContext : public virtual qpid::sys::SessionContext +class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler { const bool debug; APRSocket socket; diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp index 787d12d6d1..548fbd1881 100644 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -26,7 +26,7 @@ #include <boost/bind.hpp> #include <boost/scoped_ptr.hpp> -#include <sys/SessionContext.h> +#include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include <sys/ConnectionInputHandlerFactory.h> #include <sys/Acceptor.h> diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h index 1504e92651..da7b6dca27 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.h +++ b/cpp/lib/common/sys/posix/EventChannelConnection.h @@ -23,7 +23,7 @@ #include "EventChannelThreads.h" #include "sys/Monitor.h" -#include "sys/SessionContext.h" +#include "sys/ConnectionOutputHandler.h" #include "sys/ConnectionInputHandler.h" #include "sys/AtomicCount.h" #include "framing/AMQFrame.h" @@ -34,13 +34,13 @@ namespace sys { class ConnectionInputHandlerFactory; /** - * Implements SessionContext and delegates to a ConnectionInputHandler + * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, * by default same as readDescriptor */ -class EventChannelConnection : public SessionContext { +class EventChannelConnection : public ConnectionOutputHandler { public: EventChannelConnection( EventChannelThreads::shared_ptr threads, @@ -50,7 +50,7 @@ class EventChannelConnection : public SessionContext { bool isTrace = false ); - // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr + // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr virtual void send(qpid::framing::AMQFrame* frame) { send(std::auto_ptr<qpid::framing::AMQFrame>(frame)); } diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h index 5afbe8f5a8..57b5a5757c 100644 --- a/cpp/lib/common/sys/posix/check.h +++ b/cpp/lib/common/sys/posix/check.h @@ -45,7 +45,7 @@ class PosixError : public qpid::QpidError Exception* clone() const throw() { return new PosixError(*this); } - void throwSelf() { throw *this; } + void throwSelf() const { throw *this; } private: int errNo; |
