diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-17 14:08:14 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-17 14:08:14 +0000 |
| commit | 16a49ba6ef283a5093780d28efaaa8483fc9010d (patch) | |
| tree | 427908f0242a05033385a1dcd1a0af908bca44ea /qpid/cpp | |
| parent | fca7b2ac23c76c402457ab605639ba4b16a5e3f1 (diff) | |
| download | qpid-python-16a49ba6ef283a5093780d28efaaa8483fc9010d.tar.gz | |
QPID-2935: sync with latest trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
61 files changed, 2324 insertions, 438 deletions
diff --git a/qpid/cpp/INSTALL b/qpid/cpp/INSTALL index 6628552ce5..6483d7de4e 100644 --- a/qpid/cpp/INSTALL +++ b/qpid/cpp/INSTALL @@ -67,6 +67,10 @@ Optional SSL support requires: * nss <http://www.mozilla.org/projects/security/pki/nss/> * nspr <http://www.mozilla.org/projects/nspr/> +Optional binding support for ruby requires: +* ruby and ruby devel <http://www.ruby-lang.org/en/> +* swig <http://www.swig.org/> + Qpid has been built using the GNU C++ compiler: * gcc <http://gcc.gnu.org/> (3.4.6) @@ -124,6 +128,9 @@ For the XML Exchange, include: # yum install xqilla-devel xerces-c-devel +Optional ruby binding support include: + # yum install ruby ruby-devel swig + Follow the manual installation instruction below for any packages not available through your distributions packaging tool. diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am index 9c3bd615d6..84207d43c4 100644 --- a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am +++ b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am @@ -21,7 +21,7 @@ INCLUDE = -I$(top_srcdir)/include AM_CPPFLAGS = $(INCLUDE) -noinst_PROGRAMS=agent list_agents +noinst_PROGRAMS=agent list_agents print_events agent_SOURCES=agent.cpp agent_LDADD=$(top_builddir)/src/libqmf2.la @@ -29,3 +29,5 @@ agent_LDADD=$(top_builddir)/src/libqmf2.la list_agents_SOURCES=list_agents.cpp list_agents_LDADD=$(top_builddir)/src/libqmf2.la +print_events_SOURCES=print_events.cpp +print_events_LDADD=$(top_builddir)/src/libqmf2.la diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp b/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp new file mode 100644 index 0000000000..9883a19962 --- /dev/null +++ b/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Duration.h> +#include <qmf/ConsoleSession.h> +#include <qmf/ConsoleEvent.h> +#include <qmf/Data.h> +#include <qpid/types/Variant.h> +#include <string> +#include <iostream> + +using namespace std; +using namespace qmf; +using qpid::types::Variant; +using qpid::messaging::Duration; + +int main(int argc, char** argv) +{ + string url("localhost"); + string connectionOptions; + string sessionOptions; + + if (argc > 1) + url = argv[1]; + if (argc > 2) + connectionOptions = argv[2]; + if (argc > 3) + sessionOptions = argv[3]; + + qpid::messaging::Connection connection(url, connectionOptions); + connection.open(); + + ConsoleSession session(connection, sessionOptions); + session.open(); + + while (true) { + ConsoleEvent event; + if (session.nextEvent(event)) { + if (event.getType() == CONSOLE_EVENT) { + const Data& data(event.getData(0)); + cout << "Event: timestamp=" << event.getTimestamp() << " severity=" << + event.getSeverity() << " content=" << data.getProperties() << endl; + } + } + } +} + diff --git a/qpid/cpp/bindings/qmf2/examples/python/agent.py b/qpid/cpp/bindings/qmf2/examples/python/agent.py index 66b7dbdc58..b24890f531 100755 --- a/qpid/cpp/bindings/qmf2/examples/python/agent.py +++ b/qpid/cpp/bindings/qmf2/examples/python/agent.py @@ -69,32 +69,41 @@ class ExampleAgent(AgentHandler): if addr == self.controlAddr: self.control.methodCount += 1 - if methodName == "stop": - self.session.methodSuccess(handle) - self.cancel() - - elif methodName == "echo": - handle.addReturnArgument("sequence", args["sequence"]) - handle.addReturnArgument("map", args["map"]) - self.session.methodSuccess(handle) - - elif methodName == "fail": - if args['useString']: - self.session.raiseException(handle, args['stringVal']) - else: - ex = Data(self.sch_exception) - ex.whatHappened = "It Failed" - ex.howBad = 75 - ex.details = args['details'] - self.session.raiseException(handle, ex) - - elif methodName == "create_child": - name = args['name'] - child = Data(self.sch_child) - child.name = name - addr = self.session.addData(child, name) - handle.addReturnArgument("childAddr", addr.asMap()) - self.session.methodSuccess(handle) + try: + if methodName == "stop": + self.session.methodSuccess(handle) + self.cancel() + + elif methodName == "echo": + handle.addReturnArgument("sequence", args["sequence"]) + handle.addReturnArgument("map", args["map"]) + self.session.methodSuccess(handle) + + elif methodName == "event": + ev = Data(self.sch_event) + ev.text = args['text'] + self.session.raiseEvent(ev, args['severity']) + self.session.methodSuccess(handle) + + elif methodName == "fail": + if args['useString']: + self.session.raiseException(handle, args['stringVal']) + else: + ex = Data(self.sch_exception) + ex.whatHappened = "It Failed" + ex.howBad = 75 + ex.details = args['details'] + self.session.raiseException(handle, ex) + + elif methodName == "create_child": + name = args['name'] + child = Data(self.sch_child) + child.name = name + addr = self.session.addData(child, name) + handle.addReturnArgument("childAddr", addr.asMap()) + self.session.methodSuccess(handle) + except BaseException, e: + self.session.raiseException(handle, "%r" % e) def setupSchema(self): @@ -128,6 +137,11 @@ class ExampleAgent(AgentHandler): echoMethod.addArgument(SchemaProperty("map", SCHEMA_DATA_MAP, direction=DIR_IN_OUT)) self.sch_control.addMethod(echoMethod) + eventMethod = SchemaMethod("event", desc="Raise an Event") + eventMethod.addArgument(SchemaProperty("text", SCHEMA_DATA_STRING, direction=DIR_IN)) + eventMethod.addArgument(SchemaProperty("severity", SCHEMA_DATA_INT, direction=DIR_IN)) + self.sch_control.addMethod(eventMethod) + failMethod = SchemaMethod("fail", desc="Expected to Fail") failMethod.addArgument(SchemaProperty("useString", SCHEMA_DATA_BOOL, direction=DIR_IN)) failMethod.addArgument(SchemaProperty("stringVal", SCHEMA_DATA_STRING, direction=DIR_IN)) @@ -146,11 +160,18 @@ class ExampleAgent(AgentHandler): self.sch_child.addProperty(SchemaProperty("name", SCHEMA_DATA_STRING)) ## + ## Declare the event class + ## + self.sch_event = Schema(SCHEMA_TYPE_EVENT, package, "event") + self.sch_event.addProperty(SchemaProperty("text", SCHEMA_DATA_STRING)) + + ## ## Register our schemata with the agent session. ## self.session.registerSchema(self.sch_exception) self.session.registerSchema(self.sch_control) self.session.registerSchema(self.sch_child) + self.session.registerSchema(self.sch_event) def populateData(self): diff --git a/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb b/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb index 712e5007be..41de7e5abe 100644 --- a/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb +++ b/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb @@ -27,7 +27,7 @@ class FindAgents < Qmf2::ConsoleHandler end def agent_added(agent) - puts "Agent Added: #{agent.to_s}" + puts "Agent Added: #{agent.name}" end def agent_deleted(agent, reason) diff --git a/qpid/cpp/bindings/qmf2/python/qmf2.py b/qpid/cpp/bindings/qmf2/python/qmf2.py index 61a5453f8e..9f2d8556f4 100644 --- a/qpid/cpp/bindings/qmf2/python/qmf2.py +++ b/qpid/cpp/bindings/qmf2/python/qmf2.py @@ -165,7 +165,7 @@ class ConsoleHandler(Thread): reason = 'filter' if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED: reason = 'aged' - self.agentDeleted(Agent(event.getAgent(), reason)) + self.agentDeleted(Agent(event.getAgent()), reason) elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART: self.agentRestarted(Agent(event.getAgent())) @@ -373,6 +373,16 @@ class AgentSession(object): else: self._impl.raiseException(handle, data) + def raiseEvent(self, data, severity=None): + """ + """ + if not severity: + self._impl.raiseEvent(data._impl) + else: + if (severity.__class__ != int and severity.__class__ != long) or severity < 0 or severity > 7: + raise Exception("Severity must be an int between 0..7") + self._impl.raiseEvent(data._impl, severity); + #=================================================================================================== # AGENT PROXY diff --git a/qpid/cpp/bindings/qmf2/ruby/qmf2.rb b/qpid/cpp/bindings/qmf2/ruby/qmf2.rb index 6d1741ebc0..c14ecba4e1 100644 --- a/qpid/cpp/bindings/qmf2/ruby/qmf2.rb +++ b/qpid/cpp/bindings/qmf2/ruby/qmf2.rb @@ -433,6 +433,14 @@ module Qmf2 def del_data(addr) @impl.del_data(addr.impl) end + + def raise_event(data, severity=nil) + if !severity + @impl.raiseEvent(data.impl) + else + @impl.raiseEvent(data.impl, severity) + end + end end ##============================================================================== diff --git a/qpid/cpp/bindings/qpid/Makefile.am b/qpid/cpp/bindings/qpid/Makefile.am index 07b51e6c64..ca9eda0c73 100644 --- a/qpid/cpp/bindings/qpid/Makefile.am +++ b/qpid/cpp/bindings/qpid/Makefile.am @@ -17,10 +17,11 @@ # under the License. # +SUBDIRS = dotnet + if HAVE_SWIG EXTRA_DIST = qpid.i -SUBDIRS = if HAVE_RUBY_DEVEL SUBDIRS += ruby diff --git a/qpid/cpp/bindings/qpid/dotnet/Makefile.am b/qpid/cpp/bindings/qpid/dotnet/Makefile.am new file mode 100644 index 0000000000..f2b106bcb2 --- /dev/null +++ b/qpid/cpp/bindings/qpid/dotnet/Makefile.am @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +EXTRA_DIST = winsdk_sources/winsdk_dotnet_examples.sln \ + winsdk_sources/examples/csharp.direct.receiver/csharp.direct.receiver.csproj \ + winsdk_sources/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj \ + winsdk_sources/examples/csharp.example.helloworld/csharp.example.helloworld.csproj \ + winsdk_sources/examples/csharp.example.declare_queues/csharp.example.declare_queues.csproj \ + winsdk_sources/examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj \ + winsdk_sources/examples/csharp.example.server/csharp.example.server.csproj \ + winsdk_sources/examples/csharp.example.spout/csharp.example.spout.csproj \ + winsdk_sources/examples/csharp.example.drain/csharp.example.drain.csproj \ + winsdk_sources/examples/csharp.map.sender/csharp.map.sender.csproj \ + winsdk_sources/examples/csharp.map.receiver/csharp.map.receiver.csproj \ + winsdk_sources/examples/csharp.example.client/csharp.example.client.csproj \ + winsdk_sources/examples/csharp.direct.sender/csharp.direct.sender.csproj \ + examples/csharp.direct.receiver/csharp.direct.receiver.cs \ + examples/csharp.direct.receiver/csharp.direct.receiver.csproj \ + examples/csharp.direct.receiver/Properties/AssemblyInfo.cs \ + examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs \ + examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj \ + examples/csharp.map.callback.receiver/Properties/AssemblyInfo.cs \ + examples/powershell.example.helloworld/powershell.example.helloworld.ps1 \ + examples/csharp.example.helloworld/csharp.example.helloworld.cs \ + examples/csharp.example.helloworld/csharp.example.helloworld.csproj \ + examples/csharp.example.helloworld/Properties/AssemblyInfo.cs \ + examples/csharp.example.declare_queues/csharp.example.declare_queues.cs \ + examples/csharp.example.declare_queues/csharp.example.declare_queues.csproj \ + examples/csharp.example.declare_queues/Properties/AssemblyInfo.cs \ + examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj \ + examples/csharp.map.callback.sender/Properties/AssemblyInfo.cs \ + examples/csharp.map.callback.sender/csharp.map.callback.sender.cs \ + examples/csharp.example.server/csharp.example.server.csproj \ + examples/csharp.example.server/csharp.example.server.cs \ + examples/csharp.example.server/Properties/AssemblyInfo.cs \ + examples/csharp.example.spout/csharp.example.spout.csproj \ + examples/csharp.example.spout/Options.cs \ + examples/csharp.example.spout/csharp.example.spout.cs \ + examples/csharp.example.spout/Properties/AssemblyInfo.cs \ + examples/csharp.example.drain/csharp.example.drain.cs \ + examples/csharp.example.drain/csharp.example.drain.csproj \ + examples/csharp.example.drain/Options.cs \ + examples/csharp.example.drain/Properties/AssemblyInfo.cs \ + examples/csharp.map.sender/csharp.map.sender.csproj \ + examples/csharp.map.sender/csharp.map.sender.cs \ + examples/csharp.map.sender/Properties/AssemblyInfo.cs \ + examples/visualbasic.example.client/visualbasic.example.client.vbproj \ + examples/visualbasic.example.client/MyProject/Resources.resx \ + examples/visualbasic.example.client/MyProject/Application.myapp \ + examples/visualbasic.example.client/MyProject/Settings.settings \ + examples/visualbasic.example.client/MyProject/Settings.Designer.vb \ + examples/visualbasic.example.client/MyProject/AssemblyInfo.vb \ + examples/visualbasic.example.client/MyProject/Application.Designer.vb \ + examples/visualbasic.example.client/MyProject/Resources.Designer.vb \ + examples/visualbasic.example.client/visualbasic.example.client.vb \ + examples/csharp.map.receiver/csharp.map.receiver.csproj \ + examples/csharp.map.receiver/csharp.map.receiver.cs \ + examples/csharp.map.receiver/Properties/AssemblyInfo.cs \ + examples/csharp.example.client/csharp.example.client.cs \ + examples/csharp.example.client/Properties/AssemblyInfo.cs \ + examples/csharp.example.client/csharp.example.client.csproj \ + examples/csharp.direct.sender/csharp.direct.sender.csproj \ + examples/csharp.direct.sender/csharp.direct.sender.cs \ + examples/csharp.direct.sender/Properties/AssemblyInfo.cs \ + configure-windows.ps1 \ + ReadMe.txt \ + org.apache.qpid.messaging.sln \ + test/messaging.test/messaging.test.address.cs \ + test/messaging.test/messaging.test.duration.cs \ + test/messaging.test/messaging.test.cs \ + test/messaging.test/messaging.test.message.cs \ + test/messaging.test/messaging.test.csproj \ + test/messaging.test/Properties/AssemblyInfo.cs \ + test/messaging.test/messaging.test.connection.cs \ + src/org.apache.qpid.messaging.vcproj \ + src/Message.cpp \ + src/Connection.cpp \ + src/TypeTranslator.h \ + src/AssemblyInfo.cpp \ + src/FailoverUpdates.h \ + src/sessionreceiver/sessionreceiver.cs \ + src/sessionreceiver/Properties/sessionreceiver-AssemblyInfo-template.cs \ + src/sessionreceiver/qpid.snk \ + src/sessionreceiver/org.apache.qpid.messaging.sessionreceiver.csproj \ + src/Sender.h \ + src/TypeTranslator.cpp \ + src/Receiver.h \ + src/Address.h \ + src/Sender.cpp \ + src/QpidTypeCheck.h \ + src/resource1.h \ + src/Duration.h \ + src/Session.h \ + src/Message.h \ + src/ReadMe.txt \ + src/Receiver.cpp \ + src/Address.cpp \ + src/app.rc \ + src/Session.cpp \ + src/org.apache.qpid.messaging.template.rc \ + src/qpid.snk \ + src/Connection.h \ + src/QpidException.h \ + src/QpidMarshal.h \ + src/FailoverUpdates.cpp \ + src/Duration.cpp \ + ../../../src/windows/resources/qpid-icon.ico \ + ../../../src/windows/resources/template-resource.rc \ + ../../../src/windows/resources/version-resource.h diff --git a/qpid/cpp/build-aux/.gitignore b/qpid/cpp/build-aux/.gitignore new file mode 100644 index 0000000000..42725ceff3 --- /dev/null +++ b/qpid/cpp/build-aux/.gitignore @@ -0,0 +1 @@ +/py-compile diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index ee1bade1c9..ea1a1b49ea 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -16,7 +16,7 @@ AC_INIT([qpidc], [dev@qpid.apache.org]) AC_CONFIG_AUX_DIR([build-aux]) -AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects]) +AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects tar-ustar]) # Minimum Autoconf version required. AC_PREREQ(2.59) @@ -538,6 +538,7 @@ AC_CONFIG_FILES([ bindings/qpid/ruby/Makefile bindings/qpid/python/Makefile bindings/qpid/perl/Makefile + bindings/qpid/dotnet/Makefile bindings/qmf/Makefile bindings/qmf/ruby/Makefile bindings/qmf/python/Makefile diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index eb4bb72aad..4b5a1b1c2c 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -313,6 +313,10 @@ if (NOT Boost_FILESYSTEM_LIBRARY) set(Boost_FILESYSTEM_LIBRARY boost_filesystem) endif (NOT Boost_FILESYSTEM_LIBRARY) +if (NOT Boost_SYSTEM_LIBRARY) + set(Boost_SYSTEM_LIBRARY boost_system) +endif (NOT Boost_SYSTEM_LIBRARY) + if (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY) set(Boost_UNIT_TEST_FRAMEWORK_LIBRARY boost_unit_test_framework) endif (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY) @@ -487,6 +491,7 @@ endif (BUILD_SASL) CHECK_LIBRARY_EXISTS (xerces-c _init "" HAVE_XERCES) CHECK_INCLUDE_FILE_CXX (xercesc/framework/MemBufInputSource.hpp HAVE_XERCES_H) CHECK_INCLUDE_FILE_CXX (xqilla/xqilla-simple.hpp HAVE_XQILLA_H) +CHECK_INCLUDE_FILE_CXX (xqilla/ast/XQEffectiveBooleanValue.hpp HAVE_XQ_EBV) set (xml_default ${xml_force}) if (CMAKE_SYSTEM_NAME STREQUAL Windows) @@ -510,6 +515,10 @@ if (BUILD_XML) message(FATAL_ERROR "XML Exchange support requested but XQilla headers not found") endif (NOT HAVE_XQILLA_H) + if (HAVE_XQ_EBV) + add_definitions(-DXQ_EFFECTIVE_BOOLEAN_VALUE_HPP) + endif (HAVE_XQ_EBV) + add_library (xml MODULE qpid/xml/XmlExchange.cpp qpid/xml/XmlExchange.h @@ -956,6 +965,11 @@ set (qpidbroker_SOURCES qpid/broker/Broker.cpp qpid/broker/Exchange.cpp qpid/broker/ExpiryPolicy.cpp + qpid/broker/Fairshare.cpp + qpid/broker/LegacyLVQ.cpp + qpid/broker/MessageDeque.cpp + qpid/broker/MessageMap.cpp + qpid/broker/PriorityQueue.cpp qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp qpid/broker/QueueListeners.cpp @@ -1006,6 +1020,7 @@ set (qpidbroker_SOURCES qpid/broker/SessionHandler.h qpid/broker/SessionHandler.cpp qpid/broker/System.cpp + qpid/broker/ThresholdAlerts.cpp qpid/broker/TopicExchange.cpp qpid/broker/TxAccept.cpp qpid/broker/TxBuffer.cpp @@ -1207,6 +1222,8 @@ install (TARGETS replication_exchange # file whereas older builds only have config.h on autoconf-generated builds. add_definitions(-DHAVE_CONFIG_H) +add_definitions(-DBOOST_FILESYSTEM_VERSION=2) + # Now create the config file from all the info learned above. configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8f00cefb33..6fafff7d54 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -24,6 +24,7 @@ SUBDIRS = . tests # using Visual Studio solutions/projects. windows_dist = \ qpid/client/windows/SaslFactory.cpp \ + qpid/client/windows/SslConnector.cpp \ qpid/log/windows/SinkOptions.cpp \ qpid/log/windows/SinkOptions.h \ ../include/qpid/sys/windows/check.h \ @@ -42,6 +43,8 @@ windows_dist = \ qpid/sys/windows/Shlib.cpp \ qpid/sys/windows/SocketAddress.cpp \ qpid/sys/windows/Socket.cpp \ + qpid/sys/windows/SslAsynchIO.cpp \ + qpid/sys/windows/SslAsynchIO.h \ qpid/sys/windows/StrError.cpp \ qpid/sys/windows/SystemInfo.cpp \ qpid/sys/windows/Thread.cpp \ @@ -51,7 +54,9 @@ windows_dist = \ qpid/sys/windows/uuid.h \ windows/QpiddBroker.cpp \ qpid/broker/windows/BrokerDefaults.cpp \ - qpid/broker/windows/SaslAuthenticator.cpp + qpid/broker/windows/SaslAuthenticator.cpp \ + qpid/broker/windows/SslProtocolFactory.cpp \ + qpid/messaging/HandleInstantiator.cpp EXTRA_DIST= $(platform_dist) $(rgen_srcs) $(windows_dist) @@ -122,6 +127,8 @@ qpidtest_SCRIPTS = tmoduledir = $(libdir)/qpid/tests tmodule_LTLIBRARIES= +AM_CXXFLAGS += -DBOOST_FILESYSTEM_VERSION=2 + ## Automake macros to build libraries and executables. qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\" libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\" @@ -543,6 +550,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ExchangeRegistry.h \ qpid/broker/ExpiryPolicy.cpp \ qpid/broker/ExpiryPolicy.h \ + qpid/broker/Fairshare.h \ + qpid/broker/Fairshare.cpp \ qpid/broker/FanOutExchange.cpp \ qpid/broker/FanOutExchange.h \ qpid/broker/FedOps.h \ @@ -550,6 +559,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HeadersExchange.cpp \ qpid/broker/HeadersExchange.h \ qpid/broker/AsyncCompletion.h \ + qpid/broker/LegacyLVQ.h \ + qpid/broker/LegacyLVQ.cpp \ qpid/broker/Link.cpp \ qpid/broker/Link.h \ qpid/broker/LinkRegistry.cpp \ @@ -560,9 +571,16 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageBuilder.h \ + qpid/broker/MessageDeque.h \ + qpid/broker/MessageDeque.cpp \ + qpid/broker/MessageMap.h \ + qpid/broker/MessageMap.cpp \ + qpid/broker/Messages.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/MessageStoreModule.h \ + qpid/broker/PriorityQueue.h \ + qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.cpp \ @@ -584,6 +602,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueEvents.h \ qpid/broker/QueueListeners.cpp \ qpid/broker/QueueListeners.h \ + qpid/broker/QueueObserver.h \ qpid/broker/QueuePolicy.cpp \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.cpp \ @@ -632,6 +651,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SignalHandler.h \ qpid/broker/System.cpp \ qpid/broker/System.h \ + qpid/broker/ThresholdAlerts.cpp \ + qpid/broker/ThresholdAlerts.h \ qpid/broker/TopicExchange.cpp \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp index 176cadf0c1..915f2a1c88 100644 --- a/qpid/cpp/src/qmf/Agent.cpp +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -339,7 +339,7 @@ void AgentImpl::handleMethodResponse(const Variant::Map& response, const Message uint32_t correlator; boost::shared_ptr<SyncContext> context; - QPID_LOG(trace, "RCVD MethodResponse map=" << response); + QPID_LOG(trace, "RCVD MethodResponse cid=" << cid << " map=" << response); aIter = response.find("_arguments"); if (aIter != response.end()) @@ -556,13 +556,14 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator) msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); msg.setSubject(directSubject); - if (!session.authUser.empty()) - msg.setUserId(session.authUser); + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); encode(QueryImplAccess::get(query).asMap(), msg); - if (sender.isValid()) + if (sender.isValid()) { sender.send(msg); - - QPID_LOG(trace, "SENT QueryRequest to=" << name); + QPID_LOG(trace, "SENT QueryRequest to=" << sender.getName() << "/" << directSubject << " cid=" << correlator); + } } @@ -583,13 +584,14 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); msg.setSubject(directSubject); - if (!session.authUser.empty()) - msg.setUserId(session.authUser); + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); encode(map, msg); - if (sender.isValid()) + if (sender.isValid()) { sender.send(msg); - - QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name); + QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << sender.getName() << "/" << directSubject << " content=" << map << " cid=" << correlator); + } } void AgentImpl::sendSchemaRequest(const SchemaId& id) @@ -626,12 +628,13 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id) msg.setReplyTo(session.replyAddress); msg.setContent(content); msg.setSubject(directSubject); - if (!session.authUser.empty()) - msg.setUserId(session.authUser); - if (sender.isValid()) + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); + if (sender.isValid()) { sender.send(msg); - - QPID_LOG(trace, "SENT V1SchemaRequest to=" << name); + QPID_LOG(trace, "SENT V1SchemaRequest to=" << sender.getName() << "/" << directSubject); + } } diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp index 30176a8c01..4c5a72a467 100644 --- a/qpid/cpp/src/qmf/AgentSession.cpp +++ b/qpid/cpp/src/qmf/AgentSession.cpp @@ -571,7 +571,7 @@ void AgentSessionImpl::raiseEvent(const Data& data, int severity) encode(list, msg); topicSender.send(msg); - QPID_LOG(trace, "SENT EventIndication to=" << subject); + QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject); } @@ -625,7 +625,7 @@ void AgentSessionImpl::setAgentName() void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg) { - QPID_LOG(trace, "RCVD AgentLocateRequest"); + QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo()); if (!predicate.empty()) { Query agentQuery(QUERY_OBJECT); @@ -659,7 +659,7 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg) { - QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo()); + QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId()); // // Construct an AgentEvent to be sent to the application. @@ -719,7 +719,7 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg) { - QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo()); + QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId()); // // Construct an AgentEvent to be sent to the application or directly handled by the agent. diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index bb4458a0b9..e12c1152f6 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -65,7 +65,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5), + connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) { @@ -394,6 +394,7 @@ void ConsoleSessionImpl::sendAgentLocate() { Message msg; Variant::Map& headers(msg.getProperties()); + static const string subject("console.request.agent_locate"); headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; @@ -401,12 +402,12 @@ void ConsoleSessionImpl::sendAgentLocate() msg.setReplyTo(replyAddress); msg.setCorrelationId("agent-locate"); - msg.setSubject("console.request.agent_locate"); + msg.setSubject(subject); encode(agentQuery.getPredicate(), msg); topicSender.send(msg); - QPID_LOG(trace, "SENT AgentLocate to topic"); + QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject); } diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h index e495c1c1e8..675c8bcfb5 100644 --- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h +++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h @@ -72,7 +72,6 @@ namespace qmf { qpid::messaging::Sender directSender; qpid::messaging::Sender topicSender; std::string domain; - std::string authUser; uint32_t maxAgentAgeMinutes; bool listenOnDirect; bool strictSecurity; diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.h b/qpid/cpp/src/qpid/RefCountedBuffer.h index c332325378..75a23862be 100644 --- a/qpid/cpp/src/qpid/RefCountedBuffer.h +++ b/qpid/cpp/src/qpid/RefCountedBuffer.h @@ -27,7 +27,7 @@ #include <boost/intrusive_ptr.hpp> namespace qpid { -// FIXME aconway 2008-09-06: easy to add alignment + /** * Reference-counted byte buffer. * No alignment guarantees. @@ -51,7 +51,7 @@ public: pointer(const pointer&); ~pointer(); pointer& operator=(const pointer&); - + char* get() { return cp(); } operator char*() { return cp(); } char& operator*() { return *cp(); } @@ -62,7 +62,7 @@ public: const char& operator*() const { return *cp(); } const char& operator[](size_t i) const { return cp()[i]; } }; - + /** Create a reference counted buffer of size n */ static pointer create(size_t n); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 2ef6933612..ebccdbe38f 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -103,7 +103,8 @@ Broker::Options::Options(const std::string& name) : requireEncrypted(false), maxSessionRate(0), asyncQueueEvents(false), // Must be false in a cluster. - qmf2Support(false), + qmf2Support(true), + qmf1Support(true), queueFlowStopRatio(80), queueFlowResumeRatio(70) { @@ -125,7 +126,8 @@ Broker::Options::Options(const std::string& name) : ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") - ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management") + ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") + ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") @@ -153,7 +155,7 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support, + managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, conf.qmf2Support) : 0), store(new NullMessageStore), @@ -225,7 +227,6 @@ Broker::Broker(const Broker::Options& conf) : } QueuePolicy::setDefaultMaxSize(conf.queueLimit); - queues.setQueueEvents(&queueEvents); // Early-Initialize plugins Plugin::earlyInitAll(*this); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 55823bc45a..9ed9ca3995 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -115,6 +115,7 @@ public: uint32_t maxSessionRate; bool asyncQueueEvents; bool qmf2Support; + bool qmf1Support; uint queueFlowStopRatio; // producer flow control: on uint queueFlowResumeRatio; // producer flow control: off diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp new file mode 100644 index 0000000000..e6bbf86691 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Fairshare.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace broker { + +Fairshare::Fairshare(size_t levels, uint limit) : + PriorityQueue(levels), + limits(levels, limit), priority(levels-1), count(0) {} + + +void Fairshare::setLimit(size_t level, uint limit) +{ + limits[level] = limit; +} + +bool Fairshare::limitReached() +{ + uint l = limits[priority]; + return l && ++count > l; +} + +uint Fairshare::currentLevel() +{ + if (limitReached()) { + return nextLevel(); + } else { + return priority; + } +} + +uint Fairshare::nextLevel() +{ + count = 1; + if (priority) --priority; + else priority = levels-1; + return priority; +} + +bool Fairshare::isNull() +{ + for (int i = 0; i < levels; i++) if (limits[i]) return false; + return true; +} + +bool Fairshare::getState(uint& p, uint& c) const +{ + p = priority; + c = count; + return true; +} + +bool Fairshare::setState(uint p, uint c) +{ + priority = p; + count = c; + return true; +} + +bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages) +{ + const uint start = p = currentLevel(); + do { + if (!messages[p].empty()) return true; + } while ((p = nextLevel()) != start); + return false; +} + + + +bool Fairshare::getState(const Messages& m, uint& priority, uint& count) +{ + const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m); + return fairshare && fairshare->getState(priority, count); +} + +bool Fairshare::setState(Messages& m, uint priority, uint count) +{ + Fairshare* fairshare = dynamic_cast<Fairshare*>(&m); + return fairshare && fairshare->setState(priority, count); +} + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + +int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue) +{ + return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue)); +} + +std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings) +{ + std::auto_ptr<Messages> result; + size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100); + if (levels) { + uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare"); + std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit)); + for (uint i = 0; i < levels; i++) { + std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str(); + if(settings.isSet(key)) { + fairshare->setLimit(i, getIntegerSetting(settings, key)); + } + } + + if (fairshare->isNull()) { + result = std::auto_ptr<Messages>(new PriorityQueue(levels)); + } else { + result = fairshare; + } + } + return result; +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h new file mode 100644 index 0000000000..6c4b87f857 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Fairshare.h @@ -0,0 +1,61 @@ +#ifndef QPID_BROKER_FAIRSHARE_H +#define QPID_BROKER_FAIRSHARE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PriorityQueue.h" + +namespace qpid { +namespace framing { +class FieldTable; +} +namespace broker { + +/** + * Modifies a basic prioirty queue by limiting the number of messages + * from each priority level that are dispatched before allowing + * dispatch from the next level. + */ +class Fairshare : public PriorityQueue +{ + public: + Fairshare(size_t levels, uint limit); + bool getState(uint& priority, uint& count) const; + bool setState(uint priority, uint count); + void setLimit(size_t level, uint limit); + static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings); + static bool getState(const Messages&, uint& priority, uint& count); + static bool setState(Messages&, uint priority, uint count); + private: + std::vector<uint> limits; + + uint priority; + uint count; + + uint currentLevel(); + uint nextLevel(); + bool isNull(); + bool limitReached(); + bool findFrontLevel(uint& p, PriorityLevels&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_FAIRSHARE_H*/ diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp new file mode 100644 index 0000000000..a811a86492 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/LegacyLVQ.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { + +LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {} + +void LegacyLVQ::setNoBrowse(bool b) +{ + noBrowse = b; +} + +bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end() && i->second.payload == message.payload) { + message = i->second; + erase(i); + return true; + } else { + return false; + } +} + +bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (MessageMap::next(position, message)) { + if (!noBrowse) index.erase(getKey(message)); + return true; + } else { + return false; + } +} + +bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed) +{ + //Hack to disable LVQ behaviour on cluster update: + if (broker && broker->isClusterUpdatee()) { + messages[added.position] = added; + return false; + } else { + return MessageMap::push(added, removed); + } +} + +const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update) +{ + //add the new message into the original position of the replaced message + Ordering::iterator i = messages.find(original.position); + i->second = update; + i->second.position = original.position; + return i->second; +} + +void LegacyLVQ::removeIf(Predicate p) +{ + //Note: This method is currently called periodically on the timer + //thread to expire messages. In a clustered broker this means that + //the purging does not occur on the cluster event dispatch thread + //and consequently that is not totally ordered w.r.t other events + //(including publication of messages). The cluster does ensure + //that the actual expiration of messages (as distinct from the + //removing of those expired messages from the queue) *is* + //consistently ordered w.r.t. cluster events. This means that + //delivery of messages is in general consistent across the cluster + //inspite of any non-determinism in the triggering of a + //purge. However at present purging a last value queue (of the + //legacy sort) could potentially cause inconsistencies in the + //cluster (as the order w.r.t publications can affect the order in + //which messages appear in the queue). Consequently periodic + //purging of an LVQ is not enabled if the broker is clustered + //(expired messages will be removed on delivery and consolidated + //by key as part of normal LVQ operation). + + //TODO: Is there a neater way to check whether broker is + //clustered? Here we assume that if the clustered timer is the + //same as the regular timer, we are not clustered: + if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer())) + MessageMap::removeIf(p); +} + +std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current, + const std::string& key, bool noBrowse, Broker* broker) +{ + LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get()); + if (lvq) { + lvq->setNoBrowse(noBrowse); + return current; + } else { + return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker)); + } +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h new file mode 100644 index 0000000000..dd0fd7aaec --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h @@ -0,0 +1,59 @@ +#ifndef QPID_BROKER_LEGACYLVQ_H +#define QPID_BROKER_LEGACYLVQ_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageMap.h" +#include <memory> + +namespace qpid { +namespace broker { +class Broker; + +/** + * This class encapsulates the behaviour of the old style LVQ where a + * message replacing another messages for the given key will use the + * position in the queue of the previous message. This however causes + * problems for browsing. Either browsers stop the coalescing of + * messages by key (default) or they may mis updates (if the no-browse + * option is specified). + */ +class LegacyLVQ : public MessageMap +{ + public: + LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + void removeIf(Predicate); + void setNoBrowse(bool); + static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current, + const std::string& key, bool noBrowse, + Broker* broker); + protected: + bool noBrowse; + Broker* broker; + + const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_LEGACYLVQ_H*/ diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index a16180f3ae..122c5b9c1a 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -400,22 +400,6 @@ bool Message::hasExpired() return expiryPolicy && expiryPolicy->hasExpired(*this); } -boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const -{ - sys::Mutex::ScopedLock l(lock); - Replacement::iterator i = replacement.find(qfor); - if (i != replacement.end()){ - return i->second; - } - return empty; -} - -void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor) -{ - sys::Mutex::ScopedLock l(lock); - replacement[qfor] = msg; -} - namespace { struct ScopedSet { sys::Monitor& lock; diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index e8a8a19d53..2d0de27823 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -153,8 +153,6 @@ public: void forcePersistent(); bool isForcedPersistent(); - boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; - void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); @@ -163,8 +161,6 @@ public: uint8_t getPriority() const; private: - typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; - MessageAdapter& getAdapter() const; void allDequeuesComplete(); @@ -183,7 +179,6 @@ public: static TransferAdapter TRANSFER; - mutable Replacement replacement; mutable boost::intrusive_ptr<Message> empty; sys::Monitor callbackLock; diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp new file mode 100644 index 0000000000..24b8f6f895 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { + +size_t MessageDeque::size() +{ + return messages.size(); +} + +bool MessageDeque::empty() +{ + return messages.empty(); +} + +void MessageDeque::reinsert(const QueuedMessage& message) +{ + messages.insert(lower_bound(messages.begin(), messages.end(), message), message); +} + +MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position) +{ + if (!messages.empty()) { + QueuedMessage comp; + comp.position = position; + unsigned long diff = position.getValue() - messages.front().position.getValue(); + long maxEnd = diff < messages.size()? diff : messages.size(); + return lower_bound(messages.begin(),messages.begin()+maxEnd,comp); + } else { + return messages.end(); + } +} + +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +{ + Deque::iterator i = seek(position); + if (i != messages.end() && i->position == position) { + message = *i; + if (remove) messages.erase(i); + return true; + } else { + return false; + } +} + +bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, true); +} + +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, false); +} + +bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (messages.empty()) { + return false; + } else if (position < front().position) { + message = front(); + return true; + } else { + Deque::iterator i = seek(position+1); + if (i != messages.end()) { + message = *i; + return true; + } else { + return false; + } + } +} + +QueuedMessage& MessageDeque::front() +{ + return messages.front(); +} + +void MessageDeque::pop() +{ + if (!messages.empty()) { + messages.pop_front(); + } +} + +bool MessageDeque::pop(QueuedMessage& out) +{ + if (messages.empty()) { + return false; + } else { + out = front(); + messages.pop_front(); + return true; + } +} + +bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) +{ + messages.push_back(added); + return false;//adding a message never causes one to be removed for deque +} + +void MessageDeque::foreach(Functor f) +{ + std::for_each(messages.begin(), messages.end(), f); +} + +void MessageDeque::removeIf(Predicate p) +{ + for (Deque::iterator i = messages.begin(); i != messages.end();) { + if (p(*i)) { + i = messages.erase(i); + } else { + ++i; + } + } +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h new file mode 100644 index 0000000000..0e1aef2986 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageDeque.h @@ -0,0 +1,62 @@ +#ifndef QPID_BROKER_MESSAGEDEQUE_H +#define QPID_BROKER_MESSAGEDEQUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/broker/QueuedMessage.h" +#include <deque> + +namespace qpid { +namespace broker { + +/** + * Provides the standard FIFO queue behaviour. + */ +class MessageDeque : public Messages +{ + public: + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + void removeIf(Predicate); + + private: + typedef std::deque<QueuedMessage> Deque; + Deque messages; + + Deque::iterator seek(const framing::SequenceNumber&); + bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEDEQUE_H*/ diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp new file mode 100644 index 0000000000..39e23df533 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp @@ -0,0 +1,166 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageMap.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { +namespace { +const std::string EMPTY; +} + +std::string MessageMap::getKey(const QueuedMessage& message) +{ + const framing::FieldTable* ft = message.payload->getApplicationHeaders(); + if (ft) return ft->getAsString(key); + else return EMPTY; +} + +size_t MessageMap::size() +{ + return messages.size(); +} + +bool MessageMap::empty() +{ + return messages.empty(); +} + +void MessageMap::reinsert(const QueuedMessage& message) +{ + std::string key = getKey(message); + Index::iterator i = index.find(key); + if (i == index.end()) { + index[key] = message; + messages[message.position] = message; + } //else message has already been replaced +} + +bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end()) { + message = i->second; + erase(i); + return true; + } else { + return false; + } +} + +bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end()) { + message = i->second; + return true; + } else { + return false; + } +} + +bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (!messages.empty() && position < front().position) { + message = front(); + return true; + } else { + Ordering::iterator i = messages.lower_bound(position+1); + if (i != messages.end()) { + message = i->second; + return true; + } else { + return false; + } + } +} + +QueuedMessage& MessageMap::front() +{ + return messages.begin()->second; +} + +void MessageMap::pop() +{ + QueuedMessage dummy; + pop(dummy); +} + +bool MessageMap::pop(QueuedMessage& out) +{ + Ordering::iterator i = messages.begin(); + if (i != messages.end()) { + out = i->second; + erase(i); + return true; + } else { + return false; + } +} + +const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) +{ + messages.erase(original.position); + messages[update.position] = update; + return update; +} + +bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) +{ + std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added)); + if (result.second) { + //there was no previous message for this key; nothing needs to + //be removed, just add the message into its correct position + messages[added.position] = added; + return false; + } else { + //there is already a message with that key which needs to be replaced + removed = result.first->second; + result.first->second = replace(result.first->second, added); + return true; + } +} + +void MessageMap::foreach(Functor f) +{ + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + f(i->second); + } +} + +void MessageMap::removeIf(Predicate p) +{ + for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) { + if (p(i->second)) { + erase(i); + } + } +} + +void MessageMap::erase(Ordering::iterator i) +{ + index.erase(getKey(i->second)); + messages.erase(i); +} + +MessageMap::MessageMap(const std::string& k) : key(k) {} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h new file mode 100644 index 0000000000..1128a1d54a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageMap.h @@ -0,0 +1,72 @@ +#ifndef QPID_BROKER_MESSAGEMAP_H +#define QPID_BROKER_MESSAGEMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/framing/SequenceNumber.h" +#include <map> +#include <string> + +namespace qpid { +namespace broker { + +/** + * Provides a last value queue behaviour, whereby a messages replace + * any previous message with the same value for a defined property + * (i.e. the key). + */ +class MessageMap : public Messages +{ + public: + MessageMap(const std::string& key); + virtual ~MessageMap() {} + + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + virtual bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + virtual bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + virtual bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + virtual void removeIf(Predicate); + + protected: + typedef std::map<std::string, QueuedMessage> Index; + typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering; + const std::string key; + Index index; + Ordering messages; + + std::string getKey(const QueuedMessage&); + virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&); + void erase(Ordering::iterator); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEMAP_H*/ diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h new file mode 100644 index 0000000000..0d75417640 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Messages.h @@ -0,0 +1,117 @@ +#ifndef QPID_BROKER_MESSAGES_H +#define QPID_BROKER_MESSAGES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/function.hpp> + +namespace qpid { +namespace framing { +class SequenceNumber; +} +namespace broker { +struct QueuedMessage; + +/** + * This interface abstracts out the access to the messages held for + * delivery by a Queue instance. + */ +class Messages +{ + public: + typedef boost::function1<void, QueuedMessage&> Functor; + typedef boost::function1<bool, QueuedMessage&> Predicate; + + virtual ~Messages() {} + /** + * @return the number of messages available for delivery. + */ + virtual size_t size() = 0; + /** + * @return true if there are no messages for delivery, false otherwise + */ + virtual bool empty() = 0; + + /** + * Re-inserts a message back into its original position - used + * when requeing released messages. + */ + virtual void reinsert(const QueuedMessage&) = 0; + /** + * Remove the message at the specified position, returning true if + * found, false otherwise. The removed message is passed back via + * the second parameter. + */ + virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0; + /** + * Find the message at the specified position, returning true if + * found, false otherwise. The matched message is passed back via + * the second parameter. + */ + virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0; + /** + * Return the next message to be given to a browsing subscrption + * that has reached the specified poisition. The next messages is + * passed back via the second parameter. + * + * @return true if there is another message, false otherwise. + */ + virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; + + /** + * Note: Caller is responsible for ensuring that there is a front + * (e.g. empty() returns false) + * + * @return the next message to be delivered + */ + virtual QueuedMessage& front() = 0; + /** + * Removes the front message + */ + virtual void pop() = 0; + /** + * @return true if there is a mesage to be delivered - in which + * case that message will be returned via the parameter and + * removed - otherwise false. + */ + virtual bool pop(QueuedMessage&) = 0; + /** + * Pushes a message to the back of the 'queue'. For some types of + * queue this may cause another message to be removed; if that is + * the case the method will return true and the removed message + * will be passed out via the second parameter. + */ + virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; + + /** + * Apply the functor to each message held + */ + virtual void foreach(Functor) = 0; + /** + * Remove every message held that for which the specified + * predicate returns true + */ + virtual void removeIf(Predicate) = 0; + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGES_H*/ diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp new file mode 100644 index 0000000000..e07e73d323 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -0,0 +1,212 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PriorityQueue.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/reply_exceptions.h" +#include <cmath> + +namespace qpid { +namespace broker { + +PriorityQueue::PriorityQueue(int l) : + levels(l), + messages(levels, Deque()), + frontLevel(0), haveFront(false), cached(false) {} + +size_t PriorityQueue::size() +{ + size_t total(0); + for (int i = 0; i < levels; ++i) { + total += messages[i].size(); + } + return total; +} + +bool PriorityQueue::empty() +{ + for (int i = 0; i < levels; ++i) { + if (!messages[i].empty()) return false; + } + return true; +} + +void PriorityQueue::reinsert(const QueuedMessage& message) +{ + uint p = getPriorityLevel(message); + messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); + clearCache(); +} + +bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +{ + QueuedMessage comp; + comp.position = position; + for (int i = 0; i < levels; ++i) { + if (!messages[i].empty()) { + unsigned long diff = position.getValue() - messages[i].front().position.getValue(); + long maxEnd = diff < messages[i].size() ? diff : messages[i].size(); + Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp); + if (l != messages[i].end() && l->position == position) { + message = *l; + if (remove) { + messages[i].erase(l); + clearCache(); + } + return true; + } + } + } + return false; +} + +bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, true); +} + +bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, false); +} + +bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + QueuedMessage match; + match.position = position+1; + Deque::iterator lowest; + bool found = false; + for (int i = 0; i < levels; ++i) { + Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); + if (m != messages[i].end()) { + if (m->position == match.position) { + message = *m; + return true; + } else if (!found || m->position < lowest->position) { + lowest = m; + found = true; + } + } + } + if (found) { + message = *lowest; + } + return found; +} + +QueuedMessage& PriorityQueue::front() +{ + if (checkFront()) { + return messages[frontLevel].front(); + } else { + throw qpid::framing::InternalErrorException(QPID_MSG("No message available")); + } +} + +bool PriorityQueue::pop(QueuedMessage& message) +{ + if (checkFront()) { + message = messages[frontLevel].front(); + messages[frontLevel].pop_front(); + clearCache(); + return true; + } else { + return false; + } +} + +void PriorityQueue::pop() +{ + QueuedMessage dummy; + pop(dummy); +} + +bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) +{ + messages[getPriorityLevel(added)].push_back(added); + clearCache(); + return false;//adding a message never causes one to be removed for deque +} + +void PriorityQueue::foreach(Functor f) +{ + for (int i = 0; i < levels; ++i) { + std::for_each(messages[i].begin(), messages[i].end(), f); + } +} + +void PriorityQueue::removeIf(Predicate p) +{ + for (int priority = 0; priority < levels; ++priority) { + for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { + if (p(*i)) { + i = messages[priority].erase(i); + clearCache(); + } else { + ++i; + } + } + } +} + +uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const +{ + uint priority = m.payload->getPriority(); + //Use AMQP 0-10 approach to mapping priorities to a fixed level + //(see rule priority-level-implementation) + const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0))); + if (priority <= firstLevel) return 0; + return std::min(priority - firstLevel, (uint)levels-1); +} + +void PriorityQueue::clearCache() +{ + cached = false; +} + +bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) +{ + for (int p = levels-1; p >= 0; --p) { + if (!m[p].empty()) { + l = p; + return true; + } + } + return false; +} + +bool PriorityQueue::checkFront() +{ + if (!cached) { + haveFront = findFrontLevel(frontLevel, messages); + cached = true; + } + return haveFront; +} + +uint PriorityQueue::getPriority(const QueuedMessage& message) +{ + const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages())); + if (queue) return queue->getPriorityLevel(message); + else return 0; +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h new file mode 100644 index 0000000000..4bf9d26a9d --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -0,0 +1,78 @@ +#ifndef QPID_BROKER_PRIORITYQUEUE_H +#define QPID_BROKER_PRIORITYQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/sys/IntegerTypes.h" +#include <deque> +#include <vector> + +namespace qpid { +namespace broker { + +/** + * Basic priority queue with a configurable number of recognised + * priority levels. This is implemented as a separate deque per + * priority level. Browsing is FIFO not priority order. + */ +class PriorityQueue : public Messages +{ + public: + PriorityQueue(int levels); + virtual ~PriorityQueue() {} + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + void removeIf(Predicate); + static uint getPriority(const QueuedMessage&); + protected: + typedef std::deque<QueuedMessage> Deque; + typedef std::vector<Deque> PriorityLevels; + virtual bool findFrontLevel(uint& p, PriorityLevels&); + + const int levels; + private: + PriorityLevels messages; + uint frontLevel; + bool haveFront; + bool cached; + + bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); + uint getPriorityLevel(const QueuedMessage&) const; + void clearCache(); + bool checkFront(); +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PRIORITYQUEUE_H*/ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3de93ed74e..cfb32749a0 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -23,11 +23,16 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/LegacyLVQ.h" +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/MessageMap.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -67,11 +72,13 @@ const std::string qpidMaxCount("qpid.max_count"); const std::string qpidNoLocal("no-local"); const std::string qpidTraceIdentity("qpid.trace.id"); const std::string qpidTraceExclude("qpid.trace.exclude"); +const std::string qpidLastValueQueueKey("qpid.last_value_queue_key"); const std::string qpidLastValueQueue("qpid.last_value_queue"); const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout"); //following feature is not ready for general use as it doesn't handle //the case where a message is enqueued on more than one queue well enough: const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); @@ -93,19 +100,18 @@ Queue::Queue(const string& _name, bool _autodelete, consumerCount(0), exclusive(0), noLocal(false), - lastValueQueue(false), - lastValueQueueNoBrowse(false), persistLastNode(false), inLastNodeFailure(false), + messages(new MessageDeque()), persistenceId(0), policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0), insertSeqNo(0), broker(b), deleted(false), - barrier(*this) + barrier(*this), + autoDeleteTimeout(0) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -160,7 +166,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ } else { enqueue(0, msg); push(msg); - mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -179,7 +184,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued - mgntEnqStats(msg); if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -194,7 +198,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -208,7 +211,7 @@ void Queue::requeue(const QueuedMessage& msg){ Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued - messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg); + messages->reinsert(msg); listeners.populate(copy); // for persistLastNode - don't force a message twice to disk, but force it if no force before @@ -223,57 +226,23 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -void Queue::clearLVQIndex(const QueuedMessage& msg){ - assertClusterSafe(); - const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0; - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } -} - bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - - Messages::iterator i = findAt(position); - if (i != messages.end() ) { - message = *i; - if (lastValueQueue) { - clearLVQIndex(*i); - } - QPID_LOG(debug, - "Acquired message at " << i->position << " from " << name); - messages.erase(i); + if (messages->remove(position, message)) { + QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; - } - QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); - return false; + } else { + QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); + return false; + } } bool Queue::acquire(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - assertClusterSafe(); - - QPID_LOG(debug, "attempting to acquire " << msg.position); - Messages::iterator i = findAt(msg.position); - if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set - (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { - - clearLVQIndex(msg); - QPID_LOG(debug, - "Match found, acquire succeeded: " << - i->position << " == " << msg.position); - messages.erase(i); - return true; - } - - QPID_LOG(debug, "Acquire failed for " << msg.position); - return false; + QueuedMessage copy = msg; + return acquireMessageAt(msg.position, copy); } void Queue::notifyListener() @@ -282,7 +251,7 @@ void Queue::notifyListener() QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -311,12 +280,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; } else { - QueuedMessage msg = getFront(); + QueuedMessage msg = messages->front(); if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); popAndDequeue(); @@ -326,7 +295,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - popMsg(msg); + pop(); return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit @@ -352,11 +321,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) //consumer wants the message c->position = msg.position; m = msg; - if (!lastValueQueueNoBrowse) clearLVQIndex(msg); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) m.payload = replacement; - } return true; } else { //browser hasn't got enough credit for the message @@ -378,7 +342,7 @@ void Queue::removeListener(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); listeners.removeListener(c); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -399,52 +363,20 @@ bool Queue::dispatch(Consumer::shared_ptr c) // Find the next message bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > c->position) { - if (c->position < getFront().position) { - msg = getFront(); - return true; - } else { - Messages::iterator pos = findAt(c->position); - if (pos != messages.end() && pos+1 != messages.end()) { - msg = *(pos+1); - return true; - } - } + if (messages->next(c->position, msg)) { + return true; + } else { + listeners.addListener(c); + return false; } - listeners.addListener(c); - return false; } -Queue::Messages::iterator Queue::findAt(SequenceNumber pos) { - - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i!= messages.end() && i->position == pos) - return i; - } - return messages.end(); // no match found. -} - - QueuedMessage Queue::find(SequenceNumber pos) const { Mutex::ScopedLock locker(messageLock); - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i != messages.end()) - return *i; - } - return QueuedMessage(); + QueuedMessage msg; + messages->find(pos, msg); + return msg; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ @@ -464,6 +396,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ consumerCount++; if (mgmtObject != 0) mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } } void Queue::cancel(Consumer::shared_ptr c){ @@ -478,12 +414,18 @@ void Queue::cancel(Consumer::shared_ptr c){ QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); + messages->pop(msg); + return msg; +} - if(!messages.empty()){ - msg = getFront(); - popMsg(msg); +bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +{ + if (message.payload->hasExpired()) { + expired.push_back(message); + return true; + } else { + return false; } - return msg; } void Queue::purgeExpired() @@ -492,37 +434,11 @@ void Queue::purgeExpired() //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. - //Note: This method is currently called periodically on the timer - //thread. In a clustered broker this means that the purging does - //not occur on the cluster event dispatch thread and consequently - //that is not totally ordered w.r.t other events (including - //publication of messages). However the cluster does ensure that - //the actual expiration of messages (as distinct from the removing - //of those expired messages from the queue) *is* consistently - //ordered w.r.t. cluster events. This means that delivery of - //messages is in general consistent across the cluster inspite of - //any non-determinism in the triggering of a purge. However at - //present purging a last value queue could potentially cause - //inconsistencies in the cluster (as the order w.r.t publications - //can affect the order in which messages appear in the - //queue). Consequently periodic purging of an LVQ is not enabled - //(expired messages will be removed on delivery and consolidated - //by key as part of normal LVQ operation). - - if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) { - Messages expired; + if (dequeueTracker.sampleRatePerSecond() < 1) { + std::deque<QueuedMessage> expired; { Mutex::ScopedLock locker(messageLock); - for (Messages::iterator i = messages.begin(); i != messages.end();) { - //Re-introduce management of LVQ-specific state here - //if purging is renabled for that case (see note above) - if (i->payload->hasExpired()) { - expired.push_back(*i); - i = messages.erase(i); - } else { - ++i; - } - } + messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } @@ -548,13 +464,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages.empty()) { + while((!purge_request || purge_count--) && !messages->empty()) { if (dest.get()) { // // If there is a destination exchange, stage the messages onto a reroute queue // so they don't wind up getting purged more than once. // - DeliverableMessage msg(getFront().payload); + DeliverableMessage msg(messages->front().payload); rerouteQueue.push_back(msg); } popAndDequeue(); @@ -580,101 +496,53 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning - while((!qty || move_count--) && !messages.empty()) { - QueuedMessage qmsg = getFront(); + while((!qty || move_count--) && !messages->empty()) { + QueuedMessage qmsg = messages->front(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue - popMsg(qmsg); + pop(); dequeue(0, qmsg); count++; } return count; } -void Queue::popMsg(QueuedMessage& qmsg) +void Queue::pop() { assertClusterSafe(); - const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } - messages.pop_front(); + messages->pop(); ++dequeueTracker; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; + QueuedMessage removed; + bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - LVQ::iterator i; - const framing::FieldTable* ft = msg->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - - i = lvq.find(key); - if (i == lvq.end() || (broker && broker->isClusterUpdatee())) { - messages.push_back(qm); - listeners.populate(copy); - lvq[key] = msg; - }else { - boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); - if (!old) old = i->second; - i->second->setReplacementMessage(msg,this); - if (isRecovery) { - //can't issue new requests for the store until - //recovery is complete - pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); - } else { - Mutex::ScopedUnlock u(messageLock); - dequeue(0, QueuedMessage(qm.queue, old, qm.position)); - } - } - }else { - messages.push_back(qm); - listeners.populate(copy); - } - if (eventMode) { - if (eventMgr) eventMgr->enqueued(qm); - else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); - } - if (policy.get()) { - policy->enqueued(qm); - } - if (flowLimit.get()) - flowLimit->enqueued(qm); + dequeueRequired = messages->push(qm, removed); + listeners.populate(copy); + enqueued(qm); } copy.notify(); -} - -QueuedMessage Queue::getFront() -{ - QueuedMessage msg = messages.front(); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) msg.payload = replacement; + if (dequeueRequired) { + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(removed); + } else { + dequeue(0, removed); + } } - return msg; } -QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) +void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) { - const framing::FieldTable* ft = replacement->getApplicationHeaders(); - if (ft) { - string key = ft->getAsString(qpidVQMatchProperty); - if (lvq.find(key) != lvq.end()){ - lvq[key] = replacement; - } - } - msg.payload = replacement; - } - return msg; + if (message.payload->isIngressComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -682,20 +550,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const { Mutex::ScopedLock locker(messageLock); uint32_t count = 0; - for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - //NOTE: don't need to use checkLvqReplace() here as it - //is only relevant for LVQ which does not support persistence - //so the enqueueComplete check has no effect - if ( i->payload->isIngressComplete() ) count ++; - } - + messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); return count; } uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - return messages.size(); + return messages->size(); } uint32_t Queue::getConsumerCount() const @@ -707,7 +569,7 @@ uint32_t Queue::getConsumerCount() const bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(consumerLock); - return autodelete && !consumerCount; + return autodelete && !consumerCount && !owner; } void Queue::clearLastNodeFailure() @@ -715,21 +577,22 @@ void Queue::clearLastNodeFailure() inLastNodeFailure = false; } +void Queue::forcePersistent(QueuedMessage& message) +{ + if(!message.payload->isStoredOnQueue(shared_from_this())) { + message.payload->forcePersistent(); + if (message.payload->isForcedPersistent() ){ + enqueue(0, message.payload); + } + } +} + void Queue::setLastNodeFailure() { if (persistLastNode){ Mutex::ScopedLock locker(messageLock); try { - for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { - if (lastValueQueue) checkLvqReplace(*i); - // don't force a message twice to disk. - if(!i->payload->isStoredOnQueue(shared_from_this())) { - i->payload->forcePersistent(); - if (i->payload->isForcedPersistent() ){ - enqueue(0, i->payload); - } - } - } + messages->foreach(boost::bind(&Queue::forcePersistent, this, _1)); } catch (const std::exception& e) { // Could not go into last node standing (for example journal not large enough) QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what()); @@ -746,7 +609,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg if (!u.acquired) return false; if (policy.get() && !suppressPolicyCheck) { - Messages dequeues; + std::deque<QueuedMessage> dequeues; { Mutex::ScopedLock locker(messageLock); policy->tryEnqueue(msg); @@ -833,8 +696,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) */ void Queue::popAndDequeue() { - QueuedMessage msg = getFront(); - popMsg(msg); + QueuedMessage msg = messages->front(); + pop(); dequeue(0, msg); } @@ -845,11 +708,16 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); + /** todo KAG make flowLimit an observer */ if (flowLimit.get()) flowLimit->dequeued(msg); mgntDeqStats(msg.payload); - if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { - eventMgr->dequeued(msg); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->dequeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); + } } } @@ -863,16 +731,41 @@ void Queue::create(const FieldTable& _settings) configure(_settings); } + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + void Queue::configure(const FieldTable& _settings, bool recovering) { eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (eventMode && broker) { + broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); + } if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); - } else if (eventMgr && !eventMgr->isSync() ) { + } else if (broker && !(broker->getQueueEvents().isSync()) ) { QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); } FieldTable copy(_settings); @@ -881,17 +774,30 @@ void Queue::configure(const FieldTable& _settings, bool recovering) } else { setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); } + if (broker && broker->getManagementAgent()) { + ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings); + } + //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); - lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); - - lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); - if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); - lastValueQueue = lastValueQueueNoBrowse; + std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey); + if (lvqKey.size()) { + QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); + messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + } else if (_settings.get(qpidLastValueQueueNoBrowse)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + } else if (_settings.get(qpidLastValueQueue)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + } else { + std::auto_ptr<Messages> m = Fairshare::create(_settings); + if (m.get()) { + messages = m; + QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + } } persistLastNode= _settings.get(qpidPersistLastNode); @@ -910,6 +816,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering) flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings); + autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); if (flowLimit.get()) @@ -924,8 +834,8 @@ void Queue::destroy() { if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); - while(!messages.empty()){ - DeliverableMessage msg(getFront().payload); + while(!messages->empty()){ + DeliverableMessage msg(messages->front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); popAndDequeue(); @@ -939,6 +849,7 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } + if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); } void Queue::notifyDeleted() @@ -1043,15 +954,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } +struct AutoDeleteTask : qpid::sys::TimerTask +{ + Broker& broker; + Queue::shared_ptr queue; + + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + + void fire() + { + //need to detect case where queue was used after the task was + //created, but then became unused again before the task fired; + //in this case ignore this request as there will have already + //been a later task added + tryAutoDeleteImpl(broker, queue); + } +}; + +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (queue->autoDeleteTimeout && queue->canAutoDelete()) { + AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); + queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); + broker.getClusterTimer().add(queue->autoDeleteTask); + QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); + } else { + tryAutoDeleteImpl(broker, queue); + } +} + bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { Mutex::ScopedLock locker(ownershipLock); @@ -1066,6 +1008,10 @@ void Queue::releaseExclusiveOwnership() bool Queue::setExclusiveOwner(const OwnershipToken* const o) { + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; @@ -1154,11 +1100,6 @@ SequenceNumber Queue::getPosition() { int Queue::getEventMode() { return eventMode; } -void Queue::setQueueEventManager(QueueEvents& mgr) -{ - eventMgr = &mgr; -} - void Queue::recoveryComplete(ExchangeRegistry& exchanges) { // set the alternate exchange @@ -1184,16 +1125,31 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { - if (m.payload) { - if (policy.get()) { - policy->recoverEnqueued(m.payload); - policy->enqueued(m); + for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { + try { + (*i)->enqueued(m); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } - if (flowLimit.get()) - flowLimit->enqueued(m); - mgntEnqStats(m.payload); + } + if (policy.get()) { + policy->enqueued(m); + } + /** todo make flowlimit an observer */ + if (flowLimit.get()) + flowLimit->enqueued(m); + mgntEnqStats(m.payload); +} + +void Queue::updateEnqueued(const QueuedMessage& m) +{ + if (m.payload) { boost::intrusive_ptr<Message> payload = m.payload; enqueue ( 0, payload, true ); + if (policy.get()) { + policy->recoverEnqueued(payload); + } + enqueued(m); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1205,6 +1161,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg) } QueueListeners& Queue::getListeners() { return listeners; } +Messages& Queue::getMessages() { return *messages; } +const Messages& Queue::getMessages() const { return *messages; } void Queue::checkNotDeleted() { @@ -1213,6 +1171,11 @@ void Queue::checkNotDeleted() } } +void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) +{ + observers.insert(observer); +} + void Queue::flush() { ScopedUse u(barrier); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 5af630f3c8..e8429128f7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -26,14 +26,17 @@ #include "qpid/broker/OwnershipToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" +#include "qpid/broker/Messages.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" #include "qpid/framing/amqp_types.h" @@ -46,6 +49,7 @@ #include <vector> #include <memory> #include <deque> +#include <set> #include <algorithm> namespace qpid { @@ -86,10 +90,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, ~ScopedUse() { if (acquired) barrier.release(); } }; - typedef std::deque<QueuedMessage> Messages; - typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ; + typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; + const std::string name; const bool autodelete; MessageStore* store; @@ -97,16 +101,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t consumerCount; OwnershipToken* exclusive; bool noLocal; - bool lastValueQueue; - bool lastValueQueueNoBrowse; bool persistLastNode; bool inLastNodeFailure; std::string traceId; std::vector<std::string> traceExclude; QueueListeners listeners; - Messages messages; - Messages pendingDequeues;//used to avoid dequeuing during recovery - LVQ lvq; + std::auto_ptr<Messages> messages; + std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; @@ -122,12 +123,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, qmf::org::apache::qpid::broker::Queue* mgmtObject; RateTracker dequeueTracker; int eventMode; - QueueEvents* eventMgr; + Observers observers; bool insertSeqNo; std::string seqNoKey; Broker* broker; bool deleted; UsageBarrier barrier; + int autoDeleteTimeout; + boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -141,12 +144,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); + void enqueued(const QueuedMessage& msg); void dequeued(const QueuedMessage& msg); - void popMsg(QueuedMessage& qmsg); + void pop(); void popAndDequeue(); QueuedMessage getFront(); - QueuedMessage& checkLvqReplace(QueuedMessage& msg); - void clearLVQIndex(const QueuedMessage& msg); + void forcePersistent(QueuedMessage& msg); + int getEventMode(); inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) { @@ -171,7 +175,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, } } - Messages::iterator findAt(framing::SequenceNumber pos); void checkNotDeleted(); public: @@ -277,7 +280,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, * thus are still logically on the queue) - used in * clustered broker. */ - void enqueued(const QueuedMessage& msg); + void updateEnqueued(const QueuedMessage& msg); /** * Test whether the specified message (identified by its @@ -322,13 +325,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { sys::Mutex::ScopedLock l(messageLock); - if (lastValueQueue) { - for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { - f(checkLvqReplace(*i)); - } - } else { - std::for_each(messages.begin(), messages.end(), f); - } + messages->foreach(f); } /** Apply f to each QueueBinding on the queue */ @@ -344,8 +341,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** return current position sequence number for the next message on the queue. */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); - int getEventMode(); - void setQueueEventManager(QueueEvents&); + void addObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); /** * Notify queue that recovery has completed. @@ -354,6 +350,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, // For cluster update QueueListeners& getListeners(); + Messages& getMessages(); + const Messages& getMessages() const; /** * Reserve space in policy for an enqueued message that diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp index bba054b0b8..2c540ff1ad 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp +++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp @@ -19,6 +19,8 @@ * */ #include "qpid/broker/QueueEvents.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" @@ -115,6 +117,29 @@ bool QueueEvents::isSync() return sync; } +class EventGenerator : public QueueObserver +{ + public: + EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {} + void enqueued(const QueuedMessage& m) + { + manager.enqueued(m); + } + void dequeued(const QueuedMessage& m) + { + if (!enqueueOnly) manager.dequeued(m); + } + private: + QueueEvents& manager; + const bool enqueueOnly; +}; + +void QueueEvents::observe(Queue& queue, bool enqueueOnly) +{ + boost::shared_ptr<QueueObserver> observer(new EventGenerator(*this, enqueueOnly)); + queue.addObserver(observer); +} + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.h b/qpid/cpp/src/qpid/broker/QueueEvents.h index c42752133e..fcddfe9092 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.h +++ b/qpid/cpp/src/qpid/broker/QueueEvents.h @@ -63,6 +63,7 @@ class QueueEvents QPID_BROKER_EXTERN void unregisterListener(const std::string& id); void enable(); void disable(); + void observe(Queue&, bool enqueueOnly); //process all outstanding events QPID_BROKER_EXTERN void shutdown(); QPID_BROKER_EXTERN bool isSync(); diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h new file mode 100644 index 0000000000..a711213dee --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueObserver.h @@ -0,0 +1,42 @@ +#ifndef QPID_BROKER_QUEUEOBSERVER_H +#define QPID_BROKER_QUEUEOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace broker { + +class QueuedMessage; +/** + * Interface for notifying classes who want to act as 'observers' of a + * queue of particular events. + */ +class QueueObserver +{ + public: + virtual ~QueueObserver() {} + virtual void enqueued(const QueuedMessage&) = 0; + virtual void dequeued(const QueuedMessage&) = 0; + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index f311ea8321..4168221ad0 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/PriorityQueue.h" #include "qpid/Exception.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" @@ -213,7 +214,10 @@ RingQueuePolicy::RingQueuePolicy(const std::string& _name, bool before(const QueuedMessage& a, const QueuedMessage& b) { - return a.position < b.position; + int priorityA = PriorityQueue::getPriority(a); + int priorityB = PriorityQueue::getPriority(b); + if (priorityA == priorityB) return a.position < b.position; + else return priorityA < priorityB; } void RingQueuePolicy::enqueued(const QueuedMessage& m) diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 28b2d60cda..ea2531dae7 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -47,7 +47,6 @@ QueueRegistry::declare(const string& declareName, bool durable, Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); queues[name] = queue; if (lastNode) queue->setLastNodeFailure(); - if (events) queue->setQueueEventManager(*events); return std::pair<Queue::shared_ptr, bool>(queue, true); } else { @@ -108,8 +107,3 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode) } lastNode = _lastNode; } - -void QueueRegistry::setQueueEvents(QueueEvents* e) -{ - events = e; -} diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 66437f9665..57859fe639 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -96,8 +96,6 @@ class QueueRegistry { */ std::string generateName(); - void setQueueEvents(QueueEvents*); - /** * Set the store to use. May only be called once. */ diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 7106f85807..69b364ad7b 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -33,7 +33,7 @@ using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : amqp_0_10::SessionHandler(&c.getOutput(), ch), - connection(c), + connection(c), proxy(out), clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) {} @@ -69,7 +69,7 @@ void SessionHandler::handleDetach() { if (session.get()) connection.getBroker().getSessionManager().detach(session); assert(!session.get()); - connection.closeChannel(channel.get()); + connection.closeChannel(channel.get()); } void SessionHandler::setState(const std::string& name, bool force) { @@ -78,7 +78,7 @@ void SessionHandler::setState(const std::string& name, bool force) { session = connection.broker.getSessionManager().attach(*this, id, force); } -void SessionHandler::detaching() +void SessionHandler::detaching() { assert(session.get()); session->disableOutput(); @@ -98,7 +98,10 @@ void SessionHandler::attachAs(const std::string& name) { SessionId id(connection.getUserId(), name); SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); - session.reset(new SessionState(connection.getBroker(), *this, id, config)); + // Delay creating management object till attached(). In a cluster, + // only the active link broker calls attachAs but all brokers + // receive the subsequent attached() call. + session.reset(new SessionState(connection.getBroker(), *this, id, config, true)); sendAttach(false); } @@ -109,6 +112,7 @@ void SessionHandler::attachAs(const std::string& name) void SessionHandler::attached(const std::string& name) { if (session.get()) { + session->addManagementObject(); // Delayed from attachAs() amqp_0_10::SessionHandler::attached(name); } else { SessionId id(connection.getUserId(), name); @@ -117,5 +121,5 @@ void SessionHandler::attached(const std::string& name) markReadyToSend(); } } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index d572e37d00..2e69102537 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -53,7 +53,8 @@ using qpid::sys::AbsTime; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( - Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + Broker& b, SessionHandler& h, const SessionId& id, + const SessionState::Configuration& config, bool delayManagement) : qpid::SessionState(id, config), broker(b), handler(&h), semanticState(*this, *this), @@ -71,6 +72,12 @@ SessionState::SessionState( QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); } } + if (!delayManagement) addManagementObject(); + attach(h); +} + +void SessionState::addManagementObject() { + if (GetManagementObject()) return; // Already added. Manageable* parent = broker.GetVhostObject (); if (parent != 0) { ManagementAgent* agent = getBroker().getManagementAgent(); @@ -80,11 +87,11 @@ SessionState::SessionState( mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); - if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate); + if (rateFlowcontrol) + mgmtObject->set_maxClientRate(rateFlowcontrol->getRate()); agent->addObject(mgmtObject); } } - attach(h); } SessionState::~SessionState() { diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index f4c10295b1..568e8593fa 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -73,7 +73,8 @@ class SessionState : public qpid::SessionState, public framing::FrameHandler::InOutHandler { public: - SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&); + SessionState(Broker&, SessionHandler&, const SessionId&, + const SessionState::Configuration&, bool delayManagement=false); ~SessionState(); bool isAttached() const { return handler; } @@ -127,8 +128,11 @@ class SessionState : public qpid::SessionState, // the SessionState of a received Execution.Sync command. void addPendingExecutionSync(); - private: + // Used to delay creation of management object for sessions + // belonging to inter-broker bridges + void addManagementObject(); + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp new file mode 100644 index 0000000000..4f35884af8 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/ThresholdAlerts.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qmf/org/apache/qpid/broker/EventQueueThresholdExceeded.h" + +namespace qpid { +namespace broker { +ThresholdAlerts::ThresholdAlerts(const std::string& n, + qpid::management::ManagementAgent& a, + const uint32_t ct, + const uint64_t st, + const long repeat) + : name(n), agent(a), countThreshold(ct), sizeThreshold(st), + repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0), + count(0), size(0), lastAlert(qpid::sys::EPOCH) {} + +void ThresholdAlerts::enqueued(const QueuedMessage& m) +{ + size += m.payload->contentSize(); + ++count; + if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) { + if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH) + || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) { + agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size)); + lastAlert = qpid::sys::now(); + } + } +} + +void ThresholdAlerts::dequeued(const QueuedMessage& m) +{ + size -= m.payload->contentSize(); + --count; + if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) { + lastAlert = qpid::sys::EPOCH; + } +} + + + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const uint64_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval) +{ + if (countThreshold || sizeThreshold) { + boost::shared_ptr<QueueObserver> observer( + new ThresholdAlerts(queue.getName(), agent, countThreshold, sizeThreshold, repeatInterval) + ); + queue.addObserver(observer); + } +} + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::framing::FieldTable& settings) + +{ + qpid::types::Variant::Map map; + qpid::amqp_0_10::translate(settings, map); + observe(queue, agent, map); +} + +template <class T> +class Option +{ + public: + Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); } + void addAlias(const std::string& name) { names.push_back(name); } + T get(const qpid::types::Variant::Map& settings) const + { + T value(defaultValue); + for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) { + if (get(settings, *i, value)) break; + } + return value; + } + private: + std::vector<std::string> names; + T defaultValue; + + bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const + { + qpid::types::Variant::Map::const_iterator i = settings.find(name); + if (i != settings.end()) { + try { + value = (T) i->second; + } catch (const qpid::types::InvalidConversion&) { + QPID_LOG(warning, "Bad value for" << name << ": " << i->second); + } + return true; + } else { + return false; + } + } +}; + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::types::Variant::Map& settings) + +{ + //Note: aliases are keys defined by java broker + Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60); + repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap"); + + //If no explicit threshold settings were given use 80% of any + //limit from the policy. + const QueuePolicy* policy = queue.getPolicy(); + Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy ? policy->getMaxCount()*0.8 : 0)); + countThreshold.addAlias("x-qpid-maximum-message-count"); + Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy ? policy->getMaxSize()*0.8 : 0)); + sizeThreshold.addAlias("x-qpid-maximum-message-size"); + + observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings)); +} + +}} diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.h b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h new file mode 100644 index 0000000000..e1f59252c4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h @@ -0,0 +1,73 @@ +#ifndef QPID_BROKER_THRESHOLDALERTS_H +#define QPID_BROKER_THRESHOLDALERTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueObserver.h" +#include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qpid { +namespace framing { +class FieldTable; +} +namespace management { +class ManagementAgent; +} +namespace broker { + +class Queue; +/** + * Class to manage generation of QMF alerts when particular thresholds + * are breached on a queue. + */ +class ThresholdAlerts : public QueueObserver +{ + public: + ThresholdAlerts(const std::string& name, + qpid::management::ManagementAgent& agent, + const uint32_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const uint64_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::framing::FieldTable& settings); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::types::Variant::Map& settings); + private: + const std::string name; + qpid::management::ManagementAgent& agent; + const uint32_t countThreshold; + const uint64_t sizeThreshold; + const qpid::sys::Duration repeatInterval; + uint64_t count; + uint64_t size; + qpid::sys::AbsTime lastAlert; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_THRESHOLDALERTS_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 6acd0a3ced..030b804143 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messaging::Message&) window = capacity; } } - -qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!get(result, timeout)) throw NoMessageAvailable(); return result; } - -qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!fetch(result, timeout)) throw NoMessageAvailable(); @@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Dur return f.result; } -void ReceiverImpl::close() -{ +void ReceiverImpl::close() +{ execute<Close>(); } @@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled() return parent->getUnsettledAcks(destination); } -ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, - const qpid::messaging::Address& a) : +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a) : - parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging: } } -void ReceiverImpl::closeImpl() -{ +void ReceiverImpl::closeImpl() +{ sys::Mutex::ScopedLock l(lock); if (state != CANCELLED) { state = CANCELLED; + sync(session).messageStop(destination); + parent->releasePending(destination); source->cancel(session, destination); parent->receiverCancelled(destination); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 6d98527627..75a71997fd 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Command { qpid::messaging::Receiver result; const qpid::messaging::Address& address; - + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createReceiverImpl(address); } @@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Command { qpid::messaging::Sender result; const qpid::messaging::Address& address; - + CreateSender(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createSenderImpl(address); } @@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std::string& name) const throw KeyError(name); } else { return i->second; - } + } } Receiver SessionImpl::getReceiver(const std::string& name) const @@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT } } -bool SessionImpl::accept(ReceiverImpl* receiver, - qpid::messaging::Message* message, +bool SessionImpl::accept(ReceiverImpl* receiver, + qpid::messaging::Message* message, IncomingMessages::MessageTransfer& transfer) { if (receiver->getName() == transfer.getDestination()) { @@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } } @@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command { const std::string* destination; uint32_t result; - + Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getReceivableImpl(destination); } }; @@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Command { const std::string* destination; uint32_t result; - + UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getUnsettledAcksImpl(destination); } }; @@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl() getImplPtr<Receiver, ReceiverImpl>(i->second)->stop(); } //ensure that stop has been processed and all previously sent - //messages are available for release: + //messages are available for release: session.sync(); incoming.releaseAll(); - session.txRollback(); + session.txRollback(); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr<Receiver, ReceiverImpl>(i->second)->start(); @@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(const std::string& name) incoming.releasePending(name); } +void SessionImpl::releasePending(const std::string& name) +{ + ScopedLock l(lock); + incoming.releasePending(name); +} + void SessionImpl::senderCancelled(const std::string& name) { ScopedLock l(lock); @@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->open(); + connection->open(); } bool SessionImpl::backoff() { - return connection->backoff(); + return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() const diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 3dd5cd0189..2a2aa47df6 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -79,8 +79,9 @@ class SessionImpl : public qpid::messaging::SessionImpl void checkError(); bool hasError(); - bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + void releasePending(const std::string& destination); void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); @@ -110,7 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } @@ -206,11 +207,11 @@ class SessionImpl : public qpid::messaging::SessionImpl struct Acknowledge1 : Command { qpid::messaging::Message& message; - + Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} void operator()() { impl.acknowledgeImpl(message); } }; - + struct CreateSender; struct CreateReceiver; struct UnsettledAcks; @@ -222,12 +223,12 @@ class SessionImpl : public qpid::messaging::SessionImpl F f(*this); return execute(f); } - + template <class F> void retry() { while (!execute<F>()) {} } - + template <class F, class P> bool execute1(P p) { F f(*this, p); diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp index baeaafb478..f6e1c7a849 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -5,7 +5,7 @@ * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance + * "License "); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -33,6 +33,25 @@ using std::max; using sys::Timer; using sys::TimerTask; +// +// Note on use of Broker::getTimer() rather than getClusterTime in broker code. +// The following uses of getTimer() are cluster safe: +// +// LinkRegistry: maintenance visits in timer can call Bridge::create/cancel +// but these don't modify any management state. +// +// broker::Connection: +// - Heartbeats use ClusterOrderOutput to ensure consistency +// - timeout: aborts connection in timer, cluster does an orderly connection close. +// +// SessionState: scheduledCredit - uses ClusterOrderProxy +// Broker::queueCleaner: cluster implements ExpiryPolicy for consistent expiry. +// +// Broker::dtxManager: dtx disabled with cluster. +// +// requestIOProcessing: called in doOutput. +// + ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) { // Allow more generous overrun threshold with cluster as we diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index c7689577a7..e9b718e6de 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -32,6 +32,7 @@ #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Queue.h" @@ -528,7 +529,7 @@ void Connection::deliveryRecord(const string& qname, m = getUpdateMessage(); m.queue = queue.get(); m.position = position; - if (enqueued) queue->enqueued(m); //inform queue of the message + if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); } @@ -548,6 +549,13 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi findQueue(qname)->setPosition(position); } +void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) +{ + if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { + QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); + } +} + void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index d90cdd898b..7ee85bf1aa 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -152,6 +152,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); + void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); void expiryId(uint64_t); void txStart(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 4f6488a28a..8f751add9b 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -32,6 +32,7 @@ #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" @@ -352,6 +353,10 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1)); ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition()); + uint priority, count; + if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { + ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); + } } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index c9d53c028b..cf065e1ba9 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -62,7 +62,7 @@ endif (MSVC) # Like this to work with cmake 2.4 on Unix set (qpid_test_boost_libs - ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY}) + ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} ${Boost_SYSTEM_LIBRARY}) # Macro to make it easier to remember where the tests are built macro(remember_location testname) diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index fc1632b4e1..991ec847bf 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture ~QueueCreatePolicyFixture() { - admin.deleteQueue(address.getName()); + admin.deleteQueue(address.getName()); } }; @@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture ~ExchangeCreatePolicyFixture() { - admin.deleteExchange(address.getName()); + admin.deleteExchange(address.getName()); } }; @@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue) s1.close(); Receiver r1 = fix.session.createReceiver(a1); r1.close(); - + std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s2 = fix.session.createSender(a2); s2.close(); @@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification) { MessagingFixture fix; fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}"); - BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); + BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); } QPID_AUTO_TEST_CASE(testReceiveSpecialProperties) @@ -775,19 +775,48 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber) QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser) { MessagingFixture fix; - + std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }"; std::string browseAddress = "exclusive-queue; { mode: browse }"; Receiver receiver = fix.session.createReceiver(address); fix.session.sync(); - Connection c2 = fix.newConnection(); + Connection c2 = fix.newConnection(); c2.open(); Session s2 = c2.createSession(); - + BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress)); - c2.close(); + c2.close(); +} + + +QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages) +{ + MessagingFixture fix; + const uint capacity = 5; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}"); + Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}"); + + receiver1.setCapacity(capacity); + receiver2.setCapacity(capacity*2); + + Message out("test-message"); + for (uint i = 0; i < capacity*2; ++i) { + sender.send(out); + } + + receiver1.close(); + + // Make sure all pending messages were sent to the alternate + // exchange when the queue was deleted. + Message in; + for (uint i = 0; i < capacity*2; ++i) { + in = receiver2.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + } } QPID_AUTO_TEST_CASE(testAuthenticatedUsername) @@ -828,7 +857,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) messages.push_back(msg); } const uint batch(10); //acknowledge first 10 messages only - for (uint i = 0; i < batch; ++i) { + for (uint i = 0; i < batch; ++i) { other.acknowledge(messages[i]); } messages.clear(); @@ -836,7 +865,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) other.close(); other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < (count-batch); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); @@ -847,7 +876,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) //check unacknowledged messages are still enqueued other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < ((count-batch)/2); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str()); diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index c9e6d79ee7..7d17dd7bde 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -94,7 +94,7 @@ cluster_test_SOURCES = \ cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework -qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail +qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST) endif diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 903a20ec28..f2ccd0ba84 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -1191,5 +1191,41 @@ QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); } +QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + FieldTable arguments; + arguments.setInt("x-qpid-priorities", 10); + arguments.setInt("x-qpid-fairshare", 5); + c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments); + + //send messages of different priorities + for (int i = 0; i < 20; i++) { + Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag); + msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5); + c0.session.messageTransfer(arg::content=msg); + } + + //pull off a couple of the messages (first four should be the top priority messages + for (int i = 0; i < 4; i++) { + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData()); + } + + // Add another member + cluster.add(); + Client c1(cluster[1], "c1"); + + //pull off some more messages + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData()); + + //check queue has same content on both nodes + BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index e865a49813..1f77226b4d 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -65,6 +65,8 @@ op.add_option("--connection-options", type="str", help="Connection options for senders & receivers") op.add_option("--flow-control", default=0, type="int", metavar="N", help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.") +op.add_option("--durable", default=False, action="store_true", + help="Use durable queues and messages") single_quote_re = re.compile("'") def posix_quote(string): @@ -76,7 +78,9 @@ def ssh_command(host, command): return ["ssh", host] + [posix_quote(arg) for arg in command] def start_receive(queue, index, opts, ready_queue, broker, host): - address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) + address_opts=["create:receiver"] + opts.receive_option + if opts.durable: address_opts += ["node:{durable:true}"] + address="%s;{%s}"%(queue,",".join(address_opts)) msg_total=opts.senders*opts.messages messages = msg_total/opts.receivers; if (index < msg_total%opts.receivers): messages += 1 @@ -111,7 +115,8 @@ def start_send(queue, opts, broker, host): "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), "--sequence=no", - "--flow-control", str(opts.flow_control) + "--flow-control", str(opts.flow_control), + "--durable", str(opts.durable) ] command += opts.send_arg if opts.connection_options: @@ -140,12 +145,12 @@ def print_header(timestamp): def parse(parser, lines): # Parse sender/receiver output for l in lines: fn_val = zip(parser, l) - + return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): return parse([int],[first_line(p) for p in senders]) - + def parse_receivers(receivers): return parse([int,float,float,float],[first_line(p) for p in receivers if p]) @@ -168,7 +173,7 @@ def print_summary(send_stats, recv_stats): l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) print summary - + class ReadyReceiver: """A receiver for ready messages""" @@ -177,7 +182,7 @@ class ReadyReceiver: self.connection = qpid.messaging.Connection(broker) self.connection.open() self.receiver = self.connection.session().receiver( - "%s;{create:always,delete:always}"%(queue)) + "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) self.receiver.session.sync() self.timeout=2 diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 28e229ca27..012d544a2e 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -206,6 +206,7 @@ int main(int argc, char ** argv) if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; + if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl; if (msg.getDurable()) std::cout << "Durable: true" << std::endl; if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl; std::cout << "Properties: " << msg.getProperties() << std::endl; diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index c71cb83f9a..6a7e7838ce 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -56,6 +56,7 @@ struct Options : public qpid::Options uint sendEos; bool durable; uint ttl; + uint priority; std::string userid; std::string correlationid; string_vector properties; @@ -84,6 +85,7 @@ struct Options : public qpid::Options sendEos(0), durable(false), ttl(0), + priority(0), contentString(), contentSize(0), contentStdin(false), @@ -110,6 +112,7 @@ struct Options : public qpid::Options ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") + ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") @@ -266,7 +269,14 @@ int main(int argc, char ** argv) if (opts.ttl) { msg.setTtl(Duration(opts.ttl)); } - if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto)); + if (opts.priority) { + msg.setPriority(opts.priority); + } + if (!opts.replyto.empty()) { + if (opts.flowControl) + throw Exception("Can't use reply-to and flow-control together"); + msg.setReplyTo(Address(opts.replyto)); + } if (!opts.userid.empty()) msg.setUserId(opts.userid); if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid); opts.setProperties(msg); @@ -305,13 +315,17 @@ int main(int argc, char ** argv) if (opts.timestamp) msg.getProperties()[TS] = int64_t( qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - if (opts.flowControl && ((sent % opts.flowControl) == 0)) { - msg.setReplyTo(flowControlAddress); - ++flowSent; + if (opts.flowControl) { + if ((sent % opts.flowControl) == 0) { + msg.setReplyTo(flowControlAddress); + ++flowSent; + } + else + msg.setReplyTo(Address()); // Clear the reply address. } - sender.send(msg); reporter.message(msg); + if (opts.tx && (sent % opts.tx == 0)) { if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) @@ -331,7 +345,6 @@ int main(int argc, char ** argv) int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } - msg = Message(); // Clear out contents and properties for next iteration } for ( ; flowSent>0; --flowSent) flowControlReceiver.get(Duration::SECOND); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 5e407a061f..be1c1f868c 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -275,6 +275,13 @@ <!-- Replicate encoded config objects - e.g. links and bridges. --> <control name="config" code="0x37"><field name="encoded" type="str32"/></control> + + <!-- Set the fairshare delivery related state of a replicated queue. --> + <control name="queue-fairshare-state" code="0x38"> + <field name="queue" type="str8"/> + <field name="position" type="uint8"/> + <field name="count" type="uint8"/> + </control> </class> </amqp> |
