diff options
| author | Gordon Sim <gsim@apache.org> | 2008-02-15 08:26:00 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-02-15 08:26:00 +0000 |
| commit | 4351730550bc48c4237de4e616f8e420e084c081 (patch) | |
| tree | f4d0101055d375a91e3838e0ee31651ff6e72122 /cpp | |
| parent | 4a5dd2bc8257c7a370088a179acc760b143c62a8 (diff) | |
| download | qpid-python-4351730550bc48c4237de4e616f8e420e084c081.tar.gz | |
* updated c++ build to work with recent gentools changes
* add null exchange.bound impl to SessionHandlerImpl (to reflect change to spec file)
* pass AMQDataBlocks rather than AMQFrames to OutputHandler
* allow client to pass messages frames to io layer in one go (via FrameList) if it will fit in a single buffer
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@627971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
22 files changed, 249 insertions, 35 deletions
diff --git a/cpp/gen/Makefile.am b/cpp/gen/Makefile.am index 3398f01330..61ec38bbaa 100644 --- a/cpp/gen/Makefile.am +++ b/cpp/gen/Makefile.am @@ -32,16 +32,21 @@ DISTCLEANFILES = $(BUILT_SOURCES) timestamp gen-src.mk # if CAN_GENERATE_CODE + gentools_dir = $(srcdir)/../../gentools spec_dir = $(srcdir)/../../specs spec = $(spec_dir)/amqp.0-8.xml gentools_srcdir = $(gentools_dir)/src/org/apache/qpid/gentools +gentools_libs = $(gentools_dir)/lib/velocity-1.4.jar:$(gentools_dir)/lib/velocity-dep-1.4.jar $(BUILT_SOURCES) timestamp: $(spec) $(java_sources) $(cxx_templates) rm -f $(generated_sources) - cd $(gentools_srcdir) && rm -f *.class && $(JAVAC) *.java - $(JAVA) -cp $(gentools_dir)/src org.apache.qpid.gentools.Main \ + cd $(gentools_srcdir) && rm -f *.class + $(JAVAC) -cp $(gentools_libs) -sourcepath $(gentools_srcdir) -d $(gentools_dir)/src $(gentools_srcdir)/*.java + $(JAVA) -cp $(gentools_dir)/src:$(gentools_libs) org.apache.qpid.gentools.Main \ -c -o . -t $(gentools_dir)/templ.cpp $(spec) + echo $(JAVA) -cp $(gentools_dir)/src:$(gentools_libs) org.apache.qpid.gentools.Main \ + -c -o . -t $(gentools_dir)/templ.cpp $(spec) >> debug touch timestamp gen-src.mk: timestamp diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 7e631b4505..92aa0ff456 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -177,7 +177,12 @@ class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait); - + + virtual void bound( u_int16_t /*channel*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*queue*/ ) {} + virtual ~ExchangeHandlerImpl(){} }; diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index a97d79dcf9..92f8ae63ca 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -23,6 +23,7 @@ #include <ClientMessage.h> #include <QpidError.h> #include <MethodBodyInstances.h> +#include <framing/FrameList.h> using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; @@ -219,19 +220,25 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + std::auto_ptr<FrameList> message(new FrameList()); + + message->add(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(version, id, body)); + message->add(new AMQFrame(version, id, body)); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes - if(data_length < frag_size){ - out->send(new AMQFrame(version, id, new AMQContentBody(data))); + if(data_length + message->size() < frag_size){ + message->add(new AMQFrame(version, id, new AMQContentBody(data))); + } else if(data_length < frag_size){ + out->send(message.release()); + out->send(new AMQFrame(version, id, new AMQContentBody(data))); }else{ + out->send(message.release()); u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { @@ -244,6 +251,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } } } + if (message.get()) out->send(message.release()); } void Channel::commit(){ diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index a99360b840..291ebb2107 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -78,7 +78,7 @@ OutputHandler* Connector::getOutputHandler(){ return this; } -void Connector::send(AMQFrame* frame){ +void Connector::send(AMQDataBlock* frame){ writeBlock(frame); if(debug) std::cout << "SENT: " << *frame << std::endl; delete frame; diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 44112369dc..49dcf6bb7a 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -85,7 +85,7 @@ namespace client { virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); + virtual void send(qpid::framing::AMQDataBlock* frame); virtual void setReadTimeout(u_int16_t timeout); virtual void setWriteTimeout(u_int16_t timeout); }; diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index b2e2de2cf1..b584e93ca5 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -76,6 +76,7 @@ libqpidcommon_la_SOURCES = \ $(platform_src) \ $(framing)/AMQBody.cpp \ $(framing)/AMQContentBody.cpp \ + $(framing)/AMQDataBlock.cpp \ $(framing)/AMQFrame.cpp \ $(framing)/AMQHeaderBody.cpp \ $(framing)/AMQHeartbeatBody.cpp \ @@ -84,6 +85,7 @@ libqpidcommon_la_SOURCES = \ $(framing)/BodyHandler.cpp \ $(framing)/Buffer.cpp \ $(framing)/FieldTable.cpp \ + $(framing)/FrameList.cpp \ $(framing)/FramingContent.cpp \ $(framing)/InitiationHandler.cpp \ $(framing)/ProtocolInitiation.cpp \ @@ -114,6 +116,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/BodyHandler.h \ $(framing)/Buffer.h \ $(framing)/FieldTable.h \ + $(framing)/FrameList.h \ $(framing)/FramingContent.h \ $(framing)/HeaderProperties.h \ $(framing)/InitiationHandler.h \ diff --git a/cpp/lib/common/framing/AMQDataBlock.cpp b/cpp/lib/common/framing/AMQDataBlock.cpp new file mode 100644 index 0000000000..9c4d6bee63 --- /dev/null +++ b/cpp/lib/common/framing/AMQDataBlock.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AMQDataBlock.h" + + +namespace qpid { +namespace framing { + +std::ostream& operator<<(std::ostream& out, const AMQDataBlock& b) +{ + b.print(out); + return out; +} + +}} diff --git a/cpp/lib/common/framing/AMQDataBlock.h b/cpp/lib/common/framing/AMQDataBlock.h index ac91c52164..36de2beea5 100644 --- a/cpp/lib/common/framing/AMQDataBlock.h +++ b/cpp/lib/common/framing/AMQDataBlock.h @@ -33,10 +33,12 @@ public: virtual void encode(Buffer& buffer) = 0; virtual bool decode(Buffer& buffer) = 0; virtual u_int32_t size() const = 0; + virtual void print(std::ostream& out) const = 0; + + friend std::ostream& operator<<(std::ostream& out, const AMQDataBlock& block); }; } } - #endif diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 6fa5b9ae51..0530dc805c 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -119,14 +119,18 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t bufSize) body->decode(buffer, bufSize); } -std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) +void AMQFrame::print(std::ostream& out) const { - out << "Frame[channel=" << t.channel << "; "; - if (t.body.get() == 0) + out << "Frame[channel=" << channel << "; "; + if (body.get() == 0) out << "empty"; else - out << *t.body; + out << *body; out << "]"; +} +std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t) +{ + t.print(out); return out; } diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index d3c769087a..21642f112a 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -61,6 +61,8 @@ namespace qpid { u_int32_t decodeHead(Buffer& buffer); void decodeBody(Buffer& buffer, uint32_t size); + void print(std::ostream& out) const; + friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body); }; diff --git a/cpp/lib/common/framing/FrameList.cpp b/cpp/lib/common/framing/FrameList.cpp new file mode 100644 index 0000000000..f188347101 --- /dev/null +++ b/cpp/lib/common/framing/FrameList.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "FrameList.h" +#include "Exception.h" + +namespace qpid { +namespace framing { + +FrameList::~FrameList() +{ + for (Frames::iterator i = frames.begin(); i != frames.end(); i++) { + delete (*i); + } +} + +void FrameList::encode(Buffer& buffer) +{ + for (Frames::iterator i = frames.begin(); i != frames.end(); i++) { + (*i)->encode(buffer); + } +} + +bool FrameList::decode(Buffer&) +{ + throw Exception("FrameList::decode() not valid!"); +} + +u_int32_t FrameList::size() const +{ + uint32_t s(0); + for (Frames::const_iterator i = frames.begin(); i != frames.end(); i++) { + s += (*i)->size(); + } + return s; +} + +void FrameList::print(std::ostream& out) const +{ + out << "Frames: "; + for (Frames::const_iterator i = frames.begin(); i != frames.end(); i++) { + (*i)->print(out); + out << "; "; + } +} + +void FrameList::add(AMQFrame* f) +{ + frames.push_back(f); +} + +}} diff --git a/cpp/lib/common/framing/FrameList.h b/cpp/lib/common/framing/FrameList.h new file mode 100644 index 0000000000..59dc385e46 --- /dev/null +++ b/cpp/lib/common/framing/FrameList.h @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Buffer.h" +#include "AMQDataBlock.h" +#include "AMQFrame.h" + +#include <list> + +#ifndef _FrameList_ +#define _FrameList_ + +namespace qpid { +namespace framing { + +class FrameList : public AMQDataBlock +{ + typedef std::list<AMQFrame*> Frames; + Frames frames; +public: + virtual ~FrameList(); + void encode(Buffer& buffer); + bool decode(Buffer& buffer); + u_int32_t size() const; + void add(AMQFrame* f); + void print(std::ostream& out) const; +}; + +} +} + + +#endif diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h index 2e01e34df2..16dc519738 100644 --- a/cpp/lib/common/framing/OutputHandler.h +++ b/cpp/lib/common/framing/OutputHandler.h @@ -22,6 +22,7 @@ * */ #include <boost/noncopyable.hpp> +#include <AMQDataBlock.h> #include <AMQFrame.h> namespace qpid { @@ -30,7 +31,7 @@ namespace framing { class OutputHandler : private boost::noncopyable { public: virtual ~OutputHandler() {} - virtual void send(AMQFrame* frame) = 0; + virtual void send(AMQDataBlock* frame) = 0; }; }} diff --git a/cpp/lib/common/framing/ProtocolInitiation.cpp b/cpp/lib/common/framing/ProtocolInitiation.cpp index 471f736a7d..360178df5a 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.cpp +++ b/cpp/lib/common/framing/ProtocolInitiation.cpp @@ -19,6 +19,7 @@ * */ #include <ProtocolInitiation.h> +#include <iostream> qpid::framing::ProtocolInitiation::ProtocolInitiation(){} @@ -55,4 +56,9 @@ bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ } } +void qpid::framing::ProtocolInitiation::print(std::ostream& out) const +{ + out << "AMQP(" << getMajor() << "-" << getMinor() << ")"; +} + //TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date diff --git a/cpp/lib/common/framing/ProtocolInitiation.h b/cpp/lib/common/framing/ProtocolInitiation.h index 003c3bba81..03e53c75cb 100644 --- a/cpp/lib/common/framing/ProtocolInitiation.h +++ b/cpp/lib/common/framing/ProtocolInitiation.h @@ -45,6 +45,7 @@ public: inline u_int8_t getMajor() const { return version.getMajor(); } inline u_int8_t getMinor() const { return version.getMinor(); } inline const ProtocolVersion& getVersion() const { return version; } + void print(std::ostream& out) const; }; } diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp index 8a7ce18136..dfe27050c4 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.cpp +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -96,7 +96,7 @@ void LFSessionContext::write(){ if(!framesToWrite.empty()){ out.clear(); bool encoded(false); - AMQFrame* frame = framesToWrite.front(); + AMQDataBlock* frame = framesToWrite.front(); while(frame && out.available() >= frame->size()){ encoded = true; frame->encode(out); @@ -120,7 +120,7 @@ void LFSessionContext::write(){ } } -void LFSessionContext::send(AMQFrame* frame){ +void LFSessionContext::send(AMQDataBlock* frame){ Mutex::ScopedLock l(writeLock); if(!closing){ framesToWrite.push(frame); @@ -173,9 +173,9 @@ void LFSessionContext::init(SessionHandler* _handler){ processor->add(&fd); } -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ +void LFSessionContext::log(const std::string& desc, AMQDataBlock* const block){ Mutex::ScopedLock l(logLock); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + std::cout << desc << " [" << &socket << "]: " << *block << std::endl; } Mutex LFSessionContext::logLock; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index eeb8279d9a..7862055735 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -54,7 +54,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext apr_pollfd_t fd; - std::queue<qpid::framing::AMQFrame*> framesToWrite; + std::queue<qpid::framing::AMQDataBlock*> framesToWrite; qpid::sys::Mutex writeLock; bool processing; @@ -62,7 +62,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext static qpid::sys::Mutex logLock; void log(const std::string& desc, - qpid::framing::AMQFrame* const frame); + qpid::framing::AMQDataBlock* const block); public: @@ -70,7 +70,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext LFProcessor* const processor, bool debug = false); virtual ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); + virtual void send(qpid::framing::AMQDataBlock* frame); virtual void close(); void read(); void write(); diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index cc0a90bad9..637e971077 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -39,8 +39,8 @@ using std::queue; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp index bd638dae66..ca27e80515 100644 --- a/cpp/tests/InMemoryContentTest.cpp +++ b/cpp/tests/InMemoryContentTest.cpp @@ -33,8 +33,9 @@ using namespace qpid::framing; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index 2075a6dd3a..8d0ad65a6a 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -35,8 +35,8 @@ using namespace qpid::framing; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index bcf3ad8064..ee864a3883 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -30,8 +30,8 @@ using namespace qpid::framing; struct DummyHandler : OutputHandler{ std::vector<AMQFrame*> frames; - virtual void send(AMQFrame* frame){ - frames.push_back(frame); + virtual void send(AMQDataBlock* block){ + frames.push_back(dynamic_cast<AMQFrame*>(block)); } }; diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp index a5cc64d1e4..55bf6234d1 100644 --- a/cpp/tests/client_test.cpp +++ b/cpp/tests/client_test.cpp @@ -35,6 +35,7 @@ #include <MessageListener.h> #include <sys/Monitor.h> #include <FieldTable.h> +#include <cstdlib> using namespace qpid::client; using namespace qpid::sys; @@ -52,12 +53,28 @@ public: inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} inline virtual void received(Message& msg){ - std::cout << "Received message " << msg.getData() << std::endl; + std::cout << "Received message " << msg.getData().substr(0, 5) << "..." << std::endl; monitor->notify(); } }; -int main(int argc, char**) +const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); + +std::string generateData(uint size) +{ + if (size < chars.length()) { + return chars.substr(0, size); + } + std::string data; + for (uint i = 0; i < (size / chars.length()); i++) { + data += chars; + } + data += chars.substr(0, size % chars.length()); + return data; +} + + +int main(int argc, char** argv) { try{ //Use a custom exchange @@ -109,10 +126,17 @@ int main(int argc, char**) //Now we create and publish a message to our exchange with a //routing key that will cause it to be routed to our queue Message msg; - string data("MyMessage"); - msg.setData(data); + uint size = 0; + if (argc > 1) { + size = atoi(argv[1]); + } + if (size) { + msg.setData(generateData(size)); + } else { + msg.setData("MyMessage"); + } channel.publish(msg, exchange, "MyTopic"); - std::cout << "Published message: " << data << std::endl; + std::cout << "Published message: " << msg.getData().substr(0, 5) << "..." << std::endl; { Monitor::ScopedLock l(monitor); |
