diff options
Diffstat (limited to 'qpid/java/broker')
68 files changed, 3469 insertions, 513 deletions
diff --git a/qpid/java/broker/bin/passwd b/qpid/java/broker/bin/passwd new file mode 100644 index 0000000000..c1bb05c082 --- /dev/null +++ b/qpid/java/broker/bin/passwd @@ -0,0 +1,21 @@ +#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. qpid-run org.apache.qpid.server.security.Passwd "$@"
diff --git a/qpid/java/broker/bin/qpid-server b/qpid/java/broker/bin/qpid-server index 0080209479..a2b416b12b 100644 --- a/qpid/java/broker/bin/qpid-server +++ b/qpid/java/broker/bin/qpid-server @@ -18,4 +18,13 @@ # under the License. # +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java \ + JAVA_VM=-server \ + JAVA_MEM=-Xmx1024m \ + QPID_CLASSPATH=$QPID_LIBS + . qpid-run org.apache.qpid.server.Main "$@" diff --git a/qpid/java/broker/bin/qpid.stop b/qpid/java/broker/bin/qpid.stop index 1bffc8cdb8..9193d3c4e1 100644 --- a/qpid/java/broker/bin/qpid.stop +++ b/qpid/java/broker/bin/qpid.stop @@ -5,9 +5,9 @@ # Script checks for a given pid running PROGRAM and attempts to quit it # -MAX_ATTEMPTS=5 -SLEEP_DELAY=2 -PROGRAM="org.apache.qpid.server.Main" +MAX_ATTEMPTS=1 +SLEEP_DELAY=1 +PROGRAM="DQPID" # @@ -15,7 +15,8 @@ PROGRAM="org.apache.qpid.server.Main" # printActions() { -ps=`ps o command p $1|grep $PROGRAM` +#ps=`ps o command p $1|grep $PROGRAM` +ps=`ps -o args -p $1|grep $PROGRAM` echo "Attempting to kill: $ps" } @@ -36,25 +37,25 @@ quit() kill $1 } +# +# Grep the ps log for the PID ($1) to ensure that it has quit +# +lookup() +{ +result=`ps -o args -p $1 |grep -v grep |grep $PROGRAM |wc -l` +} # # Sleep and then check then lookup the PID($1) to ensure it has quit # check() { +echo "Waiting $SLEEP_DELAY second for $1 to exit" sleep $SLEEP_DELAY lookup $1 } -# -# Grep the ps log for the PID ($1) to ensure that it has quit -# -lookup() -{ -result=`ps p $1 |grep -v grep |grep $PROGRAM |wc -l` -} - # # Verify the PID($1) is available @@ -62,7 +63,7 @@ result=`ps p $1 |grep -v grep |grep $PROGRAM |wc -l` verifyPid() { lookup $1 -if [[ $result == 1 ]] ; then +if [[ $[$result] == 1 ]] ; then brokerspid=$1 else echo "Unable to locate Qpid Process with PID $1" @@ -70,8 +71,6 @@ else fi } - - # # Main Run # @@ -89,22 +88,21 @@ printActions $brokerspid # Attempt to quit the process MAX_ATTEMPTS Times attempt=0 -while [[ $result > 0 && $attempt < $MAX_ATTEMPTS ]] ; do +while [[ $[$result] > 0 && $[$attempt] < $[$MAX_ATTEMPTS] ]] ; do quit $brokerspid check $brokerspid attempt=$[$attempt + 1] done - # Check that it has quit -if [[ $results == 0 ]] ; then +if [[ $[$result] == 0 ]] ; then echo "Process quit" exit 0 else # Now attempt to force quit the process attempt=0 - while [[ $result > 0 && $attempt < $MAX_ATTEMPTS ]] ; do + while [[ $[$result] > 0 && $[$attempt] < $[$MAX_ATTEMPTS] ]] ; do forceQuit $brokerspid check $brokerspid attempt=$[$attempt + 1] @@ -112,7 +110,7 @@ else # Output final status - if [[ $attempt == $MAX_ATTEMPTS ]] ; then + if [[ $[$result] > 0 && $[$attempt] == $[$MAX_ATTEMPTS] ]] ; then echo "Stopped trying to kill process: $brokerspid" echo "Attempted to stop $attempt times" else diff --git a/qpid/java/broker/bin/qpid.stopall b/qpid/java/broker/bin/qpid.stopall index f6862842c9..2e762bdd50 100644 --- a/qpid/java/broker/bin/qpid.stopall +++ b/qpid/java/broker/bin/qpid.stopall @@ -6,17 +6,16 @@ # Utilises qpid.stop to perform the actual stopping # -MAX_ATTEMPTS=5 -SLEEP_DELAY=2 -PROGRAM="org.apache.qpid.server.Main" +PROGRAM="DQPID" # # grep ps for instances of $PROGRAM and collect PIDs # lookup() { -pids=`ps o pid,command |grep -v grep | grep $PROGRAM | cut -d ' ' -f 1` -result=`echo -n $pids | wc -l` +#pids=`ps o pid,command | grep $PROGRAM | grep -v grep | cut -d ' ' -f 1` +pids=`ps -ef |grep $USER | grep $PROGRAM | grep -v grep | awk '{print $2}'` +result=`echo -n $pids | wc -w` } @@ -25,7 +24,7 @@ result=`echo -n $pids | wc -l` # showPids() { -ps p $pids +ps -o user,pid,args -p $pids } @@ -35,7 +34,7 @@ ps p $pids lookup -if [[ $result == 0 ]] ; then +if [[ $[$result] == 0 ]] ; then echo "No Qpid Brokers found running under user '$USER'" exit 0 fi @@ -49,7 +48,7 @@ done # Check we have quit all lookup -if [[ $result == 0 ]] ; then +if [[ $[$result] == 0 ]] ; then echo "All Qpid brokers successfully quit" else echo "Some brokers were not quit" diff --git a/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml b/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml index 4a7343660d..4b32630771 100644 --- a/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml +++ b/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml @@ -78,6 +78,12 @@ <fileMode>420</fileMode> </file> <file> + <source>../etc/jmxremote.access</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>jmxremote.access</destName> + <fileMode>420</fileMode> + </file> + <file> <source>../etc/log4j.xml</source> <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> <destName>log4j.xml</destName> @@ -108,6 +114,12 @@ <fileMode>473</fileMode> </file> <file> + <source>../bin/passwd</source> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <destName>passwd</destName> + <fileMode>473</fileMode> + </file> + <file> <source>../bin/qpid-server</source> <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> <destName>qpid-server</destName> diff --git a/qpid/java/broker/etc/access b/qpid/java/broker/etc/access new file mode 100644 index 0000000000..a781ed8aa9 --- /dev/null +++ b/qpid/java/broker/etc/access @@ -0,0 +1 @@ +guest:localhost(rw),test(rw)
\ No newline at end of file diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml index 3789e6fcb6..c66c2f632e 100644 --- a/qpid/java/broker/etc/config.xml +++ b/qpid/java/broker/etc/config.xml @@ -41,6 +41,8 @@ </connector> <management> <enabled>true</enabled> + <jmxport>8999</jmxport> + <security-enabled>true</security-enabled> </management> <advanced> <filterchain enableExecutorPool="true"/> @@ -63,13 +65,14 @@ </attributes> </principal-database> - <!--principal-database> - <name>md5passwordfile</name> - <class>org.apache.qpid.server.security.auth.database.MD5PasswordFilePrincipalDatabase</class> + <!-- Example use of Base64 encoded MD5 hashes for authentication via CRAM-MD5-Hashed + <principal-database> + <name>passwordfile</name> + <class>org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase</class> <attributes> <attribute> <name>passwordFile</name> - <value>${conf}/md5passwd</value> + <value>${conf}/qpid.passwd</value> </attribute> </attributes> </principal-database--> @@ -78,6 +81,10 @@ <access> <class>org.apache.qpid.server.security.access.AllowAll</class> </access> + <jmx> + <access>${conf}/jmxremote.access</access> + <principal-database>passwordfile</principal-database> + </jmx> </security> <virtualhosts> @@ -85,9 +92,10 @@ <name>localhost</name> <localhost> <store> - <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> --> + <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <environment-path>${work}/localhost-store</environment-path> --> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> - <environment-path>localhost-store</environment-path> </store> <security> diff --git a/qpid/java/broker/etc/jmxremote.access b/qpid/java/broker/etc/jmxremote.access new file mode 100644 index 0000000000..d1172fc197 --- /dev/null +++ b/qpid/java/broker/etc/jmxremote.access @@ -0,0 +1,3 @@ +admin=admin +guest=readonly +user=readwrite diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml index 74b80c0e80..b442227607 100644 --- a/qpid/java/broker/etc/log4j.xml +++ b/qpid/java/broker/etc/log4j.xml @@ -44,20 +44,16 @@ <param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> - <!--param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/--> + <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> </layout> </appender> - <appender name="FileAppender" class="org.apache.log4j.FileAppender"> - <param name="staticLogFileName" value="false"/> - + <appender name="FileAppender" class="org.apache.log4j.FileAppender"> <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/> <param name="Append" value="false"/> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/> - + <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> </layout> </appender> diff --git a/qpid/java/broker/etc/persistent_config.xml b/qpid/java/broker/etc/persistent_config.xml new file mode 100644 index 0000000000..178a73515c --- /dev/null +++ b/qpid/java/broker/etc/persistent_config.xml @@ -0,0 +1,132 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + + This is an example config using the BDBMessageStore available from + the Red Hat Messaging project at etp.108.redhat.com and distributed under GPL. + --> + +<broker> + <prefix>${QPID_HOME}</prefix> + <work>${QPID_WORK}</work> + <conf>${prefix}/etc</conf> + <connector> + <qpidnio>true</qpidnio> + <transport>nio</transport> + <port>5672</port> + <sslport>8672</sslport> + <socketReceiveBuffer>32768</socketReceiveBuffer> + <socketSendBuffer>32768</socketSendBuffer> + </connector> + <management> + <enabled>true</enabled> + <jmxport>8999</jmxport> + </management> + <advanced> + <filterchain enableExecutorPool="true"/> + <enablePooledAllocator>false</enablePooledAllocator> + <enableDirectBuffers>false</enableDirectBuffers> + <framesize>65535</framesize> + <compressBufferOnQueue>false</compressBufferOnQueue> + </advanced> + + <security> + <principal-databases> + <principal-database> + <name>passwordfile</name> + <class>org.apache.qpid.server.security.auth.database.PlainPasswordVhostFilePrincipalDatabase</class> + <attributes> + <attribute> + <name>passwordFile</name> + <value>${conf}/passwdVhost</value> + </attribute> + </attributes> + </principal-database> + </principal-databases> + + <access> + <class>org.apache.qpid.server.security.access.AllowAll</class> + </access> + <jmx> + <access>${conf}/jmxremote.access</access> + <principal-database>passwordfile</principal-database> + </jmx> + </security> + + <virtualhosts> + <virtualhost> + <name>localhost</name> + <localhost> + <store> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <environment-path>${work}/bdbstore/localhost-store</environment-path> + </store> + + <security> + <access> + <class>org.apache.qpid.server.security.access.PrincipalDatabaseAccessManager</class> + <attributes> + <attribute> + <name>principalDatabase</name> + <value>passwordfile</value> + </attribute> + <attribute> + <name>defaultAccessManager</name> + <value>DenyAll</value> + </attribute> + </attributes> + </access> + </security> + </localhost> + </virtualhost> + + <virtualhost> + <name>development</name> + <development> + <store> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <environment-path>${work}/bdbstore/development-store</environment-path> + </store> + </development> + </virtualhost> + + <virtualhost> + <name>test</name> + <test> + <store> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> + <environment-path>${work}/bdbstore/test-store</environment-path> + </store> + </test> + </virtualhost> + + </virtualhosts> + <heartbeat> + <delay>0</delay> + <timeoutFactor>2.0</timeoutFactor> + </heartbeat> + <queue> + <auto_register>true</auto_register> + </queue> + + <virtualhosts>${conf}/virtualhosts.xml</virtualhosts> +</broker> + + diff --git a/qpid/java/broker/etc/transient_config.xml b/qpid/java/broker/etc/transient_config.xml new file mode 100644 index 0000000000..164d66cd1b --- /dev/null +++ b/qpid/java/broker/etc/transient_config.xml @@ -0,0 +1,128 @@ +<?xml version="1.0" encoding="ISO-8859-1"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + + This is an example config file that uses the MemoryMessageStore. + As a result it is aimed at brokers sending transient messages. + + --> +<broker> + <prefix>${QPID_HOME}</prefix> + <work>${QPID_WORK}</work> + <conf>${prefix}/etc</conf> + <connector> + <qpidnio>true</qpidnio> + <transport>nio</transport> + <port>5672</port> + <sslport>8672</sslport> + <socketReceiveBuffer>32768</socketReceiveBuffer> + <socketSendBuffer>32768</socketSendBuffer> + </connector> + <management> + <enabled>true</enabled> + <jmxport>8999</jmxport> + </management> + <advanced> + <filterchain enableExecutorPool="true"/> + <enablePooledAllocator>false</enablePooledAllocator> + <enableDirectBuffers>false</enableDirectBuffers> + <framesize>65535</framesize> + <compressBufferOnQueue>false</compressBufferOnQueue> + </advanced> + + <security> + <principal-databases> + <principal-database> + <name>passwordfile</name> + <class>org.apache.qpid.server.security.auth.database.PlainPasswordVhostFilePrincipalDatabase</class> + <attributes> + <attribute> + <name>passwordFile</name> + <value>${conf}/passwdVhost</value> + </attribute> + </attributes> + </principal-database> + </principal-databases> + <access> + <class>org.apache.qpid.server.security.access.AllowAll</class> + </access> + <jmx> + <access>${conf}/jmxremote.access</access> + <principal-database>passwordfile</principal-database> + </jmx> + </security> + + <virtualhosts> + <virtualhost> + <name>localhost</name> + <localhost> + <store> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + </store> + + <security> + <access> + <class>org.apache.qpid.server.security.access.PrincipalDatabaseAccessManager</class> + <attributes> + <attribute> + <name>principalDatabase</name> + <value>passwordfile</value> + </attribute> + <attribute> + <name>defaultAccessManager</name> + <value>DenyAll</value> + </attribute> + </attributes> + </access> + </security> + </localhost> + </virtualhost> + + <virtualhost> + <name>development</name> + <development> + <store> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + </store> + </development> + </virtualhost> + + <virtualhost> + <name>test</name> + <test> + <store> + <class>org.apache.qpid.server.store.MemoryMessageStore</class> + </store> + </test> + </virtualhost> + + </virtualhosts> + <heartbeat> + <delay>0</delay> + <timeoutFactor>2.0</timeoutFactor> + </heartbeat> + <queue> + <auto_register>true</auto_register> + </queue> + + <virtualhosts>${conf}/virtualhosts.xml</virtualhosts> +</broker> + + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 23c32aceab..d31359b019 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -1,5 +1,25 @@ /* * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,8 +42,12 @@ import javax.management.MBeanException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.commons.configuration.Configuration; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -36,9 +60,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.commons.configuration.Configuration; /** * This MBean implements the broker management interface and exposes the @@ -82,8 +103,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr * @param autoDelete * @throws JMException */ - public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete) - throws JMException + public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException { try { @@ -92,7 +112,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); if (exchange == null) { - exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0); + exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), + durable, false, 0); _exchangeRegistry.registerExchange(exchange); } else @@ -140,8 +161,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr * @param autoDelete * @throws JMException */ - public void createNewQueue(String queueName, String owner, boolean durable,boolean autoDelete) - throws JMException + public void createNewQueue(String queueName, String owner, boolean durable) throws JMException { AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); if (queue != null) @@ -156,22 +176,27 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr { ownerShortString = new AMQShortString(owner); } - queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, autoDelete, getVirtualHost()); + + queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost()); if (queue.isDurable() && !queue.isAutoDelete()) { _messageStore.createQueue(queue); } - Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue); + Configuration virtualHostDefaultQueueConfiguration = + VirtualHostConfiguration.getDefaultQueueConfiguration(queue); if (virtualHostDefaultQueueConfiguration != null) { Configurator.configure(queue, virtualHostDefaultQueueConfiguration); } + _queueRegistry.registerQueue(queue); } catch (AMQException ex) { - throw new MBeanException(new JMException(ex.getMessage()),"Error in creating queue " + queueName); + JMException jme = new JMException(ex.getMessage()); + jme.initCause(ex); + throw new MBeanException(jme, "Error in creating queue " + queueName); } } @@ -202,7 +227,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr } catch (AMQException ex) { - throw new MBeanException(new JMException(ex.getMessage()), "Error in deleting queue " + queueName); + JMException jme = new JMException(ex.getMessage()); + jme.initCause(ex); + throw new MBeanException(jme, "Error in deleting queue " + queueName); } } @@ -213,7 +240,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr // This will have a single instance for a virtual host, so not having the name property in the ObjectName public ObjectName getObjectName() throws MalformedObjectNameException - { + { return getObjectNameForSingleInstanceMBean(); } } // End of MBean class diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1ebe5fa0a2..2e1653e69d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -472,7 +472,7 @@ public class AMQChannel if (unacked.queue != null) { // Ensure message is released for redelivery - unacked.message.release(); + unacked.message.release(unacked.queue); // Mark message redelivered unacked.message.setRedelivered(true); @@ -503,7 +503,10 @@ public class AMQChannel { // Ensure message is released for redelivery - unacked.message.release(); + if (unacked.queue != null) + { + unacked.message.release(unacked.queue); + } // Mark message redelivered unacked.message.setRedelivered(true); @@ -672,14 +675,14 @@ public class AMQChannel // else // { //release to allow it to be delivered - msg.release(); + msg.release(message.queue); // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(); + Subscription sub = msg.getDeliveredSubscription(message.queue); if (sub != null) { @@ -753,7 +756,7 @@ public class AMQChannel // Process Messages to Requeue at the front of the queue for (UnacknowledgedMessage message : msgToRequeue) { - message.message.release(); + message.message.release(message.queue); message.message.setRedelivered(true); deliveryContext.deliver(message.message, message.queue, true); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 38a505c6c7..146d0566ce 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,14 +36,17 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.commons.configuration.ConfigurationException; + import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; @@ -59,7 +62,7 @@ import org.apache.qpid.url.URLSyntaxException; * Main entry point for AMQPD. * */ -@SuppressWarnings({"AccessStaticViaInstance"}) +@SuppressWarnings({ "AccessStaticViaInstance" }) public class Main { private static final Logger _logger = Logger.getLogger(Main.class); @@ -74,9 +77,9 @@ public class Main protected static class InitException extends Exception { - InitException(String msg) + InitException(String msg, Throwable cause) { - super(msg); + super(msg, cause); } } @@ -97,6 +100,7 @@ public class Main try { commandLine = new PosixParser().parse(options, args); + return true; } catch (ParseException e) @@ -104,6 +108,7 @@ public class Main System.err.println("Error: " + e.getMessage()); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("Qpid", options, true); + return false; } } @@ -112,17 +117,26 @@ public class Main { Option help = new Option("h", "help", false, "print this message"); Option version = new Option("v", "version", false, "print the version information and exit"); - Option configFile = OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file"). - withLongOpt("config").create("c"); - Option port = OptionBuilder.withArgName("port").hasArg().withDescription("listen on the specified port. Overrides any value in the config file"). - withLongOpt("port").create("p"); - Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file"). - withLongOpt("bind").create("b"); - Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " + - "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file"). - withLongOpt("logconfig").create("l"); - Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " + - "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); + Option configFile = + OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config") + .create("c"); + Option port = + OptionBuilder.withArgName("port").hasArg() + .withDescription("listen on the specified port. Overrides any value in the config file") + .withLongOpt("port").create("p"); + Option bind = + OptionBuilder.withArgName("bind").hasArg() + .withDescription("bind to the specified address. Overrides any value in the config file") + .withLongOpt("bind").create("b"); + Option logconfig = + OptionBuilder.withArgName("logconfig").hasArg() + .withDescription("use the specified log4j xml configuration file. By " + + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + + " in the same directory as the configuration file").withLongOpt("logconfig").create("l"); + Option logwatchconfig = + OptionBuilder.withArgName("logwatch").hasArg() + .withDescription("monitor the log file configuration file for changes. Units are seconds. " + + "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); options.addOption(help); options.addOption(version); @@ -150,7 +164,7 @@ public class Main boolean first = true; for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions()) { - if(first) + if (first) { first = false; } @@ -158,9 +172,11 @@ public class Main { protocol.append(", "); } + protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion()); } + System.out.println(ver + " (" + protocol + ")"); } else @@ -186,7 +202,6 @@ public class Main } } - protected void startup() throws InitException, ConfigurationException, Exception { final String QpidHome = System.getProperty("QPID_HOME"); @@ -201,7 +216,7 @@ public class Main error = error + "\nNote: Qpid_HOME is not set."; } - throw new InitException(error); + throw new InitException(error, null); } else { @@ -226,8 +241,8 @@ public class Main _logger.info("Starting Qpid.AMQP broker"); - ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). - getConfiguredObject(ConnectorConfiguration.class); + ConnectorConfiguration connectorConfig = + ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class); ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers); @@ -249,7 +264,7 @@ public class Main } catch (NumberFormatException e) { - throw new InitException("Invalid port: " + portStr); + throw new InitException("Invalid port: " + portStr, e); } } @@ -264,19 +279,21 @@ public class Main int totalVHosts = ((Collection) virtualHosts).size(); for (int vhost = 0; vhost < totalVHosts; vhost++) { - setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost)); + setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost)); } } else { - setupVirtualHosts(configFile.getParent() , (String)virtualHosts); + setupVirtualHosts(configFile.getParent(), (String) virtualHosts); } } + bind(port, connectorConfig); } - protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException + protected void setupVirtualHosts(String configFileParent, String configFilePath) + throws ConfigurationException, AMQException, URLSyntaxException { String configVar = "${conf}"; @@ -285,7 +302,7 @@ public class Main configFilePath = configFileParent + configFilePath.substring(configVar.length()); } - if (configFilePath.indexOf(".xml") != -1 ) + if (configFilePath.indexOf(".xml") != -1) { VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath); vHostConfig.performBindings(); @@ -298,11 +315,12 @@ public class Main String[] fileNames = virtualHostDir.list(); - for (int each=0; each < fileNames.length; each++) + for (int each = 0; each < fileNames.length; each++) { if (fileNames[each].endsWith(".xml")) { - VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]); + VirtualHostConfiguration vHostConfig = + new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]); vHostConfig.performBindings(); } } @@ -319,7 +337,7 @@ public class Main try { - //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); + // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); IoAcceptor acceptor = connectorConfig.createAcceptor(); SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); @@ -334,7 +352,7 @@ public class Main { sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); } - + if (!connectorConfig.enableSSL || !connectorConfig.sslOnly) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); @@ -347,6 +365,7 @@ public class Main { bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); } + acceptor.bind(bindAddress, handler, sconfig); _logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } @@ -356,8 +375,7 @@ public class Main AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); try { - acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), - handler, sconfig); + acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); } catch (IOException e) @@ -415,16 +433,17 @@ public class Main } catch (NumberFormatException e) { - System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + - "a non-negative integer. Using default of zero (no watching configured"); + System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + + "a non-negative integer. Using default of zero (no watching configured"); } + if (logConfigFile.exists() && logConfigFile.canRead()) { System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath()); if (logWatchTime > 0) { - System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + - logWatchTime + " seconds"); + System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + + logWatchTime + " seconds"); // log4j expects the watch interval in milliseconds DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 4d66e37628..de3905268e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -196,6 +196,7 @@ public class DestNameExchange extends AbstractExchange } else { + _logger.error("MESSAGE LOSS: Message should be sent on a Dead Letter Queue"); _logger.warn(msg); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 14687c40ae..9052b2e81f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -98,7 +98,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR // If we haven't requested message to be resent to this consumer then reject it from ever getting it. // if (!evt.getMethod().resend) { - message.message.reject(message.message.getDeliveredSubscription()); + message.message.reject(message.message.getDeliveredSubscription(message.queue)); } if (evt.getMethod().requeue) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 2ecb39254f..30a40c5a75 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.security.access.AccessResult; +import org.apache.qpid.server.security.access.AccessRights; import org.apache.log4j.Logger; public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody> @@ -75,23 +76,26 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con if (virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName); + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'"); } else { session.setVirtualHost(virtualHost); - AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID()); + AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID(), AccessRights.Rights.ANY); switch (result.getStatus()) { default: case REFUSED: - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Access denied to vHost '" + virtualHostName + "' by " - + result.getAuthorizer()); + String error = "Any access denied to vHost '" + virtualHostName + "' by " + + result.getAuthorizer(); + + _logger.warn(error); + + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error); case GRANTED: - _logger.info("Granted access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID() + _logger.info("Granted any access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID() + " by '" + result.getAuthorizer() + "'"); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 6029a023e5..fef00942a0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -106,7 +107,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax HeartbeatConfig.getInstance().getDelay()); // heartbeat session.writeFrame(tune); - session.setAuthorizedID(ss.getAuthorizationID()); + session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID())); disposeSaslServer(session); break; case CONTINUE: diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 6c14aae7ed..4734143497 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -95,7 +96,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< throw new AMQException("Authentication failed"); case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); - session.setAuthorizedID(ss.getAuthorizationID()); + session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID())); stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 9e0a1019f2..2e697d4564 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -64,7 +64,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar private final AtomicInteger _counter = new AtomicInteger(); - protected QueueDeclareHandler() { Configurator.configure(this); @@ -92,12 +91,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar synchronized (queueRegistry) { - if (((queue = queueRegistry.getQueue(body.queue)) == null) ) + if (((queue = queueRegistry.getQueue(body.queue)) == null)) { - if(body.passive) + if (body.passive) { - String msg = "Queue: " + body.queue + " not found."; - throw body.getChannelException(AMQConstant.NOT_FOUND,msg ); + String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ")."; + throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } else { @@ -112,13 +111,16 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); queue.bind(body.queue, null, defaultExchange); - _log.info("Queue " + body.queue + " bound to default exchange"); + _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")"); } } } - else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner())) + else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner())) { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "')," + + " as exclusive queue with same name " + + "declared on another client ID('" + + queue.getOwner() + "')"); } AMQChannel channel = session.getChannel(evt.getChannelId()); @@ -138,10 +140,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - queue.getConsumerCount(), // consumerCount - queue.getMessageCount(), // messageCount - body.queue); // queue + (byte) 8, (byte) 0, // AMQP version (major, minor) + queue.getConsumerCount(), // consumerCount + queue.getMessageCount(), // messageCount + body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); session.writeFrame(response); } @@ -162,24 +164,22 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar { final QueueRegistry registry = virtualHost.getQueueRegistry(); AMQShortString owner = body.exclusive ? session.getContextKey() : null; - final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost); + final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost); final AMQShortString queueName = queue.getName(); - if(body.exclusive && !body.durable) + if (body.exclusive && !body.durable) { final AMQProtocolSession.Task deleteQueueTask = - new AMQProtocolSession.Task() - { - - public void doTask(AMQProtocolSession session) throws AMQException + new AMQProtocolSession.Task() { - if(registry.getQueue(queueName) == queue) + public void doTask(AMQProtocolSession session) throws AMQException { - queue.delete(); + if (registry.getQueue(queueName) == queue) + { + queue.delete(); + } } - - } - }; + }; session.addSessionCloseTask(deleteQueueTask); @@ -190,16 +190,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar session.removeSessionCloseTask(deleteQueueTask); } }); - - - } + }// if exclusive and not durable Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue); if (virtualHostDefaultQueueConfiguration != null) { Configurator.configure(queue, virtualHostDefaultQueueConfiguration); } - + return queue; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java index c89529f2a3..38c9e4950a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,29 +20,174 @@ */ package org.apache.qpid.server.management; +import java.io.IOException; import java.lang.management.ManagementFactory; +import java.rmi.RemoteException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import java.util.HashMap; +import java.util.Map; import javax.management.JMException; import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.MBeanServerForwarder; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AccountNotFoundException; +import javax.security.sasl.AuthorizeCallback; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase; +import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser; + +/** + * This class starts up an MBeanserver. If out of the box agent is being used then there are no security features + * implemented. To use the security features like user authentication, turn off the jmx options in the "QPID_OPTS" env + * variable and use JMXMP connector server. If JMXMP connector is not available, then the standard JMXConnector will be + * used, which again doesn't have user authentication. + */ public class JMXManagedObjectRegistry implements ManagedObjectRegistry { private static final Logger _log = Logger.getLogger(JMXManagedObjectRegistry.class); private final MBeanServer _mbeanServer; + private Registry _rmiRegistry; + private JMXServiceURL _jmxURL; - public JMXManagedObjectRegistry() + public JMXManagedObjectRegistry() throws AMQException { _log.info("Initialising managed object registry using platform MBean server"); - // we use the platform MBean server currently but this must be changed or at least be configuurable - _mbeanServer = ManagementFactory.getPlatformMBeanServer(); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + + // Retrieve the config parameters + boolean platformServer = appRegistry.getConfiguration().getBoolean("management.platform-mbeanserver", true); + + _mbeanServer = + platformServer ? ManagementFactory.getPlatformMBeanServer() + : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN); + } + + + public void start() + { + // Check if the "QPID_OPTS" is set to use Out of the Box JMXAgent + if (areOutOfTheBoxJMXOptionsSet()) + { + _log.info("JMX: Using the out of the box JMX Agent"); + return; + } + + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + + boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", true); + int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999); + + try + { + if (security) + { + // For SASL using JMXMP + _jmxURL = new JMXServiceURL("jmxmp", null, port); + + Map env = new HashMap(); + Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases(); + PrincipalDatabase db = null; + + for (Map.Entry<String, PrincipalDatabase> entry : map.entrySet()) + { + if (entry.getValue() instanceof Base64MD5PasswordFilePrincipalDatabase) + { + db = entry.getValue(); + break; + } + else if (entry.getValue() instanceof PlainPasswordFilePrincipalDatabase) + { + db = entry.getValue(); + } + } + + if (db instanceof Base64MD5PasswordFilePrincipalDatabase) + { + env.put("jmx.remote.profiles", "SASL/CRAM-MD5"); + CRAMMD5HashedInitialiser initialiser = new CRAMMD5HashedInitialiser(); + initialiser.initialise(db); + env.put("jmx.remote.sasl.callback.handler", initialiser.getCallbackHandler()); + } + else if (db instanceof PlainPasswordFilePrincipalDatabase) + { + env.put("jmx.remote.profiles", "SASL/PLAIN"); + env.put("jmx.remote.sasl.callback.handler", new UserCallbackHandler(db)); + } + + // Enable the SSL security and server authentication + /* + SslRMIClientSocketFactory csf = new SslRMIClientSocketFactory(); + SslRMIServerSocketFactory ssf = new SslRMIServerSocketFactory(); + env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf); + env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, ssf); + */ + + try + { + JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, env, _mbeanServer); + MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance(); + cs.setMBeanServerForwarder(mbsf); + cs.start(); + _log.info("JMX: Starting JMXConnector server with SASL"); + } + catch (java.net.MalformedURLException urlException) + { + // When JMXMPConnector is not available + // java.net.MalformedURLException: Unsupported protocol: jmxmp + _log.info("JMX: Starting JMXConnector server"); + startJMXConnectorServer(port); + } + } + else + { + startJMXConnectorServer(port); + } + } + catch (Exception ex) + { + _log.error("Error in initialising Managed Object Registry." + ex.getMessage()); + ex.printStackTrace(); + } + } + + /** + * Starts up an RMIRegistry at configured port and attaches a JMXConnectorServer to it. + * + * @param port + * + * @throws IOException + */ + private void startJMXConnectorServer(int port) throws IOException + { + startRMIRegistry(port); + _jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi"); + JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, null, _mbeanServer); + cs.start(); } public void registerObject(ManagedObject managedObject) throws JMException { - _mbeanServer.registerMBean(managedObject, managedObject.getObjectName()); + _mbeanServer.registerMBean(managedObject, managedObject.getObjectName()); } public void unregisterObject(ManagedObject managedObject) throws JMException @@ -50,4 +195,105 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry _mbeanServer.unregisterMBean(managedObject.getObjectName()); } + /** + * Checks is the "QPID_OPTS" env variable is set to use the out of the box JMXAgent. + * + * @return + */ + private boolean areOutOfTheBoxJMXOptionsSet() + { + if (System.getProperty("com.sun.management.jmxremote") != null) + { + return true; + } + + if (System.getProperty("com.sun.management.jmxremote.port") != null) + { + return true; + } + + return false; + } + + /** + * Starts the rmi registry at given port + * + * @param port + * + * @throws RemoteException + */ + private void startRMIRegistry(int port) throws RemoteException + { + System.setProperty("java.rmi.server.randomIDs", "true"); + _rmiRegistry = LocateRegistry.createRegistry(port); + } + + // stops the RMIRegistry, if it was running and bound to a port + public void close() throws RemoteException + { + if (_rmiRegistry != null) + { + // Stopping the RMI registry + UnicastRemoteObject.unexportObject(_rmiRegistry, true); + } + } + + /** This class is used for SASL enabled JMXConnector for performing user authentication. */ + private class UserCallbackHandler implements CallbackHandler + { + private final PrincipalDatabase _principalDatabase; + + protected UserCallbackHandler(PrincipalDatabase database) + { + _principalDatabase = database; + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + // Retrieve callbacks + NameCallback ncb = null; + PasswordCallback pcb = null; + for (int i = 0; i < callbacks.length; i++) + { + if (callbacks[i] instanceof NameCallback) + { + ncb = (NameCallback) callbacks[i]; + } + else if (callbacks[i] instanceof PasswordCallback) + { + pcb = (PasswordCallback) callbacks[i]; + } + else if (callbacks[i] instanceof AuthorizeCallback) + { + ((AuthorizeCallback) callbacks[i]).setAuthorized(true); + } + else + { + throw new UnsupportedCallbackException(callbacks[i]); + } + } + + boolean authorized = false; + // Process retrieval of password; can get password if username is available in NameCallback + if ((ncb != null) && (pcb != null)) + { + String username = ncb.getDefaultName(); + try + { + authorized = _principalDatabase.verifyPassword(username, new String(pcb.getPassword())); + } + catch (AccountNotFoundException e) + { + IOException ioe = new IOException("User not authorized. " + e); + ioe.initCause(e); + throw ioe; + } + } + + if (!authorized) + { + throw new IOException("User not authorized."); + } + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java new file mode 100644 index 0000000000..a79d993afc --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java @@ -0,0 +1,217 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.management; + +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import javax.management.remote.MBeanServerForwarder; +import javax.management.remote.JMXPrincipal; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.MBeanInfo; +import javax.management.MBeanOperationInfo; +import javax.management.JMException; +import javax.security.auth.Subject; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.lang.reflect.Method; +import java.security.AccessController; +import java.security.Principal; +import java.security.AccessControlContext; +import java.util.Set; +import java.util.Properties; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; +import java.io.File; +import java.io.InputStream; +import java.io.IOException; +import java.io.FileInputStream; + +/** + * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. This implements + * the logic for allowing the users to invoke MBean operations and implements the restrictions for readOnly, readWrite + * and admin users. + */ +public class MBeanInvocationHandlerImpl implements InvocationHandler +{ + private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class); + + public final static String ADMIN = "admin"; + public final static String READWRITE = "readwrite"; + public final static String READONLY = "readonly"; + private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate"; + private MBeanServer mbs; + private static Properties _userRoles = new Properties(); + + public static MBeanServerForwarder newProxyInstance() + { + final InvocationHandler handler = new MBeanInvocationHandlerImpl(); + final Class[] interfaces = new Class[]{MBeanServerForwarder.class}; + + Object proxy = Proxy.newProxyInstance(MBeanServerForwarder.class.getClassLoader(), interfaces, handler); + return MBeanServerForwarder.class.cast(proxy); + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + final String methodName = method.getName(); + + if (methodName.equals("getMBeanServer")) + { + return mbs; + } + + if (methodName.equals("setMBeanServer")) + { + if (args[0] == null) + { + throw new IllegalArgumentException("Null MBeanServer"); + } + if (mbs != null) + { + throw new IllegalArgumentException("MBeanServer object already initialized"); + } + mbs = (MBeanServer) args[0]; + return null; + } + + // Retrieve Subject from current AccessControlContext + AccessControlContext acc = AccessController.getContext(); + Subject subject = Subject.getSubject(acc); + + // Allow operations performed locally on behalf of the connector server itself + if (subject == null) + { + return method.invoke(mbs, args); + } + + if (args == null || DELEGATE.equals(args[0])) + { + return method.invoke(mbs, args); + } + + // Restrict access to "createMBean" and "unregisterMBean" to any user + if (methodName.equals("createMBean") || methodName.equals("unregisterMBean")) + { + throw new SecurityException("Access denied"); + } + + // Retrieve JMXPrincipal from Subject + Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class); + if (principals == null || principals.isEmpty()) + { + throw new SecurityException("Access denied"); + } + + Principal principal = principals.iterator().next(); + String identity = principal.getName(); + + // Following users can perform any operation other than "createMBean" and "unregisterMBean" + if (isAdmin(identity) || isAllowedToModify(identity)) + { + return method.invoke(mbs, args); + } + + // These users can only call "getAttribute" on the MBeanServerDelegate MBean + // Here we can add other fine grained permissions like specific method for a particular mbean + if (isReadOnlyUser(identity) && isReadOnlyMethod(method, args)) + { + return method.invoke(mbs, args); + } + + throw new SecurityException("Access denied"); + } + + // Initialises the user roles + public static void setAccessRights(Properties accessRights) + { + _userRoles = accessRights; + } + + private boolean isAdmin(String userName) + { + if (ADMIN.equals(_userRoles.getProperty(userName))) + { + return true; + } + return false; + } + + private boolean isAllowedToModify(String userName) + { + if (READWRITE.equals(_userRoles.getProperty(userName))) + { + return true; + } + return false; + } + + private boolean isReadOnlyUser(String userName) + { + if (READONLY.equals(_userRoles.getProperty(userName))) + { + return true; + } + return false; + } + + private boolean isReadOnlyMethod(Method method, Object[] args) + { + String methodName = method.getName(); + if (methodName.equals("queryMBeans") || + methodName.equals("getDefaultDomain") || + methodName.equals("getMBeanInfo") || + methodName.equals("getAttribute") || + methodName.equals("getAttributes")) + { + return true; + } + + if (args[0] instanceof ObjectName) + { + String mbeanMethod = (args.length > 1) ? (String) args[1] : null; + if (mbeanMethod == null) + { + return false; + } + + try + { + MBeanInfo mbeanInfo = mbs.getMBeanInfo((ObjectName) args[0]); + if (mbeanInfo != null) + { + MBeanOperationInfo[] opInfos = mbeanInfo.getOperations(); + for (MBeanOperationInfo opInfo : opInfos) + { + if (opInfo.getName().equals(mbeanMethod) && (opInfo.getImpact() == MBeanOperationInfo.INFO)) + { + return true; + } + } + } + } + catch (JMException ex) + { + ex.printStackTrace(); + } + } + + return false; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java index b2f79b6410..45e2e91ed7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java @@ -52,8 +52,7 @@ public interface ManagedBroker @MBeanOperation(name="createNewExchange", description="Creates a new Exchange", impact= MBeanOperationInfo.ACTION) void createNewExchange(@MBeanOperationParameter(name="name", description="Name of the new exchange")String name, @MBeanOperationParameter(name="ExchangeType", description="Type of the exchange")String type, - @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable, - @MBeanOperationParameter(name="passive", description="true of the Exchange should be passive")boolean passive) + @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable) throws IOException, JMException; /** @@ -81,8 +80,7 @@ public interface ManagedBroker @MBeanOperation(name="createNewQueue", description="Create a new Queue on the Broker server", impact= MBeanOperationInfo.ACTION) void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName, @MBeanOperationParameter(name="owner", description="Owner name")String owner, - @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable, - @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete) + @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable) throws IOException, JMException; /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java index 32298f05e3..5f9bc9ddad 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.management; import javax.management.JMException; +import java.rmi.RemoteException; /** * Handles the registration (and unregistration and so on) of managed objects. @@ -36,7 +37,11 @@ import javax.management.JMException; */ public interface ManagedObjectRegistry { + void start(); + void registerObject(ManagedObject managedObject) throws JMException; void unregisterObject(ManagedObject managedObject) throws JMException; + + void close() throws RemoteException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java index 5b86543ea6..b4fbed6948 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java @@ -24,6 +24,8 @@ import javax.management.JMException; import org.apache.log4j.Logger; +import java.rmi.RemoteException; + /** * This managed object registry does not actually register MBeans. This can be used in tests when management is * not required or when management has been disabled. @@ -38,6 +40,11 @@ public class NoopManagedObjectRegistry implements ManagedObjectRegistry _log.info("Management is disabled"); } + public void start() + { + //no-op + } + public void registerObject(ManagedObject managedObject) throws JMException { } @@ -45,4 +52,9 @@ public class NoopManagedObjectRegistry implements ManagedObjectRegistry public void unregisterObject(ManagedObject managedObject) throws JMException { } + + public void close() throws RemoteException + { + + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index fd8fb2d5cb..2e62c2f1e4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.security.Principal; import javax.management.JMException; import javax.security.sasl.SaslServer; @@ -108,7 +109,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion); private List<Integer> _closingChannelsList = new ArrayList<Integer>(); private ProtocolOutputConverter _protocolOutputConverter; - private String _authorizedID; + private Principal _authorizedID; public ManagedObject getManagedObject() @@ -745,12 +746,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _protocolOutputConverter; } - public void setAuthorizedID(String authorizedID) + public void setAuthorizedID(Principal authorizedID) { _authorizedID = authorizedID; } - public String getAuthorizedID() + public Principal getAuthorizedID() { return _authorizedID; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 79421dd497..390117acf6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -31,6 +31,8 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.security.Principal; + public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { @@ -165,9 +167,9 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession public ProtocolOutputConverter getProtocolOutputConverter(); - void setAuthorizedID(String authorizedID); + void setAuthorizedID(Principal authorizedID); - /** @return a username string that was used to authorized this session */ - String getAuthorizedID(); + /** @return a Principal that was used to authorized this session */ + Principal getAuthorizedID(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 5eebd4c524..66f928a70e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -1,5 +1,25 @@ /* * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,14 +37,15 @@ */ package org.apache.qpid.server.protocol; +import java.security.Principal; import java.util.Date; import java.util.List; import javax.management.JMException; import javax.management.MBeanException; import javax.management.MBeanNotificationInfo; -import javax.management.Notification; import javax.management.NotCompliantMBeanException; +import javax.management.Notification; import javax.management.monitor.MonitorNotification; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -56,15 +77,17 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { private AMQMinaProtocolSession _session = null; private String _name = null; - - //openmbean data types for representing the channel attributes - private final static String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"}; - private final static String[] _indexNames = {_channelAtttibuteNames[0]}; - private final static OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; - private static CompositeType _channelType = null; // represents the data type for channel data - private static TabularType _channelsType = null; // Data type for list of channels type + + // openmbean data types for representing the channel attributes + private static final String[] _channelAtttibuteNames = + { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count" }; + private static final String[] _indexNames = { _channelAtttibuteNames[0] }; + private static final OpenType[] _channelAttributeTypes = + { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER }; + private static CompositeType _channelType = null; // represents the data type for channel data + private static TabularType _channelsType = null; // Data type for list of channels type private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION = - new AMQShortString("Broker Management Console has closed the connection."); + new AMQShortString("Broker Management Console has closed the connection."); @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException @@ -72,22 +95,21 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed super(ManagedConnection.class, ManagedConnection.TYPE); _session = session; String remote = getRemoteAddress(); - remote = "anonymous".equals(remote) ? remote + hashCode() : remote; + remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote; _name = jmxEncode(new StringBuffer(remote), 0).toString(); init(); } - static { try { init(); } - catch(JMException ex) + catch (JMException ex) { - // It should never occur - System.out.println(ex.getMessage()); + // This is not expected to ever occur. + throw new RuntimeException("Got JMException in static initializer.", ex); } } @@ -96,26 +118,27 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed */ private static void init() throws OpenDataException { - _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, - _channelAtttibuteNames, _channelAttributeTypes); + _channelType = + new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, _channelAtttibuteNames, + _channelAttributeTypes); _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames); } public String getClientId() { - return _session.getContextKey() == null ? null : _session.getContextKey().toString(); + return (_session.getContextKey() == null) ? null : _session.getContextKey().toString(); } public String getAuthorizedId() { - return _session.getAuthorizedID(); + return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null; } public String getVersion() { - return _session.getClientVersion() == null ? null : _session.getClientVersion().toString(); + return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString(); } - + public Date getLastIoTime() { return new Date(_session.getIOSession().getLastIoTime()); @@ -171,6 +194,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } + _session.commitTransactions(channel); } catch (AMQException ex) @@ -194,6 +218,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } + _session.rollbackTransactions(channel); } catch (AMQException ex) @@ -215,9 +240,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed for (AMQChannel channel : list) { - Object[] itemValues = {channel.getChannelId(), channel.isTransactional(), + Object[] itemValues = + { + channel.getChannelId(), channel.isTransactional(), (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName().asString() : null, - channel.getUnacknowledgedMessageMap().size()}; + channel.getUnacknowledgedMessageMap().size() + }; CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues); channelsList.put(channelData); @@ -232,17 +260,16 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed * @throws JMException */ public void closeConnection() throws JMException - { + { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, - _session.getProtocolMajorVersion(), - _session.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText + final AMQFrame response = + ConnectionCloseBody.createAMQFrame(0, _session.getProtocolMajorVersion(), _session.getProtocolMinorVersion(), // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText ); _session.writeFrame(response); @@ -259,18 +286,19 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed @Override public MBeanNotificationInfo[] getNotificationInfo() { - String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED }; String name = MonitorNotification.class.getName(); String description = "Channel count has reached threshold value"; MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - return new MBeanNotificationInfo[]{info1}; + return new MBeanNotificationInfo[] { info1 }; } public void notifyClients(String notificationMsg) { - Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + Notification n = + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java index 990c4c0794..e6e713ac6d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol; import java.io.IOException; import java.util.Date; +import java.security.Principal; import javax.management.JMException; import javax.management.MBeanOperationInfo; @@ -67,16 +68,17 @@ public interface ManagedConnection /** * Tells the total number of bytes written till now. * @return number of bytes written. - */ + * @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now") Long getWrittenBytes(); - + */ /** * Tells the total number of bytes read till now. * @return number of bytes read. - */ + * @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now") Long getReadBytes(); + */ /** * Threshold high value for no of channels. This is useful in setting notifications or diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index d6962d28cd..b2046efee3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -42,6 +43,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -78,19 +81,20 @@ public class AMQMessage private boolean _immediate; private AtomicBoolean _taken = new AtomicBoolean(false); - private TransientMessageData _transientMessageData = new TransientMessageData(); private Subscription _takenBySubcription; - private Set<Subscription> _rejectedBy = null; + private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); + private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); - public boolean isTaken() + public boolean isTaken(AMQQueue queue) { return _taken.get(); } private final int hashcode = System.identityHashCode(this); + public String debugIdentity() { return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; @@ -203,9 +207,10 @@ public class AMQMessage _transientMessageData.setMessagePublishInfo(info); _taken = new AtomicBoolean(false); + if (_log.isDebugEnabled()) { - _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")"); + _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")"); } } @@ -318,8 +323,10 @@ public class AMQMessage // enqueuing the messages ensure that if required the destinations are recorded to a // persistent store + for (AMQQueue q : _transientMessageData.getDestinationQueues()) { + _takenMap.put(q, new AtomicBoolean(false)); _messageHandle.enqueue(storeContext, _messageId, q); } @@ -356,12 +363,13 @@ public class AMQMessage } /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation. + * Creates a long-lived reference to this message, and increments the count of such references, as an atomic + * operation. */ public AMQMessage takeReference() { _referenceCount.incrementAndGet(); - return this; + return this; } /** Threadsafe. Increment the reference count on the message. */ @@ -378,9 +386,10 @@ public class AMQMessage * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. * + * @param storeContext + * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed - * @param storeContext */ public void decrementReference(StoreContext storeContext) throws MessageCleanupException { @@ -451,7 +460,7 @@ public class AMQMessage } - public boolean taken(Subscription sub) + public boolean taken(AMQQueue queue, Subscription sub) { if (_taken.getAndSet(true)) { @@ -464,7 +473,7 @@ public class AMQMessage } } - public void release() + public void release(AMQQueue queue) { if (_log.isTraceEnabled()) { @@ -600,7 +609,7 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { //Increment the references to this message for each queue delivery. - incrementReference(); + incrementReference(); //normal deliver so add this message at the end. _txnContext.deliver(this, q, false); } @@ -824,11 +833,14 @@ public class AMQMessage public String toString() { - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + - _taken + " by:" + _takenBySubcription; + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + + _taken + " by :" + _takenBySubcription; + +// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + +// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } - public Subscription getDeliveredSubscription() + public Subscription getDeliveredSubscription(AMQQueue queue) { return _takenBySubcription; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 7a32848c44..bbaa7379f6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -1,5 +1,25 @@ /* * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/* + * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,11 +37,11 @@ */ package org.apache.qpid.server.queue; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; -import java.util.Date; -import java.text.SimpleDateFormat; import javax.management.JMException; import javax.management.MBeanException; @@ -41,12 +61,14 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import org.apache.log4j.Logger; + import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.CommonContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -73,15 +95,15 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private AMQQueue _queue = null; private String _queueName = null; // OpenMBean data types for viewMessages method - private final static String[] _msgAttributeNames = {"AMQ MessageId", "Header", "Size(bytes)", "Redelivered"}; - private static String[] _msgAttributeIndex = {_msgAttributeNames[0]}; + private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" }; + private static String[] _msgAttributeIndex = { _msgAttributeNames[0] }; private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types. - private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. - private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. + private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. + private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. // OpenMBean data types for viewMessageContent method private static CompositeType _msgContentType = null; - private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"}; + private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" }; private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; @@ -95,7 +117,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString(); } - public ManagedObject getParentObject() { return _queue.getVirtualHost().getManagedObject(); @@ -107,10 +128,10 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { init(); } - catch(JMException ex) + catch (JMException ex) { - // It should never occur - System.out.println(ex.getMessage()); + // This is not expected to ever occur. + throw new RuntimeException("Got JMException in static initializer.", ex); } } @@ -119,19 +140,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que */ private static void init() throws OpenDataException { - _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id - _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType - _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding - _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content - _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, - _msgContentAttributes, _msgContentAttributeTypes); - - _msgAttributeTypes[0] = SimpleType.LONG; // For message id - _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes - _msgAttributeTypes[2] = SimpleType.LONG; // For size - _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered - - _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); + _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id + _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType + _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding + _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content + _msgContentType = + new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes, + _msgContentAttributeTypes); + + _msgAttributeTypes[0] = SimpleType.LONG; // For message id + _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes + _msgAttributeTypes[2] = SimpleType.LONG; // For size + _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered + + _messageDataType = + new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); } @@ -213,7 +236,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public Long getMaximumQueueDepth() { long queueDepthInBytes = _queue.getMaximumQueueDepth(); - return queueDepthInBytes >> 10 ; + + return queueDepthInBytes >> 10; } public void setMaximumQueueDepth(Long value) @@ -227,7 +251,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public Long getQueueDepth() throws JMException { long queueBytesSize = _queue.getQueueDepth(); - return queueBytesSize >> 10 ; + + return queueBytesSize >> 10; } /** @@ -237,13 +262,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); - for(NotificationCheck check : NotificationCheck.values()) + for (NotificationCheck check : NotificationCheck.values()) { - if(check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()]<thresholdTime) + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) { - if(check.notifyIfNecessary(msg, _queue, this)) + if (check.notifyIfNecessary(msg, _queue, this)) { _lastNotificationTimes[check.ordinal()] = currentTime; } @@ -260,9 +285,10 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que // important : add log to the log file - monitoring tools may be looking for this _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); notificationMsg = notification.name() + " " + notificationMsg; - - _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + + _lastNotification = + new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, + System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(_lastNotification); } @@ -334,20 +360,25 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que try { // Create header attributes list - CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; + CommonContentHeaderProperties headerProperties = + (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; String mimeType = null, encoding = null; if (headerProperties != null) { AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString(); - encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString(); + mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); + encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); } - Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; + + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues); } catch (AMQException e) { - throw new JMException("Error creating header attributes list: " + e); + JMException jme = new JMException("Error creating header attributes list: " + e); + jme.initCause(e); + throw jme; } } @@ -358,8 +389,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { if ((beginIndex > endIndex) || (beginIndex < 1)) { - throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex + - "\n\"From Index\" should be greater than 0 and less than \"To Index\""); + throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex + + "\n\"From Index\" should be greater than 0 and less than \"To Index\""); } List<AMQMessage> list = _queue.getMessagesOnTheQueue(); @@ -368,20 +399,22 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que try { // Create the tabular list of message header contents - for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) + for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { AMQMessage msg = list.get(i - 1); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() }; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); } } catch (AMQException e) { - throw new JMException("Error creating message contents: " + e); + JMException jme = new JMException("Error creating message contents: " + e); + jme.initCause(e); + throw jme; } return _messageList; @@ -400,11 +433,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString()); int delMode = headerProperties.getDeliveryMode(); - list.add("JMSDeliveryMode = " + (delMode == 1 ? "Persistent" : "Non_Persistent")); + list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent")); list.add("JMSPriority = " + headerProperties.getPriority()); list.add("JMSType = " + headerProperties.getType()); - + long longDate = headerProperties.getExpiration(); String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; list.add("JMSExpiration = " + strDate); @@ -425,27 +458,26 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que */ public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException { - if (fromMessageId > toMessageId || (fromMessageId < 1)) + if ((fromMessageId > toMessageId) || (fromMessageId < 1)) { - throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\""); + throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\""); } _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext); } - /** * returns Notifications sent by this MBean. */ @Override public MBeanNotificationInfo[] getNotificationInfo() { - String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED }; String name = MonitorNotification.class.getName(); String description = "Either Message count or Queue depth or Message size has reached threshold high value"; MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); - return new MBeanNotificationInfo[]{info1}; + return new MBeanNotificationInfo[] { info1 }; } } // End of AMQQueueMBean class diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index cfa13c87fd..979f692361 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -210,6 +210,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * Returns all the messages in the Queue + * * @return List of messages */ public List<AMQMessage> getMessages() @@ -222,14 +223,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager list.add(message); } _lock.unlock(); - + return list; } /** * Returns messages within the range of given messageIds + * * @param fromMessageId * @param toMessageId + * * @return */ public List<AMQMessage> getMessages(long fromMessageId, long toMessageId) @@ -242,7 +245,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long maxMessageCount = toMessageId - fromMessageId + 1; _lock.lock(); - + List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); for (AMQMessage message : _messages) @@ -399,7 +402,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void removeAMessageFromTop(StoreContext storeContext) throws AMQException { _lock.lock(); - + AMQMessage message = _messages.poll(); if (message != null) { @@ -432,9 +435,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return count; } - /** - This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. - */ + /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */ private AMQMessage getNextMessage() throws AMQException { return getNextMessage(_messages, null); @@ -444,8 +445,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.) - while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub)) + //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) + while (message != null + && ( + ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) + || sub == null) + && message.taken(_queue, sub)) { //remove the already taken message AMQMessage removed = messages.poll(); @@ -506,7 +511,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) + + _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(sub)); } @@ -526,7 +531,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() + + _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(sub)); } @@ -562,7 +567,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } catch (AMQException e) { - message.release(); + message.release(_queue); _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e); } } @@ -723,7 +728,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } - msg.taken(s); + msg.taken(_queue, s); //Deliver the message s.send(msg, _queue); } @@ -737,7 +742,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - if (!msg.isTaken()) + if (!msg.isTaken(_queue)) { if (_log.isInfoEnabled()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index d3578d39e8..e3944954f3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -558,7 +558,7 @@ public class SubscriptionImpl implements Subscription _logger.trace("Removed for resending:" + resent.debugIdentity()); } - resent.release(); + resent.release(_queue); _queue.subscriberHasPendingResend(false, this, resent); try diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 14a8063aee..89f0b7b39d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -153,7 +153,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _logger.error("Error configuring application: " + e, e); //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); - throw new RuntimeException("Unable to create Application Registry"); + throw new RuntimeException("Unable to create Application Registry", e); } } else @@ -168,6 +168,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { virtualHost.close(); } + + // close the rmi registry(if any) started for management + if (getInstance().getManagedObjectRegistry() != null) + { + getInstance().getManagedObjectRegistry().close(); + } } public Configuration getConfiguration() @@ -187,7 +193,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry catch (Exception e) { _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); - throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); + throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); } Configurator.configure(instance); _configuredObjects.put(instanceType, instance); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 739ed9db42..1cca259a8d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.security.access.AccessManager; import org.apache.qpid.server.security.access.AccessManagerImpl; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.AMQException; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { @@ -103,6 +104,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { initialiseManagedObjectRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(); _accessManager = new AccessManagerImpl("default", _configuration); @@ -111,7 +113,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); + _databaseManager.initialiseManagement(_configuration); + + _managedObjectRegistry.start(); + initialiseVirtualHosts(); + } private void initialiseVirtualHosts() throws Exception @@ -123,7 +130,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry } } - private void initialiseManagedObjectRegistry() + private void initialiseManagedObjectRegistry() throws AMQException { ManagementConfiguration config = getConfiguredObject(ManagementConfiguration.class); if (config.enabled) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java new file mode 100644 index 0000000000..f9e093dba7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security; + +import org.apache.commons.codec.binary.Base64; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.DigestException; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintStream; + +public class Passwd +{ + public static void main(String args[]) throws NoSuchAlgorithmException, DigestException, IOException + { + if (args.length != 2) + { + System.out.println("Passwd <username> <password>"); + System.exit(0); + } + + byte[] data = args[1].getBytes("utf-8"); + + MessageDigest md = MessageDigest.getInstance("MD5"); + + for (byte b : data) + { + md.update(b); + } + + byte[] digest = md.digest(); + + Base64 b64 = new Base64(); + + byte[] encoded = b64.encode(digest); + + output(args[0], encoded); + } + + private static void output(String user, byte[] encoded) throws IOException + { + +// File passwdFile = new File("qpid.passwd"); + + PrintStream ps = new PrintStream(System.out); + + user += ":"; + ps.write(user.getBytes("utf-8")); + + for (byte b : encoded) + { + ps.write(b); + } + + ps.println(); + + ps.flush(); + ps.close(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java new file mode 100644 index 0000000000..a43474559d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanOperationParameter; +import org.apache.qpid.server.management.MBeanOperation; +import org.apache.qpid.server.management.MBeanInvocationHandlerImpl; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.log4j.Logger; +import org.apache.commons.configuration.ConfigurationException; + +import javax.management.JMException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.security.auth.login.AccountNotFoundException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.FileOutputStream; +import java.util.Properties; +import java.util.List; +import java.util.Enumeration; +import java.util.concurrent.locks.ReentrantLock; +import java.security.Principal; + +/** MBean class for AMQUserManagementMBean. It implements all the management features exposed for managing users. */ +@MBeanDescription("User Management Interface") +public class AMQUserManagementMBean extends AMQManagedObject implements UserManagement +{ + + private static final Logger _logger = Logger.getLogger(AMQUserManagementMBean.class); + + private PrincipalDatabase _principalDatabase; + private String _accessFileName; + private Properties _accessRights; + // private File _accessFile; + private ReentrantLock _accessRightsUpdate = new ReentrantLock(); + + // Setup for the TabularType + static TabularType _userlistDataType; // Datatype for representing User Lists + + static CompositeType _userDataType; // Composite type for representing User + static String[] _userItemNames = {"Username", "Read", "Write", "Admin"}; + + static + { + String[] userItemDesc = {"Broker Login username", "Management Console Read Permission", + "Management Console Write Permission", "Management Console Admin Permission"}; + + OpenType[] userItemTypes = new OpenType[4]; // User item types. + userItemTypes[0] = SimpleType.STRING; // For Username + userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read + userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write + userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin + String[] userDataIndex = {_userItemNames[0]}; + + try + { + _userDataType = + new CompositeType("User", "User Data", _userItemNames, userItemDesc, userItemTypes); + + _userlistDataType = new TabularType("Users", "List of users", _userDataType, userDataIndex); + } + catch (OpenDataException e) + { + _logger.error("Tabular data setup for viewing users incorrect."); + _userlistDataType = null; + } + } + + + public AMQUserManagementMBean() throws JMException + { + super(UserManagement.class, UserManagement.TYPE); + } + + public String getObjectInstanceName() + { + return UserManagement.TYPE; + } + + public boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "password", description = "Password")String password) + { + try + { + //delegate password changes to the Principal Database + return _principalDatabase.updatePassword(new UsernamePrincipal(username), password); + } + catch (AccountNotFoundException e) + { + _logger.warn("Attempt to set password of non-existant user'" + username + "'"); + return false; + } + } + + public boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, + @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, + @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin) + { + + if (_accessRights.get(username) == null) + { + // If the user doesn't exist in the user rights file check that they at least have an account. + if (_principalDatabase.getUser(username) == null) + { + return false; + } + } + + try + { + + _accessRightsUpdate.lock(); + + // Update the access rights + if (admin) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.ADMIN); + } + else + { + if (read | write) + { + if (read) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.READONLY); + } + if (write) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.READWRITE); + } + } + else + { + _accessRights.remove(username); + } + } + + saveAccessFile(); + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + + return true; + } + + public boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "password", description = "Password")String password, + @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, + @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, + @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin) + { + if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password)) + { + _accessRights.put(username, ""); + + return setRights(username, read, write, admin); + } + + return false; + } + + public boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username) + { + + try + { + if (_principalDatabase.deletePrincipal(new UsernamePrincipal(username))) + { + try + { + _accessRightsUpdate.lock(); + + _accessRights.remove(username); + saveAccessFile(); + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + return true; + } + } + catch (AccountNotFoundException e) + { + _logger.warn("Attempt to delete user (" + username + ") that doesn't exist"); + } + + return false; + } + + public boolean reloadData() + { + try + { + try + { + loadAccessFile(); + } + catch (ConfigurationException e) + { + _logger.info("Reload failed due to:" + e); + return false; + } + + // Reload successful + return true; + } + catch (IOException e) + { + _logger.info("Reload failed due to:" + e); + // Reload unsuccessful + return false; + } + } + + + @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.") + public TabularData viewUsers() + { + // Table of users + // Username(string), Access rights Read,Write,Admin(bool,bool,bool) + + reloadData(); + + if (_userlistDataType == null) + { + _logger.warn("TabluarData not setup correctly"); + return null; + } + + List<Principal> users = _principalDatabase.getUsers(); + + TabularDataSupport userList = new TabularDataSupport(_userlistDataType); + + try + { + // Create the tabular list of message header contents + for (Principal user : users) + { + // Create header attributes list + + String rights = (String) _accessRights.get(user.getName()); + + Boolean read = false; + Boolean write = false; + Boolean admin = false; + + if (rights != null) + { + read = rights.equals(MBeanInvocationHandlerImpl.READONLY) + || rights.equals(MBeanInvocationHandlerImpl.READWRITE); + write = rights.equals(MBeanInvocationHandlerImpl.READWRITE); + admin = rights.equals(MBeanInvocationHandlerImpl.ADMIN); + } + + Object[] itemData = {user.getName(), read, write, admin}; + CompositeData messageData = new CompositeDataSupport(_userDataType, _userItemNames, itemData); + userList.put(messageData); + } + } + catch (OpenDataException e) + { + _logger.warn("Unable to create user list due to :" + e); + return null; + } + + return userList; + } + + /*** Broker Methods **/ + + /** + * setPrincipalDatabase + * + * @param database set The Database to use for user lookup + */ + public void setPrincipalDatabase(PrincipalDatabase database) + { + _principalDatabase = database; + } + + /** + * setAccessFile + * + * @param accessFile the file to use for updating. + * + * @throws java.io.IOException If the file cannot be accessed + * @throws org.apache.commons.configuration.ConfigurationException + * if checks on the file fail. + */ + public void setAccessFile(String accessFile) throws IOException, ConfigurationException + { + _accessFileName = accessFile; + + if (_accessFileName != null) + { + loadAccessFile(); + } + else + { + _logger.warn("Access rights file specified is null. Access rights not changed."); + } + } + + private void loadAccessFile() throws IOException, ConfigurationException + { + try + { + _accessRightsUpdate.lock(); + + Properties accessRights = new Properties(); + + File accessFile = new File(_accessFileName); + + if (!accessFile.exists()) + { + throw new ConfigurationException("'" + _accessFileName + "' does not exist"); + } + + if (!accessFile.canRead()) + { + throw new ConfigurationException("Cannot read '" + _accessFileName + "'."); + } + + if (!accessFile.canWrite()) + { + _logger.warn("Unable to write to access file '" + _accessFileName + "' changes will not be preserved."); + } + + accessRights.load(new FileInputStream(accessFile)); + checkAccessRights(accessRights); + setAccessRights(accessRights); + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + } + + private void checkAccessRights(Properties accessRights) + { + Enumeration values = accessRights.propertyNames(); + + while (values.hasMoreElements()) + { + String user = (String) values.nextElement(); + + if (_principalDatabase.getUser(user) == null) + { + _logger.warn("Access rights contains user '" + user + "' but there is no authentication data for that user"); + } + } + } + + private void saveAccessFile() + { + try + { + _accessRightsUpdate.lock(); + try + { + // remove old temporary file + File tmp = new File(_accessFileName + ".tmp"); + if (tmp.exists()) + { + tmp.delete(); + } + + //remove old backup + File old = new File(_accessFileName + ".old"); + if (old.exists()) + { + old.delete(); + } + + // Rename current file + File rights = new File(_accessFileName); + rights.renameTo(old); + + FileOutputStream output = new FileOutputStream(tmp); + _accessRights.store(output, ""); + output.close(); + + // Rename new file to main file + tmp.renameTo(rights); + + // delete tmp + tmp.delete(); + } + catch (IOException e) + { + _logger.warn("Problem occured saving '" + _accessFileName + "' changes may not be preserved. :" + e); + } + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + } + + /** + * user=read user=write user=readwrite user=admin + * + * @param accessRights The properties list of access rights to process + */ + private void setAccessRights(Properties accessRights) + { + _logger.debug("Setting Access Rights:" + accessRights); + _accessRights = accessRights; + MBeanInvocationHandlerImpl.setAccessRights(_accessRights); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java index 0c0de88182..d70a6dc8f4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java @@ -20,8 +20,13 @@ */ package org.apache.qpid.server.security.access; +import java.security.Principal; + public interface AccessManager { + AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights); + + @Deprecated AccessResult isAuthorized(Accessable accessObject, String username); String getName(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java index 0feb2791da..35d036d20f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java @@ -23,13 +23,13 @@ package org.apache.qpid.server.security.access; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.configuration.PropertyUtils; -import org.apache.qpid.configuration.PropertyException; import org.apache.log4j.Logger; import java.util.List; import java.lang.reflect.Method; -import java.lang.reflect.InvocationTargetException; +import java.security.Principal; public class AccessManagerImpl implements AccessManager { @@ -39,8 +39,13 @@ public class AccessManagerImpl implements AccessManager public AccessManagerImpl(String name, Configuration hostConfig) throws ConfigurationException { - String accessClass = hostConfig.getString("security.access.class"); + if (hostConfig == null) + { + _logger.warn("No Configuration specified. Using default access controls for VirtualHost:'" + name + "'"); + return; + } + String accessClass = hostConfig.getString("security.access.class"); if (accessClass == null) { _logger.warn("No access control specified. Using default access controls for VirtualHost:'" + name + "'"); @@ -111,21 +116,35 @@ public class AccessManagerImpl implements AccessManager } catch (Exception e) { - throw new ConfigurationException(e.getCause()); + ConfigurationException ce = new ConfigurationException(e.getMessage(), e.getCause()); + ce.initCause(e); + throw ce; } } } - public AccessResult isAuthorized(Accessable accessObject, String username) { + return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); + } + + public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) + { if (_accessManager == null) { - return ApplicationRegistry.getInstance().getAccessManager().isAuthorized(accessObject, username); + if (ApplicationRegistry.getInstance().getAccessManager() == this) + { + _logger.warn("No Default access manager specified DENYING ALL ACCESS"); + return new AccessResult(this, AccessResult.AccessStatus.REFUSED); + } + else + { + return ApplicationRegistry.getInstance().getAccessManager().isAuthorized(accessObject, user, rights); + } } else { - return _accessManager.isAuthorized(accessObject, username); + return _accessManager.isAuthorized(accessObject, user, rights); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessRights.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessRights.java new file mode 100644 index 0000000000..1b79a5a0e0 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessRights.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +public class AccessRights +{ + public enum Rights + { + ANY, + READ, + WRITE, + READWRITE + } + + Rights _right; + + public AccessRights(Rights right) + { + _right = right; + } + + public boolean allows(Rights rights) + { + switch (_right) + { + case ANY: + return (rights.equals(Rights.WRITE) + || rights.equals(Rights.READ) + || rights.equals(Rights.READWRITE) + || rights.equals(Rights.ANY)); + case READ: + return rights.equals(Rights.READ) || rights.equals(Rights.ANY); + case WRITE: + return rights.equals(Rights.WRITE) || rights.equals(Rights.ANY); + case READWRITE: + return true; + } + return false; + } + + public Rights getRights() + { + return _right; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java index b2e4094edd..1ddca3a64e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java @@ -20,9 +20,16 @@ */ package org.apache.qpid.server.security.access; +import java.security.Principal; + public class AllowAll implements AccessManager { + public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) + { + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + public AccessResult isAuthorized(Accessable accessObject, String username) { return new AccessResult(this, AccessResult.AccessStatus.GRANTED); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java index 0e62d2657f..bf40eeba4e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java @@ -20,8 +20,15 @@ */ package org.apache.qpid.server.security.access; +import java.security.Principal; + public class DenyAll implements AccessManager { + public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) + { + return new AccessResult(this, AccessResult.AccessStatus.REFUSED); + } + public AccessResult isAuthorized(Accessable accessObject, String username) { return new AccessResult(this, AccessResult.AccessStatus.REFUSED); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java new file mode 100644 index 0000000000..291bc714ed --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.FileNotFoundException; +import java.io.File; +import java.util.regex.Pattern; +import java.security.Principal; + +/** + * Represents a user database where the account information is stored in a simple flat file. + * + * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn + * + * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text. + */ +public class FileAccessManager implements AccessManager +{ + private static final Logger _logger = Logger.getLogger(FileAccessManager.class); + + protected File _accessFile; + + protected Pattern _regexp = Pattern.compile(":"); + + private static final short USER_INDEX = 0; + private static final short VIRTUALHOST_INDEX = 1; + + public void setAccessFile(String accessFile) throws FileNotFoundException + { + File f = new File(accessFile); + _logger.info("FileAccessManager using file " + f.getAbsolutePath()); + _accessFile = f; + if (!f.exists()) + { + throw new FileNotFoundException("Cannot find access file " + f); + } + if (!f.canRead()) + { + throw new FileNotFoundException("Cannot read access file " + f + + ". Check permissions."); + } + } + + /** + * Looks up the virtual hosts for a specified user in the access file. + * + * @param user The user to lookup + * + * @return a list of virtualhosts + */ + private VirtualHostAccess[] lookupVirtualHost(String user) + { + String[] results = lookup(user, VIRTUALHOST_INDEX); + VirtualHostAccess vhosts[] = new VirtualHostAccess[results.length]; + + for (int index = 0; index < results.length; index++) + { + vhosts[index] = new VirtualHostAccess(results[index]); + } + + return vhosts; + } + + + private String[] lookup(String user, int index) + { + try + { + BufferedReader reader = null; + try + { + reader = new BufferedReader(new FileReader(_accessFile)); + String line; + + while ((line = reader.readLine()) != null) + { + String[] result = _regexp.split(line); + if (result == null || result.length < (index + 1)) + { + continue; + } + + if (user.equals(result[USER_INDEX])) + { + return result[index].split(","); + } + } + return null; + } + finally + { + if (reader != null) + { + reader.close(); + } + } + } + catch (IOException ioe) + { + //ignore + } + return null; + } + + public AccessResult isAuthorized(Accessable accessObject, String username) + { + return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); + } + + public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) + { + if (accessObject instanceof VirtualHost) + { + VirtualHostAccess[] hosts = lookupVirtualHost(user.getName()); + + if (hosts != null) + { + for (VirtualHostAccess host : hosts) + { + if (accessObject.getAccessableName().equals(host.getVirtualHost())) + { + if (host.getAccessRights().allows(rights)) + { + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + else + { + return new AccessResult(this, AccessResult.AccessStatus.REFUSED); + } + } + } + } + } +// else if (accessObject instanceof AMQQueue) +// { +// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost()); +// +// if (queues != null) +// { +// for (String queue : queues) +// { +// if (accessObject.getAccessableName().equals(queue)) +// { +// return new AccessResult(this, AccessResult.AccessStatus.GRANTED); +// } +// } +// } +// } + + return new AccessResult(this, AccessResult.AccessStatus.REFUSED); + } + + public String getName() + { + return "FileAccessManager"; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java index 0e447b5744..6ccadb2e7d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.security.access; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.log4j.Logger; +import java.security.Principal; + public class PrincipalDatabaseAccessManager implements AccessManager { private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAccessManager.class); @@ -58,15 +61,21 @@ public class PrincipalDatabaseAccessManager implements AccessManager } } + public AccessResult isAuthorized(Accessable accessObject, String username) { + return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); + } + + public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) + { AccessResult result; if (_database == null) { if (_default != null) { - result = _default.isAuthorized(accessObject, username); + result = _default.isAuthorized(accessObject, username, rights); } else { @@ -75,7 +84,15 @@ public class PrincipalDatabaseAccessManager implements AccessManager } else { - result = ((AccessManager) _database).isAuthorized(accessObject, username); + if (!(_database instanceof AccessManager)) + { + _logger.warn("Specified PrincipalDatabase is not an AccessManager so using default AccessManager"); + result = _default.isAuthorized(accessObject, username, rights); + } + else + { + result = ((AccessManager) _database).isAuthorized(accessObject, username, rights); + } } result.addAuthorizer(this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java new file mode 100644 index 0000000000..6381213398 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.server.management.MBeanOperation; +import org.apache.qpid.server.management.MBeanOperationParameter; +import org.apache.qpid.server.management.MBeanAttribute; +import org.apache.qpid.AMQException; + +import javax.management.openmbean.TabularData; +import javax.management.openmbean.CompositeData; +import javax.management.JMException; +import java.io.IOException; + +public interface UserManagement +{ + String TYPE = "UserManagement"; + + //********** Operations *****************// + /** + * set password for user + * + * @param username The username to create + * @param password The password for the user + * + * @return The result of the operation + */ + @MBeanOperation(name = "setPassword", description = "Set password for user.") + boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "password", description = "Password")String password); + + /** + * set rights for users with given details + * + * @param username The username to create + * @param read The set of permission to give the new user + * @param write The set of permission to give the new user + * @param admin The set of permission to give the new user + * + * @return The result of the operation + */ + @MBeanOperation(name = "setRights", description = "Set access rights for user.") + boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, + @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, + @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); + + /** + * Create users with given details + * + * @param username The username to create + * @param password The password for the user + * @param read The set of permission to give the new user + * @param write The set of permission to give the new user + * @param admin The set of permission to give the new user + * + * @return The result of the operation + */ + @MBeanOperation(name = "createUser", description = "Create new user from system.") + boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "password", description = "Password")String password, + @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, + @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, + @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); + + /** + * View users returns all the users that are currently available to the system. + * + * @param username The user to delete + * + * @return The result of the operation + */ + @MBeanOperation(name = "deleteUser", description = "Delete user from system.") + boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username); + + + /** + * Reload the date from disk + * + * @return The result of the operation + */ +// @MBeanOperation(name = "reloadData", description = "Reload the authentication file from disk.") +// boolean reloadData(); + + /** + * View users returns all the users that are currently available to the system. + * + * @return a table of users data (Username, read, write, admin) + */ + @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.") + TabularData viewUsers(); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/VirtualHostAccess.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/VirtualHostAccess.java new file mode 100644 index 0000000000..13151a66b8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/VirtualHostAccess.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +public class VirtualHostAccess +{ + private String _vhost; + private AccessRights _rights; + + public VirtualHostAccess(String vhostaccess) + { + //format <vhost>(<rights>) + int hostend = vhostaccess.indexOf('('); + + if (hostend == -1) + { + throw new IllegalArgumentException("VirtualHostAccess format string contains no access _rights"); + } + + _vhost = vhostaccess.substring(0, hostend); + + String rights = vhostaccess.substring(hostend); + + if (rights.indexOf('r') != -1) + { + if (rights.indexOf('w') != -1) + { + _rights = new AccessRights(AccessRights.Rights.READWRITE); + } + else + { + _rights = new AccessRights(AccessRights.Rights.READ); + } + } + else if (rights.indexOf('w') != -1) + { + _rights = new AccessRights(AccessRights.Rights.WRITE); + } + } + + public AccessRights getAccessRights() + { + return _rights; + } + + public String getVirtualHost() + { + return _vhost; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java new file mode 100644 index 0000000000..956db64d90 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.auth.database; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser; +import org.apache.qpid.server.security.access.AMQUserManagementMBean; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.EncoderException; + +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.login.AccountNotFoundException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.UnsupportedEncodingException; +import java.io.PrintStream; +import java.util.regex.Pattern; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.LinkedList; +import java.util.concurrent.locks.ReentrantLock; +import java.security.Principal; +import java.security.NoSuchAlgorithmException; +import java.security.MessageDigest; + +/** + * Represents a user database where the account information is stored in a simple flat file. + * + * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn + * + * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text. + */ +public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase +{ + private static final Logger _logger = Logger.getLogger(Base64MD5PasswordFilePrincipalDatabase.class); + + private File _passwordFile; + + private Pattern _regexp = Pattern.compile(":"); + + private Map<String, AuthenticationProviderInitialiser> _saslServers; + + AMQUserManagementMBean _mbean; + private static final String DEFAULT_ENCODING = "utf-8"; + private Map<String, User> _users = new HashMap<String, User>(); + private ReentrantLock _userUpdate = new ReentrantLock(); + + public Base64MD5PasswordFilePrincipalDatabase() + { + _saslServers = new HashMap<String, AuthenticationProviderInitialiser>(); + + /** + * Create Authenticators for MD5 Password file. + */ + + // Accept Plain incomming and hash it for comparison to the file. + CRAMMD5HashedInitialiser cram = new CRAMMD5HashedInitialiser(); + cram.initialise(this); + _saslServers.put(cram.getMechanismName(), cram); + + //fixme The PDs should setup a PD Mangement MBean +// try +// { +// _mbean = new AMQUserManagementMBean(); +// _mbean.setPrincipalDatabase(this); +// } +// catch (JMException e) +// { +// _logger.warn("User management disabled as unable to create MBean:" + e); +// } + } + + public void setPasswordFile(String passwordFile) throws IOException + { + File f = new File(passwordFile); + _logger.info("PasswordFilePrincipalDatabase using file " + f.getAbsolutePath()); + _passwordFile = f; + if (!f.exists()) + { + throw new FileNotFoundException("Cannot find password file " + f); + } + if (!f.canRead()) + { + throw new FileNotFoundException("Cannot read password file " + f + + ". Check permissions."); + } + + loadPasswordFile(); + } + + /** + * SASL Callback Mechanism - sets the Password in the PasswordCallback based on the value in the PasswordFile + * + * @param principal The Principal to set the password for + * @param callback The PasswordCallback to call setPassword on + * + * @throws AccountNotFoundException If the Principal cannont be found in this Database + */ + public void setPassword(Principal principal, PasswordCallback callback) throws AccountNotFoundException + { + if (_passwordFile == null) + { + throw new AccountNotFoundException("Unable to locate principal since no password file was specified during initialisation"); + } + if (principal == null) + { + throw new IllegalArgumentException("principal must not be null"); + } + + char[] pwd = lookupPassword(principal.getName()); + + if (pwd != null) + { + callback.setPassword(pwd); + } + else + { + throw new AccountNotFoundException("No account found for principal " + principal); + } + } + + /** + * Used to verify that the presented Password is correct. Currently only used by Management Console + * + * @param principal The principal to authenticate + * @param password The password to check + * + * @return true if password is correct + * + * @throws AccountNotFoundException if the principal cannot be found + */ + public boolean verifyPassword(String principal, String password) throws AccountNotFoundException + { + try + { + char[] pwd = lookupPassword(principal); + byte[] passwordBytes = password.getBytes(DEFAULT_ENCODING); + + int index = 0; + boolean verified = true; + + while (verified & index < passwordBytes.length) + { + verified = (pwd[index] == (char) passwordBytes[index]); + index++; + } + return verified; + } + catch (UnsupportedEncodingException e) + { + return false; + } + } + + public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + { + User user = _users.get(principal.getName()); + + if (user == null) + { + throw new AccountNotFoundException(principal.getName()); + } + + try + { + + char[] passwd = convertPassword(password); + + try + { + _userUpdate.lock(); + user.setPassword(passwd); + + try + { + savePasswordFile(); + } + catch (IOException e) + { + _logger.error("Unable to save password file, password change for user'" + + principal + "' will revert at restart"); + return false; + } + return true; + } + finally + { + if (_userUpdate.isHeldByCurrentThread()) + { + _userUpdate.unlock(); + } + } + } + catch (UnsupportedEncodingException e) + { + return false; + } + } + + private char[] convertPassword(String password) throws UnsupportedEncodingException + { + byte[] passwdBytes = password.getBytes(DEFAULT_ENCODING); + + char[] passwd = new char[passwdBytes.length]; + + int index = 0; + + for (byte b : passwdBytes) + { + passwd[index++] = (char) b; + } + + return passwd; + } + + public boolean createPrincipal(Principal principal, String password) + { + if (_users.get(principal.getName()) != null) + { + return false; + } + + User user; + try + { + user = new User(principal.getName(), convertPassword(password)); + } + catch (UnsupportedEncodingException e) + { + _logger.warn("Unable to encode password:" + e); + return false; + } + + try + { + _userUpdate.lock(); + _users.put(user.getName(), user); + + try + { + savePasswordFile(); + return true; + } + catch (IOException e) + { + return false; + } + + } + finally + { + if (_userUpdate.isHeldByCurrentThread()) + { + _userUpdate.unlock(); + } + } + } + + public boolean deletePrincipal(Principal principal) throws AccountNotFoundException + { + User user = _users.get(principal.getName()); + + if (user == null) + { + throw new AccountNotFoundException(principal.getName()); + } + + try + { + _userUpdate.lock(); + user.delete(); + + try + { + savePasswordFile(); + } + catch (IOException e) + { + _logger.warn("Unable to remove user '" + user.getName() + "' from password file."); + return false; + } + + _users.remove(user.getName()); + } + finally + { + if (_userUpdate.isHeldByCurrentThread()) + { + _userUpdate.unlock(); + } + } + + return true; + } + + + public Map<String, AuthenticationProviderInitialiser> getMechanisms() + { + return _saslServers; + } + + public List<Principal> getUsers() + { + return new LinkedList<Principal>(_users.values()); + } + + public Principal getUser(String username) + { + if (_users.containsKey(username)) + { + return new UsernamePrincipal(username); + } + return null; + } + + /** + * Looks up the password for a specified user in the password file. Note this code is <b>not</b> secure since it + * creates strings of passwords. It should be modified to create only char arrays which get nulled out. + * + * @param name The principal name to lookup + * + * @return a char[] for use in SASL. + */ + private char[] lookupPassword(String name) + { + User user = _users.get(name); + if (user == null) + { + return null; + } + else + { + return user.getPassword(); + } + } + + + private void loadPasswordFile() throws IOException + { + try + { + _userUpdate.lock(); + _users.clear(); + + BufferedReader reader = null; + try + { + reader = new BufferedReader(new FileReader(_passwordFile)); + String line; + + while ((line = reader.readLine()) != null) + { + String[] result = _regexp.split(line); + if (result == null || result.length < 2 || result[0].startsWith("#")) + { + continue; + } + + User user = new User(result); + _logger.info("Created user:" + user); + _users.put(user.getName(), user); + } + } + finally + { + if (reader != null) + { + reader.close(); + } + } + } + finally + { + if (_userUpdate.isHeldByCurrentThread()) + { + _userUpdate.unlock(); + } + } + } + + private void savePasswordFile() throws IOException + { + try + { + _userUpdate.lock(); + + BufferedReader reader = null; + PrintStream writer = null; + File tmp = new File(_passwordFile.getAbsolutePath() + ".tmp"); + if (tmp.exists()) + { + tmp.delete(); + } + try + { + writer = new PrintStream(tmp); + reader = new BufferedReader(new FileReader(_passwordFile)); + String line; + + while ((line = reader.readLine()) != null) + { + String[] result = _regexp.split(line); + if (result == null || result.length < 2 || result[0].startsWith("#")) + { + writer.write(line.getBytes(DEFAULT_ENCODING)); + continue; + } + + User user = _users.get(result[0]); + + if (user == null) + { + writer.write(line.getBytes(DEFAULT_ENCODING)); + writer.println(); + } + else if (!user.isDeleted()) + { + if (!user.isModified()) + { + writer.write(line.getBytes(DEFAULT_ENCODING)); + writer.println(); + } + else + { + try + { + byte[] encodedPassword = user.getEncodePassword(); + + writer.write((user.getName() + ":").getBytes(DEFAULT_ENCODING)); + writer.write(encodedPassword); + writer.println(); + + user.saved(); + } + catch (Exception e) + { + _logger.warn("Unable to encode new password reverting to old password."); + writer.write(line.getBytes(DEFAULT_ENCODING)); + writer.println(); + } + } + } + } + + for (User user : _users.values()) + { + if (user.isModified()) + { + byte[] encodedPassword; + try + { + encodedPassword = user.getEncodePassword(); + writer.write((user.getName() + ":").getBytes(DEFAULT_ENCODING)); + writer.write(encodedPassword); + writer.println(); + user.saved(); + } + catch (Exception e) + { + _logger.warn("Unable to get Encoded password for user'" + user.getName() + "' password not saved"); + } + } + } + } + finally + { + if (reader != null) + { + reader.close(); + } + + if (writer != null) + { + writer.close(); + } + + // Swap temp file to main password file. + File old = new File(_passwordFile.getAbsoluteFile() + ".old"); + if (old.exists()) + { + old.delete(); + } + _passwordFile.renameTo(old); + tmp.renameTo(_passwordFile); + tmp.delete(); + } + } + finally + { + if (_userUpdate.isHeldByCurrentThread()) + { + _userUpdate.unlock(); + } + } + } + + private class User implements Principal + { + String _name; + char[] _password; + byte[] _encodedPassword = null; + private boolean _modified = false; + private boolean _deleted = false; + + User(String[] data) throws UnsupportedEncodingException + { + if (data.length != 2) + { + throw new IllegalArgumentException("User Data should be lenght 2, username, password"); + } + + _name = data[0]; + + byte[] encoded_password = data[1].getBytes(DEFAULT_ENCODING); + + Base64 b64 = new Base64(); + byte[] decoded = b64.decode(encoded_password); + + _encodedPassword = encoded_password; + + _password = new char[decoded.length]; + + int index = 0; + for (byte c : decoded) + { + _password[index++] = (char) c; + } + } + + public User(String name, char[] password) + { + _name = name; + setPassword(password); + } + + public String getName() + { + return _name; + } + + public String toString() + { + if (_logger.isDebugEnabled()) + { + return getName() + ((_encodedPassword == null) ? "" : ":" + new String(_encodedPassword)); + } + else + { + return _name; + } + } + + char[] getPassword() + { + return _password; + } + + void setPassword(char[] password) + { + _password = password; + _modified = true; + _encodedPassword = null; + } + + + byte[] getEncodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException + { + if (_encodedPassword == null) + { + encodePassword(); + } + return _encodedPassword; + } + + private void encodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException + { + Base64 b64 = new Base64(); + _encodedPassword = b64.encode(new String(_password).getBytes(DEFAULT_ENCODING)); + } + + public boolean isModified() + { + return _modified; + } + + public boolean isDeleted() + { + return _deleted; + } + + public void delete() + { + _deleted = true; + } + + public void saved() + { + _modified = false; + } + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java index 0c35206dd3..2d3f5e5131 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -1,38 +1,46 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.server.security.auth.database; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; + +import org.apache.log4j.Logger; + +import org.apache.qpid.configuration.PropertyUtils; +import org.apache.qpid.configuration.PropertyException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.configuration.PropertyUtils; -import org.apache.log4j.Logger; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.access.AMQUserManagementMBean; +import org.apache.qpid.AMQException; -import java.util.Map; -import java.util.List; -import java.util.HashMap; -import java.lang.reflect.Method; -import java.io.FileNotFoundException; +import javax.management.JMException; public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatabaseManager { @@ -80,18 +88,21 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab initialisePrincipalDatabase((PrincipalDatabase) o, config, i); String name = databaseNames.get(i); - if (name == null || name.length() == 0) + if ((name == null) || (name.length() == 0)) { throw new Exception("Principal database names must have length greater than or equal to one character"); } + PrincipalDatabase pd = databases.get(name); if (pd != null) { throw new Exception("Duplicate principal database name not provided"); } + _logger.info("Initialised principal database '" + name + "' successfully"); databases.put(name, (PrincipalDatabase) o); } + return databases; } @@ -104,14 +115,16 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab for (int i = 0; i < argumentNames.size(); i++) { String argName = argumentNames.get(i); - if (argName == null || argName.length() == 0) + if ((argName == null) || (argName.length() == 0)) { throw new ConfigurationException("Argument names must have length >= 1 character"); } + if (Character.isLowerCase(argName.charAt(0))) { argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); } + String methodName = "set" + argName; Method method = null; try @@ -125,9 +138,10 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab if (method == null) { - throw new ConfigurationException("No method " + methodName + " found in class " + principalDatabase.getClass() + - " hence unable to configure principal database. The method must be public and " + - "have a single String argument with a void return type"); + throw new ConfigurationException("No method " + methodName + " found in class " + + principalDatabase.getClass() + + " hence unable to configure principal database. The method must be public and " + + "have a single String argument with a void return type"); } try @@ -136,7 +150,14 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab } catch (Exception ite) { - throw new ConfigurationException(ite.getCause()); + if (ite instanceof ConfigurationException) + { + throw(ConfigurationException) ite; + } + else + { + throw new ConfigurationException(ite.getMessage(), ite); + } } } } @@ -145,4 +166,71 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab { return _databases; } + + public void initialiseManagement(Configuration config) throws ConfigurationException + { + try + { + AMQUserManagementMBean _mbean = new AMQUserManagementMBean(); + + String baseSecurity = "security.jmx"; + List<String> principalDBs = config.getList(baseSecurity + ".principal-database"); + + if (principalDBs.size() == 0) + { + throw new ConfigurationException("No principal-database specified for jmx security(" + baseSecurity + ".principal-database)"); + } + + String databaseName = principalDBs.get(0); + + PrincipalDatabase database = getDatabases().get(databaseName); + + if (database == null) + { + throw new ConfigurationException("Principal-database '" + databaseName + "' not found"); + } + + _mbean.setPrincipalDatabase(database); + + List<String> jmxaccesslist = config.getList(baseSecurity + ".access"); + + if (jmxaccesslist.size() == 0) + { + throw new ConfigurationException("No access control files specified for jmx security(" + baseSecurity + ".access)"); + } + + String jmxaccesssFile = null; + + try + { + jmxaccesssFile = PropertyUtils.replaceProperties(jmxaccesslist.get(0)); + } + catch (PropertyException e) + { + throw new ConfigurationException("Unable to parse access control filename '" + jmxaccesssFile + "'"); + } + + try + { + _mbean.setAccessFile(jmxaccesssFile); + } + catch (IOException e) + { + _logger.warn("Unable to load access file:" + jmxaccesssFile); + } + + try + { + _mbean.register(); + } + catch (AMQException e) + { + _logger.warn("Unable to register user management MBean"); + } + } + catch (JMException e) + { + _logger.warn("User management disabled as unable to create MBean:" + e); + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/MD5PasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/MD5PasswordFilePrincipalDatabase.java deleted file mode 100644 index c24a5f21e9..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/MD5PasswordFilePrincipalDatabase.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.auth.database; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; -import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser; -import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser; - -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.login.AccountNotFoundException; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.BufferedReader; -import java.io.FileReader; -import java.util.regex.Pattern; -import java.util.Map; -import java.util.HashMap; -import java.security.Principal; - -/** - * Represents a user database where the account information is stored in a simple flat file. - * - * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn - * - * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text. - */ -public class MD5PasswordFilePrincipalDatabase implements PrincipalDatabase -{ - private static final Logger _logger = Logger.getLogger(MD5PasswordFilePrincipalDatabase.class); - - private File _passwordFile; - - private Pattern _regexp = Pattern.compile(":"); - - private Map<String, AuthenticationProviderInitialiser> _saslServers; - - public MD5PasswordFilePrincipalDatabase() - { - _saslServers = new HashMap<String, AuthenticationProviderInitialiser>(); - - /** - * Create Authenticators for MD5 Password file. - */ - - // Accept MD5 incomming and use plain comparison with the file - PlainInitialiser cram = new PlainInitialiser(); - cram.initialise(this); - // Accept Plain incomming and hash it for comparison to the file. - CRAMMD5Initialiser plain = new CRAMMD5Initialiser(); - plain.initialise(this,CRAMMD5Initialiser.HashDirection.INCOMMING); - - _saslServers.put(plain.getMechanismName(), cram); - _saslServers.put(cram.getMechanismName(), plain); - } - - public void setPasswordFile(String passwordFile) throws FileNotFoundException - { - File f = new File(passwordFile); - _logger.info("PasswordFilePrincipalDatabase using file " + f.getAbsolutePath()); - _passwordFile = f; - if (!f.exists()) - { - throw new FileNotFoundException("Cannot find password file " + f); - } - if (!f.canRead()) - { - throw new FileNotFoundException("Cannot read password file " + f + - ". Check permissions."); - } - } - - public void setPassword(Principal principal, PasswordCallback callback) throws IOException, - AccountNotFoundException - { - if (_passwordFile == null) - { - throw new AccountNotFoundException("Unable to locate principal since no password file was specified during initialisation"); - } - if (principal == null) - { - throw new IllegalArgumentException("principal must not be null"); - } - char[] pwd = lookupPassword(principal.getName()); - if (pwd != null) - { - callback.setPassword(pwd); - } - else - { - throw new AccountNotFoundException("No account found for principal " + principal); - } - } - - public Map<String, AuthenticationProviderInitialiser> getMechanisms() - { - return _saslServers; - } - - /** - * Looks up the password for a specified user in the password file. Note this code is <b>not</b> secure since it - * creates strings of passwords. It should be modified to create only char arrays which get nulled out. - * - * @param name - * - * @return - * - * @throws java.io.IOException - */ - private char[] lookupPassword(String name) throws IOException - { - BufferedReader reader = null; - try - { - reader = new BufferedReader(new FileReader(_passwordFile)); - String line; - - while ((line = reader.readLine()) != null) - { - String[] result = _regexp.split(line); - if (result == null || result.length < 2) - { - continue; - } - - if (name.equals(result[0])) - { - return result[1].toCharArray(); - } - } - return null; - } - finally - { - if (reader != null) - { - reader.close(); - } - } - } -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java index 3abdd9a7ff..3f6794aaaf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.security.auth.database; import org.apache.log4j.Logger; -import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainInitialiser; import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser; import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser; @@ -34,9 +34,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.BufferedReader; import java.io.FileReader; +import java.io.UnsupportedEncodingException; import java.util.regex.Pattern; import java.util.Map; import java.util.HashMap; +import java.util.List; import java.security.Principal; /** @@ -119,21 +121,103 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase } } + public boolean verifyPassword(String principal, String password) throws AccountNotFoundException + { + try + { + char[] pwd = lookupPassword(principal); + + return compareCharArray(pwd, convertPassword(password)); + } + catch (IOException e) + { + return false; + } + } + + private char[] convertPassword(String password) throws UnsupportedEncodingException + { + byte[] passwdBytes = password.getBytes("utf-8"); + + char[] passwd = new char[passwdBytes.length]; + + int index = 0; + + for (byte b : passwdBytes) + { + passwd[index++] = (char) b; + } + + return passwd; + } + + public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + { + return false; // updates denied + } + + public boolean createPrincipal(Principal principal, String password) + { + return false; // updates denied + } + + public boolean deletePrincipal(Principal principal) throws AccountNotFoundException + { + return false; // updates denied + } + public Map<String, AuthenticationProviderInitialiser> getMechanisms() { return _saslServers; } + public List<Principal> getUsers() + { + return null; //todo + } + + public Principal getUser(String username) + { + try + { + if (lookupPassword(username) != null) + { + return new UsernamePrincipal(username); + } + } + catch (IOException e) + { + //fall through to null return + } + return null; + } + + private boolean compareCharArray(char[] a, char[] b) + { + boolean equal = false; + if (a.length == b.length) + { + equal = true; + int index = 0; + while (equal && index < a.length) + { + equal = a[index] == b[index]; + index++; + } + } + return equal; + } + /** * Looks up the password for a specified user in the password file. Note this code is <b>not</b> secure since it * creates strings of passwords. It should be modified to create only char arrays which get nulled out. * - * @param name + * @param name the name of the principal to lookup * - * @return + * @return char[] of the password * - * @throws java.io.IOException + * @throws java.io.IOException whilst accessing the file */ private char[] lookupPassword(String name) throws IOException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java index c8318d6e64..598f8f8b4c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java @@ -20,26 +20,17 @@ */ package org.apache.qpid.server.security.auth.database; -import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; -import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser; -import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser; import org.apache.qpid.server.security.access.AccessManager; import org.apache.qpid.server.security.access.AccessResult; import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.access.AccessRights; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.log4j.Logger; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.login.AccountNotFoundException; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.BufferedReader; import java.io.FileReader; -import java.util.regex.Pattern; -import java.util.Map; -import java.util.HashMap; import java.security.Principal; /** @@ -103,9 +94,15 @@ public class PlainPasswordVhostFilePrincipalDatabase extends PlainPasswordFilePr public AccessResult isAuthorized(Accessable accessObject, String username) { + return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); + } + + public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) + { + if (accessObject instanceof VirtualHost) { - String[] hosts = lookupVirtualHost(username); + String[] hosts = lookupVirtualHost(user.getName()); if (hosts != null) { @@ -126,4 +123,5 @@ public class PlainPasswordVhostFilePrincipalDatabase extends PlainPasswordFilePr { return "PlainPasswordVhostFile"; } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java index 6c5a2a44ee..8073fcc3c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java @@ -23,8 +23,10 @@ package org.apache.qpid.server.security.auth.database; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.security.Principal; import java.util.Map; +import java.util.List; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.login.AccountNotFoundException; @@ -46,5 +48,53 @@ public interface PrincipalDatabase void setPassword(Principal principal, PasswordCallback callback) throws IOException, AccountNotFoundException; + /** + * Used to verify that the presented Password is correct. Currently only used by Management Console + * @param principal The principal to authenticate + * @param password The password to check + * @return true if password is correct + * @throws AccountNotFoundException if the principal cannot be found + */ + boolean verifyPassword(String principal, String password) + throws AccountNotFoundException; + + /** + * Update(Change) the password for the given principal + * @param principal Who's password is to be changed + * @param password The new password to use + * @return True if change was successful + * @throws AccountNotFoundException If the given principal doesn't exist in the Database + */ + boolean updatePassword(Principal principal, String password) + throws AccountNotFoundException; + + /** + * Create a new principal in the database + * @param principal The principal to create + * @param password The password to set for the principal + * @return True on a successful creation + */ + boolean createPrincipal(Principal principal, String password); + + /** + * Delete a principal + * @param principal The principal to delete + * @return True on a successful creation + * @throws AccountNotFoundException If the given principal doesn't exist in the Database + */ + boolean deletePrincipal(Principal principal) + throws AccountNotFoundException; + + /** + * Get the principal from the database with the given username + * @param username of the principal to lookup + * @return The Principal object for the given username or null if not found. + */ + Principal getUser(String username); + + public Map<String, AuthenticationProviderInitialiser> getMechanisms(); + + + List<Principal> getUsers(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java index 83f1201bd8..2c553ae76a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java @@ -21,10 +21,14 @@ package org.apache.qpid.server.security.auth.database; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import java.util.Map; public interface PrincipalDatabaseManager { public Map<String, PrincipalDatabase> getDatabases(); + + public void initialiseManagement(Configuration config) throws ConfigurationException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java index 9a58acd98c..b1ac0e1f00 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.security.auth.database; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser; import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser; @@ -29,8 +30,10 @@ import javax.security.auth.login.AccountNotFoundException; import java.util.Properties; import java.util.Map; import java.util.HashMap; +import java.util.List; import java.security.Principal; import java.io.IOException; +import java.io.UnsupportedEncodingException; public class PropertiesPrincipalDatabase implements PrincipalDatabase { @@ -76,8 +79,87 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase } } + public boolean verifyPassword(String principal, String password) throws AccountNotFoundException + { + char[] pwd = _users.getProperty(principal).toCharArray(); + + try + { + return compareCharArray(pwd, convertPassword(password)); + } + catch (UnsupportedEncodingException e) + { + return false; + } + } + + public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + { + return false; // updates denied + } + + public boolean createPrincipal(Principal principal, String password) + { + return false; // updates denied + } + + public boolean deletePrincipal(Principal principal) throws AccountNotFoundException + { + return false; // updates denied + } + + private boolean compareCharArray(char[] a, char[] b) + { + boolean equal = false; + if (a.length == b.length) + { + equal = true; + int index = 0; + while (equal && index < a.length) + { + equal = a[index] == b[index]; + index++; + } + } + return equal; + } + + private char[] convertPassword(String password) throws UnsupportedEncodingException + { + byte[] passwdBytes = password.getBytes("utf-8"); + + char[] passwd = new char[passwdBytes.length]; + + int index = 0; + + for (byte b : passwdBytes) + { + passwd[index++] = (char) b; + } + + return passwd; + } + + public Map<String, AuthenticationProviderInitialiser> getMechanisms() { return _saslServers; } + + public List<Principal> getUsers() + { + return null; //todo + } + + public Principal getUser(String username) + { + if (_users.getProperty(username) != null) + { + return new UsernamePrincipal(username); + } + else + { + return null; + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java index 89c84e8130..6b86a46bd2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.security.auth.database; +import org.apache.commons.configuration.Configuration; + import java.util.Map; import java.util.Properties; import java.util.HashMap; @@ -38,4 +40,9 @@ public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseMana { return _databases; } + + public void initialiseManagement(Configuration config) + { + //todo + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 0546bbb81e..ce5e0cd748 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -71,7 +71,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>(); - if (name == null) + if (name == null || hostConfig == null) { initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases()); } @@ -108,11 +108,15 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan if (providerMap.size() > 0) { - Security.addProvider(new JCAProvider(providerMap)); + // Ensure we are used before the defaults + if (Security.insertProviderAt(new JCAProvider(providerMap), 1) == -1) + { + _logger.warn("Unable to set order of providers."); + } } else { - _logger.warn("No SASL providers availble."); + _logger.warn("No additional SASL providers registered."); } } @@ -148,21 +152,20 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan { if (database == null || database.getMechanisms().size() == 0) { - _logger.warn(""); + _logger.warn("No Database or no mechanisms to initialise authentication"); return; } - for (AuthenticationProviderInitialiser mechanism : database.getMechanisms().values()) + for (Map.Entry<String, AuthenticationProviderInitialiser> mechanism : database.getMechanisms().entrySet()) { - initialiseAuthenticationMechanism(mechanism, providerMap); + initialiseAuthenticationMechanism(mechanism.getKey(), mechanism.getValue(), providerMap); } } - private void initialiseAuthenticationMechanism(AuthenticationProviderInitialiser initialiser, + private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser, Map<String, Class<? extends SaslServerFactory>> providerMap) throws Exception { - String mechanism = initialiser.getMechanismName(); if (_mechanisms == null) { _mechanisms = mechanism; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java index 8ffcdc4e36..fd4ad86055 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java @@ -33,7 +33,7 @@ public final class JCAProvider extends Provider super("AMQSASLProvider", 1.0, "A JCA provider that registers all " + "AMQ SASL providers that want to be registered"); register(providerMap); - Security.addProvider(this); + //Security.addProvider(this); } private void register(Map<String, Class<? extends SaslServerFactory>> providerMap) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java index 68095de3a0..dd0bd096c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,14 +33,16 @@ import javax.security.auth.login.AccountNotFoundException; import javax.security.sasl.AuthorizeCallback; import org.apache.commons.configuration.Configuration; + import org.apache.log4j.Logger; + import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; public abstract class UsernamePasswordInitialiser implements AuthenticationProviderInitialiser { - protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class); + protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class); private ServerCallbackHandler _callbackHandler; @@ -72,7 +74,9 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi { // very annoyingly the callback handler does not throw anything more appropriate than // IOException - throw new IOException("Error looking up user " + e); + IOException ioe = new IOException("Error looking up user " + e); + ioe.initCause(e); + throw ioe; } } else if (callback instanceof AuthorizeCallback) @@ -88,7 +92,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi } public void initialise(String baseConfigPath, Configuration configuration, - Map<String, PrincipalDatabase> principalDatabases) throws Exception + Map<String, PrincipalDatabase> principalDatabases) throws Exception { String principalDatabaseName = configuration.getString(baseConfigPath + ".principal-database"); PrincipalDatabase db = principalDatabases.get(principalDatabaseName); @@ -102,6 +106,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi { throw new NullPointerException("Cannot initialise with a null Principal database."); } + _callbackHandler = new ServerCallbackHandler(db); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java index f9aaabd15a..d7c8383690 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java @@ -22,10 +22,7 @@ package org.apache.qpid.server.security.auth.sasl; import java.security.Principal; -/** - * A principal that is just a wrapper for a simple username. - * - */ +/** A principal that is just a wrapper for a simple username. */ public class UsernamePrincipal implements Principal { private String _name; @@ -39,4 +36,9 @@ public class UsernamePrincipal implements Principal { return _name; } + + public String toString() + { + return _name; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedInitialiser.java new file mode 100644 index 0000000000..97f9a4e91a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedInitialiser.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.auth.sasl.crammd5; + +import org.apache.qpid.server.security.auth.sasl.UsernamePasswordInitialiser; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; + +import javax.security.sasl.SaslServerFactory; +import java.util.Map; + +public class CRAMMD5HashedInitialiser extends UsernamePasswordInitialiser +{ + public String getMechanismName() + { + return CRAMMD5HashedSaslServer.MECHANISM; + } + + public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration() + { + return CRAMMD5HashedServerFactory.class; + } + + public void initialise(PrincipalDatabase passwordFile) + { + super.initialise(passwordFile); + } + + public Map<String, ?> getProperties() + { + return null; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedSaslServer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedSaslServer.java new file mode 100644 index 0000000000..f6cab084ea --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedSaslServer.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.auth.sasl.crammd5; + +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslException; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslServerFactory; +import javax.security.auth.callback.CallbackHandler; +import java.util.Enumeration; +import java.util.Map; + +public class CRAMMD5HashedSaslServer implements SaslServer +{ + public static final String MECHANISM = "CRAM-MD5-HASHED"; + + private SaslServer _realServer; + + public CRAMMD5HashedSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, + CallbackHandler cbh) throws SaslException + { + Enumeration factories = Sasl.getSaslServerFactories(); + + while (factories.hasMoreElements()) + { + SaslServerFactory factory = (SaslServerFactory) factories.nextElement(); + + if (factory instanceof CRAMMD5HashedServerFactory) + { + continue; + } + + String[] mechs = factory.getMechanismNames(props); + + for (String mech : mechs) + { + if (mech.equals("CRAM-MD5")) + { + _realServer = factory.createSaslServer("CRAM-MD5", protocol, serverName, props, cbh); + return; + } + } + } + + throw new RuntimeException("No default SaslServer found for mechanism:" + "CRAM-MD5"); + } + + public String getMechanismName() + { + return MECHANISM; + } + + public byte[] evaluateResponse(byte[] response) throws SaslException + { + return _realServer.evaluateResponse(response); + } + + public boolean isComplete() + { + return _realServer.isComplete(); + } + + public String getAuthorizationID() + { + return _realServer.getAuthorizationID(); + } + + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException + { + return _realServer.unwrap(incoming, offset, len); + } + + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException + { + return _realServer.wrap(outgoing, offset, len); + } + + public Object getNegotiatedProperty(String propName) + { + return _realServer.getNegotiatedProperty(propName); + } + + public void dispose() throws SaslException + { + _realServer.dispose(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedServerFactory.java new file mode 100644 index 0000000000..5298b5cc63 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedServerFactory.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.auth.sasl.crammd5; + +import java.util.Map; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslServerFactory; + +public class CRAMMD5HashedServerFactory implements SaslServerFactory +{ + public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, + CallbackHandler cbh) throws SaslException + { + if (mechanism.equals(CRAMMD5HashedSaslServer.MECHANISM)) + { + return new CRAMMD5HashedSaslServer(mechanism, protocol, serverName, props, cbh); + } + else + { + return null; + } + } + + public String[] getMechanismNames(Map props) + { + if (props != null) + { + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE)) + { + // returned array must be non null according to interface documentation + return new String[0]; + } + } + + return new String[]{CRAMMD5HashedSaslServer.MECHANISM}; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java index ff3e87e3a0..f0dd9eeb6d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java @@ -29,7 +29,7 @@ import javax.security.sasl.SaslServer; import javax.security.sasl.SaslServerFactory; public class PlainSaslServerFactory implements SaslServerFactory -{ +{ public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java index 05d1cd5291..609a85c22f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java @@ -56,18 +56,7 @@ public class CleanupMessageOperation implements TxnOp public void commit(StoreContext context) { - //The routers reference can now be released. This is done - //here to ensure that it happens after the queues that - //enqueue it have incremented their counts (which as a - //memory only operation is done in the commit phase). - try - { - _msg.decrementReference(context); - } - catch (AMQException e) - { - _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); - } + try { _msg.checkDeliveredToConsumer(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index cf0da55f2a..6d776eec0f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -89,6 +89,12 @@ public class LocalTransactionalContext implements TransactionalContext public void rollback() throws AMQException { _txnBuffer.rollback(_storeContext); + // Hack to deal with uncommitted non-transactional writes + if(_messageStore.inTran(_storeContext)) + { + _messageStore.abortTran(_storeContext); + _inTran = false; + } _postCommitDeliveryList.clear(); } @@ -103,6 +109,7 @@ public class LocalTransactionalContext implements TransactionalContext // message.incrementReference(); _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); _messageDelivered = true; + _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); if (_log.isDebugEnabled()) { @@ -111,7 +118,7 @@ public class LocalTransactionalContext implements TransactionalContext } message.incrementReference(); _messageDelivered = true; - _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); + */ } @@ -195,6 +202,7 @@ public class LocalTransactionalContext implements TransactionalContext { _txnBuffer.enlist(new StoreMessageOperation(_messageStore)); } + //fixme fail commit here ... QPID-440 try { _txnBuffer.commit(_storeContext); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java index 339ca8ae1a..405c233552 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java @@ -41,7 +41,7 @@ public class TxnBuffer { if (_log.isDebugEnabled()) { - _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray()); + _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops); } if (prepare(context)) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index c24d1aa23a..b5c59dbbb7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -181,7 +181,7 @@ public class VirtualHost implements Accessable catch (Exception e)
{
_logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
}
Configurator.configure(instance);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 236291968f..0c1da5c278 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -106,6 +106,8 @@ public class AMQQueueAlertTest extends TestCase /** * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value * + * Based on FT402 subbmitted by client + * * @throws Exception */ public void testQueueDepthAlertNoSubscriber() throws Exception |
