summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/bin/passwd21
-rw-r--r--qpid/java/broker/bin/qpid-server9
-rw-r--r--qpid/java/broker/bin/qpid.stop38
-rw-r--r--qpid/java/broker/bin/qpid.stopall15
-rw-r--r--qpid/java/broker/distribution/src/main/assembly/broker-bin.xml12
-rw-r--r--qpid/java/broker/etc/access1
-rw-r--r--qpid/java/broker/etc/config.xml20
-rw-r--r--qpid/java/broker/etc/jmxremote.access3
-rw-r--r--qpid/java/broker/etc/log4j.xml10
-rw-r--r--qpid/java/broker/etc/persistent_config.xml132
-rw-r--r--qpid/java/broker/etc/transient_config.xml128
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java53
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java91
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java258
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java217
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java96
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java136
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java81
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java457
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java33
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessRights.java63
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java183
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java111
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/VirtualHostAccess.java68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java626
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java144
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/MD5PasswordFilePrincipalDatabase.java160
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java92
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java82
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedInitialiser.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedSaslServer.java105
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedServerFactory.java61
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java2
68 files changed, 3469 insertions, 513 deletions
diff --git a/qpid/java/broker/bin/passwd b/qpid/java/broker/bin/passwd
new file mode 100644
index 0000000000..c1bb05c082
--- /dev/null
+++ b/qpid/java/broker/bin/passwd
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. qpid-run org.apache.qpid.server.security.Passwd "$@"
diff --git a/qpid/java/broker/bin/qpid-server b/qpid/java/broker/bin/qpid-server
index 0080209479..a2b416b12b 100644
--- a/qpid/java/broker/bin/qpid-server
+++ b/qpid/java/broker/bin/qpid-server
@@ -18,4 +18,13 @@
# under the License.
#
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_MEM=-Xmx1024m \
+ QPID_CLASSPATH=$QPID_LIBS
+
. qpid-run org.apache.qpid.server.Main "$@"
diff --git a/qpid/java/broker/bin/qpid.stop b/qpid/java/broker/bin/qpid.stop
index 1bffc8cdb8..9193d3c4e1 100644
--- a/qpid/java/broker/bin/qpid.stop
+++ b/qpid/java/broker/bin/qpid.stop
@@ -5,9 +5,9 @@
# Script checks for a given pid running PROGRAM and attempts to quit it
#
-MAX_ATTEMPTS=5
-SLEEP_DELAY=2
-PROGRAM="org.apache.qpid.server.Main"
+MAX_ATTEMPTS=1
+SLEEP_DELAY=1
+PROGRAM="DQPID"
#
@@ -15,7 +15,8 @@ PROGRAM="org.apache.qpid.server.Main"
#
printActions()
{
-ps=`ps o command p $1|grep $PROGRAM`
+#ps=`ps o command p $1|grep $PROGRAM`
+ps=`ps -o args -p $1|grep $PROGRAM`
echo "Attempting to kill: $ps"
}
@@ -36,25 +37,25 @@ quit()
kill $1
}
+#
+# Grep the ps log for the PID ($1) to ensure that it has quit
+#
+lookup()
+{
+result=`ps -o args -p $1 |grep -v grep |grep $PROGRAM |wc -l`
+}
#
# Sleep and then check then lookup the PID($1) to ensure it has quit
#
check()
{
+echo "Waiting $SLEEP_DELAY second for $1 to exit"
sleep $SLEEP_DELAY
lookup $1
}
-#
-# Grep the ps log for the PID ($1) to ensure that it has quit
-#
-lookup()
-{
-result=`ps p $1 |grep -v grep |grep $PROGRAM |wc -l`
-}
-
#
# Verify the PID($1) is available
@@ -62,7 +63,7 @@ result=`ps p $1 |grep -v grep |grep $PROGRAM |wc -l`
verifyPid()
{
lookup $1
-if [[ $result == 1 ]] ; then
+if [[ $[$result] == 1 ]] ; then
brokerspid=$1
else
echo "Unable to locate Qpid Process with PID $1"
@@ -70,8 +71,6 @@ else
fi
}
-
-
#
# Main Run
#
@@ -89,22 +88,21 @@ printActions $brokerspid
# Attempt to quit the process MAX_ATTEMPTS Times
attempt=0
-while [[ $result > 0 && $attempt < $MAX_ATTEMPTS ]] ; do
+while [[ $[$result] > 0 && $[$attempt] < $[$MAX_ATTEMPTS] ]] ; do
quit $brokerspid
check $brokerspid
attempt=$[$attempt + 1]
done
-
# Check that it has quit
-if [[ $results == 0 ]] ; then
+if [[ $[$result] == 0 ]] ; then
echo "Process quit"
exit 0
else
# Now attempt to force quit the process
attempt=0
- while [[ $result > 0 && $attempt < $MAX_ATTEMPTS ]] ; do
+ while [[ $[$result] > 0 && $[$attempt] < $[$MAX_ATTEMPTS] ]] ; do
forceQuit $brokerspid
check $brokerspid
attempt=$[$attempt + 1]
@@ -112,7 +110,7 @@ else
# Output final status
- if [[ $attempt == $MAX_ATTEMPTS ]] ; then
+ if [[ $[$result] > 0 && $[$attempt] == $[$MAX_ATTEMPTS] ]] ; then
echo "Stopped trying to kill process: $brokerspid"
echo "Attempted to stop $attempt times"
else
diff --git a/qpid/java/broker/bin/qpid.stopall b/qpid/java/broker/bin/qpid.stopall
index f6862842c9..2e762bdd50 100644
--- a/qpid/java/broker/bin/qpid.stopall
+++ b/qpid/java/broker/bin/qpid.stopall
@@ -6,17 +6,16 @@
# Utilises qpid.stop to perform the actual stopping
#
-MAX_ATTEMPTS=5
-SLEEP_DELAY=2
-PROGRAM="org.apache.qpid.server.Main"
+PROGRAM="DQPID"
#
# grep ps for instances of $PROGRAM and collect PIDs
#
lookup()
{
-pids=`ps o pid,command |grep -v grep | grep $PROGRAM | cut -d ' ' -f 1`
-result=`echo -n $pids | wc -l`
+#pids=`ps o pid,command | grep $PROGRAM | grep -v grep | cut -d ' ' -f 1`
+pids=`ps -ef |grep $USER | grep $PROGRAM | grep -v grep | awk '{print $2}'`
+result=`echo -n $pids | wc -w`
}
@@ -25,7 +24,7 @@ result=`echo -n $pids | wc -l`
#
showPids()
{
-ps p $pids
+ps -o user,pid,args -p $pids
}
@@ -35,7 +34,7 @@ ps p $pids
lookup
-if [[ $result == 0 ]] ; then
+if [[ $[$result] == 0 ]] ; then
echo "No Qpid Brokers found running under user '$USER'"
exit 0
fi
@@ -49,7 +48,7 @@ done
# Check we have quit all
lookup
-if [[ $result == 0 ]] ; then
+if [[ $[$result] == 0 ]] ; then
echo "All Qpid brokers successfully quit"
else
echo "Some brokers were not quit"
diff --git a/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml b/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml
index 4a7343660d..4b32630771 100644
--- a/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml
+++ b/qpid/java/broker/distribution/src/main/assembly/broker-bin.xml
@@ -78,6 +78,12 @@
<fileMode>420</fileMode>
</file>
<file>
+ <source>../etc/jmxremote.access</source>
+ <outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
+ <destName>jmxremote.access</destName>
+ <fileMode>420</fileMode>
+ </file>
+ <file>
<source>../etc/log4j.xml</source>
<outputDirectory>qpid-${qpid.version}/etc</outputDirectory>
<destName>log4j.xml</destName>
@@ -108,6 +114,12 @@
<fileMode>473</fileMode>
</file>
<file>
+ <source>../bin/passwd</source>
+ <outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
+ <destName>passwd</destName>
+ <fileMode>473</fileMode>
+ </file>
+ <file>
<source>../bin/qpid-server</source>
<outputDirectory>qpid-${qpid.version}/bin</outputDirectory>
<destName>qpid-server</destName>
diff --git a/qpid/java/broker/etc/access b/qpid/java/broker/etc/access
new file mode 100644
index 0000000000..a781ed8aa9
--- /dev/null
+++ b/qpid/java/broker/etc/access
@@ -0,0 +1 @@
+guest:localhost(rw),test(rw) \ No newline at end of file
diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml
index 3789e6fcb6..c66c2f632e 100644
--- a/qpid/java/broker/etc/config.xml
+++ b/qpid/java/broker/etc/config.xml
@@ -41,6 +41,8 @@
</connector>
<management>
<enabled>true</enabled>
+ <jmxport>8999</jmxport>
+ <security-enabled>true</security-enabled>
</management>
<advanced>
<filterchain enableExecutorPool="true"/>
@@ -63,13 +65,14 @@
</attributes>
</principal-database>
- <!--principal-database>
- <name>md5passwordfile</name>
- <class>org.apache.qpid.server.security.auth.database.MD5PasswordFilePrincipalDatabase</class>
+ <!-- Example use of Base64 encoded MD5 hashes for authentication via CRAM-MD5-Hashed
+ <principal-database>
+ <name>passwordfile</name>
+ <class>org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase</class>
<attributes>
<attribute>
<name>passwordFile</name>
- <value>${conf}/md5passwd</value>
+ <value>${conf}/qpid.passwd</value>
</attribute>
</attributes>
</principal-database-->
@@ -78,6 +81,10 @@
<access>
<class>org.apache.qpid.server.security.access.AllowAll</class>
</access>
+ <jmx>
+ <access>${conf}/jmxremote.access</access>
+ <principal-database>passwordfile</principal-database>
+ </jmx>
</security>
<virtualhosts>
@@ -85,9 +92,10 @@
<name>localhost</name>
<localhost>
<store>
- <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> -->
+ <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/localhost-store</environment-path> -->
+
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
- <environment-path>localhost-store</environment-path>
</store>
<security>
diff --git a/qpid/java/broker/etc/jmxremote.access b/qpid/java/broker/etc/jmxremote.access
new file mode 100644
index 0000000000..d1172fc197
--- /dev/null
+++ b/qpid/java/broker/etc/jmxremote.access
@@ -0,0 +1,3 @@
+admin=admin
+guest=readonly
+user=readwrite
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index 74b80c0e80..b442227607 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -44,20 +44,16 @@
<param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
- <!--param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/-->
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
</layout>
</appender>
- <appender name="FileAppender" class="org.apache.log4j.FileAppender">
- <param name="staticLogFileName" value="false"/>
-
+ <appender name="FileAppender" class="org.apache.log4j.FileAppender">
<param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/>
<param name="Append" value="false"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/>
-
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
</layout>
</appender>
diff --git a/qpid/java/broker/etc/persistent_config.xml b/qpid/java/broker/etc/persistent_config.xml
new file mode 100644
index 0000000000..178a73515c
--- /dev/null
+++ b/qpid/java/broker/etc/persistent_config.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+
+ This is an example config using the BDBMessageStore available from
+ the Red Hat Messaging project at etp.108.redhat.com and distributed under GPL.
+ -->
+
+<broker>
+ <prefix>${QPID_HOME}</prefix>
+ <work>${QPID_WORK}</work>
+ <conf>${prefix}/etc</conf>
+ <connector>
+ <qpidnio>true</qpidnio>
+ <transport>nio</transport>
+ <port>5672</port>
+ <sslport>8672</sslport>
+ <socketReceiveBuffer>32768</socketReceiveBuffer>
+ <socketSendBuffer>32768</socketSendBuffer>
+ </connector>
+ <management>
+ <enabled>true</enabled>
+ <jmxport>8999</jmxport>
+ </management>
+ <advanced>
+ <filterchain enableExecutorPool="true"/>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <enableDirectBuffers>false</enableDirectBuffers>
+ <framesize>65535</framesize>
+ <compressBufferOnQueue>false</compressBufferOnQueue>
+ </advanced>
+
+ <security>
+ <principal-databases>
+ <principal-database>
+ <name>passwordfile</name>
+ <class>org.apache.qpid.server.security.auth.database.PlainPasswordVhostFilePrincipalDatabase</class>
+ <attributes>
+ <attribute>
+ <name>passwordFile</name>
+ <value>${conf}/passwdVhost</value>
+ </attribute>
+ </attributes>
+ </principal-database>
+ </principal-databases>
+
+ <access>
+ <class>org.apache.qpid.server.security.access.AllowAll</class>
+ </access>
+ <jmx>
+ <access>${conf}/jmxremote.access</access>
+ <principal-database>passwordfile</principal-database>
+ </jmx>
+ </security>
+
+ <virtualhosts>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdbstore/localhost-store</environment-path>
+ </store>
+
+ <security>
+ <access>
+ <class>org.apache.qpid.server.security.access.PrincipalDatabaseAccessManager</class>
+ <attributes>
+ <attribute>
+ <name>principalDatabase</name>
+ <value>passwordfile</value>
+ </attribute>
+ <attribute>
+ <name>defaultAccessManager</name>
+ <value>DenyAll</value>
+ </attribute>
+ </attributes>
+ </access>
+ </security>
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdbstore/development-store</environment-path>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdbstore/test-store</environment-path>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
+ <heartbeat>
+ <delay>0</delay>
+ <timeoutFactor>2.0</timeoutFactor>
+ </heartbeat>
+ <queue>
+ <auto_register>true</auto_register>
+ </queue>
+
+ <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
+
+
diff --git a/qpid/java/broker/etc/transient_config.xml b/qpid/java/broker/etc/transient_config.xml
new file mode 100644
index 0000000000..164d66cd1b
--- /dev/null
+++ b/qpid/java/broker/etc/transient_config.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+
+ This is an example config file that uses the MemoryMessageStore.
+ As a result it is aimed at brokers sending transient messages.
+
+ -->
+<broker>
+ <prefix>${QPID_HOME}</prefix>
+ <work>${QPID_WORK}</work>
+ <conf>${prefix}/etc</conf>
+ <connector>
+ <qpidnio>true</qpidnio>
+ <transport>nio</transport>
+ <port>5672</port>
+ <sslport>8672</sslport>
+ <socketReceiveBuffer>32768</socketReceiveBuffer>
+ <socketSendBuffer>32768</socketSendBuffer>
+ </connector>
+ <management>
+ <enabled>true</enabled>
+ <jmxport>8999</jmxport>
+ </management>
+ <advanced>
+ <filterchain enableExecutorPool="true"/>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <enableDirectBuffers>false</enableDirectBuffers>
+ <framesize>65535</framesize>
+ <compressBufferOnQueue>false</compressBufferOnQueue>
+ </advanced>
+
+ <security>
+ <principal-databases>
+ <principal-database>
+ <name>passwordfile</name>
+ <class>org.apache.qpid.server.security.auth.database.PlainPasswordVhostFilePrincipalDatabase</class>
+ <attributes>
+ <attribute>
+ <name>passwordFile</name>
+ <value>${conf}/passwdVhost</value>
+ </attribute>
+ </attributes>
+ </principal-database>
+ </principal-databases>
+ <access>
+ <class>org.apache.qpid.server.security.access.AllowAll</class>
+ </access>
+ <jmx>
+ <access>${conf}/jmxremote.access</access>
+ <principal-database>passwordfile</principal-database>
+ </jmx>
+ </security>
+
+ <virtualhosts>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+
+ <security>
+ <access>
+ <class>org.apache.qpid.server.security.access.PrincipalDatabaseAccessManager</class>
+ <attributes>
+ <attribute>
+ <name>principalDatabase</name>
+ <value>passwordfile</value>
+ </attribute>
+ <attribute>
+ <name>defaultAccessManager</name>
+ <value>DenyAll</value>
+ </attribute>
+ </attributes>
+ </access>
+ </security>
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+ <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
+ <heartbeat>
+ <delay>0</delay>
+ <timeoutFactor>2.0</timeoutFactor>
+ </heartbeat>
+ <queue>
+ <auto_register>true</auto_register>
+ </queue>
+
+ <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
+
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 23c32aceab..d31359b019 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,8 +42,12 @@ import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.commons.configuration.Configuration;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -36,9 +60,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.commons.configuration.Configuration;
/**
* This MBean implements the broker management interface and exposes the
@@ -82,8 +103,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
* @param autoDelete
* @throws JMException
*/
- public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete)
- throws JMException
+ public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException
{
try
{
@@ -92,7 +112,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
if (exchange == null)
{
- exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
+ exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type),
+ durable, false, 0);
_exchangeRegistry.registerExchange(exchange);
}
else
@@ -140,8 +161,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
* @param autoDelete
* @throws JMException
*/
- public void createNewQueue(String queueName, String owner, boolean durable,boolean autoDelete)
- throws JMException
+ public void createNewQueue(String queueName, String owner, boolean durable) throws JMException
{
AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
if (queue != null)
@@ -156,22 +176,27 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
{
ownerShortString = new AMQShortString(owner);
}
- queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, autoDelete, getVirtualHost());
+
+ queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
if (queue.isDurable() && !queue.isAutoDelete())
{
_messageStore.createQueue(queue);
}
- Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
+ Configuration virtualHostDefaultQueueConfiguration =
+ VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
if (virtualHostDefaultQueueConfiguration != null)
{
Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
}
+
_queueRegistry.registerQueue(queue);
}
catch (AMQException ex)
{
- throw new MBeanException(new JMException(ex.getMessage()),"Error in creating queue " + queueName);
+ JMException jme = new JMException(ex.getMessage());
+ jme.initCause(ex);
+ throw new MBeanException(jme, "Error in creating queue " + queueName);
}
}
@@ -202,7 +227,9 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
}
catch (AMQException ex)
{
- throw new MBeanException(new JMException(ex.getMessage()), "Error in deleting queue " + queueName);
+ JMException jme = new JMException(ex.getMessage());
+ jme.initCause(ex);
+ throw new MBeanException(jme, "Error in deleting queue " + queueName);
}
}
@@ -213,7 +240,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
// This will have a single instance for a virtual host, so not having the name property in the ObjectName
public ObjectName getObjectName() throws MalformedObjectNameException
- {
+ {
return getObjectNameForSingleInstanceMBean();
}
} // End of MBean class
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1ebe5fa0a2..2e1653e69d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -472,7 +472,7 @@ public class AMQChannel
if (unacked.queue != null)
{
// Ensure message is released for redelivery
- unacked.message.release();
+ unacked.message.release(unacked.queue);
// Mark message redelivered
unacked.message.setRedelivered(true);
@@ -503,7 +503,10 @@ public class AMQChannel
{
// Ensure message is released for redelivery
- unacked.message.release();
+ if (unacked.queue != null)
+ {
+ unacked.message.release(unacked.queue);
+ }
// Mark message redelivered
unacked.message.setRedelivered(true);
@@ -672,14 +675,14 @@ public class AMQChannel
// else
// {
//release to allow it to be delivered
- msg.release();
+ msg.release(message.queue);
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription();
+ Subscription sub = msg.getDeliveredSubscription(message.queue);
if (sub != null)
{
@@ -753,7 +756,7 @@ public class AMQChannel
// Process Messages to Requeue at the front of the queue
for (UnacknowledgedMessage message : msgToRequeue)
{
- message.message.release();
+ message.message.release(message.queue);
message.message.setRedelivered(true);
deliveryContext.deliver(message.message, message.queue, true);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 38a505c6c7..146d0566ce 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,14 +36,17 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.configuration.ConfigurationException;
+
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
+
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -59,7 +62,7 @@ import org.apache.qpid.url.URLSyntaxException;
* Main entry point for AMQPD.
*
*/
-@SuppressWarnings({"AccessStaticViaInstance"})
+@SuppressWarnings({ "AccessStaticViaInstance" })
public class Main
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -74,9 +77,9 @@ public class Main
protected static class InitException extends Exception
{
- InitException(String msg)
+ InitException(String msg, Throwable cause)
{
- super(msg);
+ super(msg, cause);
}
}
@@ -97,6 +100,7 @@ public class Main
try
{
commandLine = new PosixParser().parse(options, args);
+
return true;
}
catch (ParseException e)
@@ -104,6 +108,7 @@ public class Main
System.err.println("Error: " + e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Qpid", options, true);
+
return false;
}
}
@@ -112,17 +117,26 @@ public class Main
{
Option help = new Option("h", "help", false, "print this message");
Option version = new Option("v", "version", false, "print the version information and exit");
- Option configFile = OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").
- withLongOpt("config").create("c");
- Option port = OptionBuilder.withArgName("port").hasArg().withDescription("listen on the specified port. Overrides any value in the config file").
- withLongOpt("port").create("p");
- Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file").
- withLongOpt("bind").create("b");
- Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " +
- "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").
- withLongOpt("logconfig").create("l");
- Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " +
- "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
+ .create("c");
+ Option port =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("listen on the specified port. Overrides any value in the config file")
+ .withLongOpt("port").create("p");
+ Option bind =
+ OptionBuilder.withArgName("bind").hasArg()
+ .withDescription("bind to the specified address. Overrides any value in the config file")
+ .withLongOpt("bind").create("b");
+ Option logconfig =
+ OptionBuilder.withArgName("logconfig").hasArg()
+ .withDescription("use the specified log4j xml configuration file. By "
+ + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
+ + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+ Option logwatchconfig =
+ OptionBuilder.withArgName("logwatch").hasArg()
+ .withDescription("monitor the log file configuration file for changes. Units are seconds. "
+ + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
options.addOption(help);
options.addOption(version);
@@ -150,7 +164,7 @@ public class Main
boolean first = true;
for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
{
- if(first)
+ if (first)
{
first = false;
}
@@ -158,9 +172,11 @@ public class Main
{
protocol.append(", ");
}
+
protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
}
+
System.out.println(ver + " (" + protocol + ")");
}
else
@@ -186,7 +202,6 @@ public class Main
}
}
-
protected void startup() throws InitException, ConfigurationException, Exception
{
final String QpidHome = System.getProperty("QPID_HOME");
@@ -201,7 +216,7 @@ public class Main
error = error + "\nNote: Qpid_HOME is not set.";
}
- throw new InitException(error);
+ throw new InitException(error, null);
}
else
{
@@ -226,8 +241,8 @@ public class Main
_logger.info("Starting Qpid.AMQP broker");
- ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
- getConfiguredObject(ConnectorConfiguration.class);
+ ConnectorConfiguration connectorConfig =
+ ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class);
ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
@@ -249,7 +264,7 @@ public class Main
}
catch (NumberFormatException e)
{
- throw new InitException("Invalid port: " + portStr);
+ throw new InitException("Invalid port: " + portStr, e);
}
}
@@ -264,19 +279,21 @@ public class Main
int totalVHosts = ((Collection) virtualHosts).size();
for (int vhost = 0; vhost < totalVHosts; vhost++)
{
- setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost));
+ setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost));
}
}
else
{
- setupVirtualHosts(configFile.getParent() , (String)virtualHosts);
+ setupVirtualHosts(configFile.getParent(), (String) virtualHosts);
}
}
+
bind(port, connectorConfig);
}
- protected void setupVirtualHosts(String configFileParent, String configFilePath) throws ConfigurationException, AMQException, URLSyntaxException
+ protected void setupVirtualHosts(String configFileParent, String configFilePath)
+ throws ConfigurationException, AMQException, URLSyntaxException
{
String configVar = "${conf}";
@@ -285,7 +302,7 @@ public class Main
configFilePath = configFileParent + configFilePath.substring(configVar.length());
}
- if (configFilePath.indexOf(".xml") != -1 )
+ if (configFilePath.indexOf(".xml") != -1)
{
VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
vHostConfig.performBindings();
@@ -298,11 +315,12 @@ public class Main
String[] fileNames = virtualHostDir.list();
- for (int each=0; each < fileNames.length; each++)
+ for (int each = 0; each < fileNames.length; each++)
{
if (fileNames[each].endsWith(".xml"))
{
- VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]);
+ VirtualHostConfiguration vHostConfig =
+ new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]);
vHostConfig.performBindings();
}
}
@@ -319,7 +337,7 @@ public class Main
try
{
- //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
+ // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
IoAcceptor acceptor = connectorConfig.createAcceptor();
SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
@@ -334,7 +352,7 @@ public class Main
{
sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
-
+
if (!connectorConfig.enableSSL || !connectorConfig.sslOnly)
{
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
@@ -347,6 +365,7 @@ public class Main
{
bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
}
+
acceptor.bind(bindAddress, handler, sconfig);
_logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
}
@@ -356,8 +375,7 @@ public class Main
AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
try
{
- acceptor.bind(new InetSocketAddress(connectorConfig.sslPort),
- handler, sconfig);
+ acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
_logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
}
catch (IOException e)
@@ -415,16 +433,17 @@ public class Main
}
catch (NumberFormatException e)
{
- System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +
- "a non-negative integer. Using default of zero (no watching configured");
+ System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+ + "a non-negative integer. Using default of zero (no watching configured");
}
+
if (logConfigFile.exists() && logConfigFile.canRead())
{
System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
if (logWatchTime > 0)
{
- System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +
- logWatchTime + " seconds");
+ System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+ + logWatchTime + " seconds");
// log4j expects the watch interval in milliseconds
DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 4d66e37628..de3905268e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -196,6 +196,7 @@ public class DestNameExchange extends AbstractExchange
}
else
{
+ _logger.error("MESSAGE LOSS: Message should be sent on a Dead Letter Queue");
_logger.warn(msg);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 14687c40ae..9052b2e81f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -98,7 +98,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
// if (!evt.getMethod().resend)
{
- message.message.reject(message.message.getDeliveredSubscription());
+ message.message.reject(message.message.getDeliveredSubscription(message.queue));
}
if (evt.getMethod().requeue)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index 2ecb39254f..30a40c5a75 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.security.access.AccessResult;
+import org.apache.qpid.server.security.access.AccessRights;
import org.apache.log4j.Logger;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
@@ -75,23 +76,26 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
if (virtualHost == null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: " + virtualHostName);
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'");
}
else
{
session.setVirtualHost(virtualHost);
- AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID());
+ AccessResult result = virtualHost.getAccessManager().isAuthorized(virtualHost, session.getAuthorizedID(), AccessRights.Rights.ANY);
switch (result.getStatus())
{
default:
case REFUSED:
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- "Access denied to vHost '" + virtualHostName + "' by "
- + result.getAuthorizer());
+ String error = "Any access denied to vHost '" + virtualHostName + "' by "
+ + result.getAuthorizer();
+
+ _logger.warn(error);
+
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error);
case GRANTED:
- _logger.info("Granted access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID()
+ _logger.info("Granted any access to vHost '" + virtualHostName + "' for " + session.getAuthorizedID()
+ " by '" + result.getAuthorizer() + "'");
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index 6029a023e5..fef00942a0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.protocol.HeartbeatConfig;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -106,7 +107,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
session.writeFrame(tune);
- session.setAuthorizedID(ss.getAuthorizationID());
+ session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
disposeSaslServer(session);
break;
case CONTINUE:
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 6c14aae7ed..4734143497 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.protocol.HeartbeatConfig;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -95,7 +96,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
throw new AMQException("Authentication failed");
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
- session.setAuthorizedID(ss.getAuthorizationID());
+ session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 9e0a1019f2..2e697d4564 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -64,7 +64,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
private final AtomicInteger _counter = new AtomicInteger();
-
protected QueueDeclareHandler()
{
Configurator.configure(this);
@@ -92,12 +91,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
synchronized (queueRegistry)
{
- if (((queue = queueRegistry.getQueue(body.queue)) == null) )
+ if (((queue = queueRegistry.getQueue(body.queue)) == null))
{
- if(body.passive)
+ if (body.passive)
{
- String msg = "Queue: " + body.queue + " not found.";
- throw body.getChannelException(AMQConstant.NOT_FOUND,msg );
+ String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
{
@@ -112,13 +111,16 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
queue.bind(body.queue, null, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange");
+ _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")");
}
}
}
- else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
+ else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "'),"
+ + " as exclusive queue with same name "
+ + "declared on another client ID('"
+ + queue.getOwner() + "')");
}
AMQChannel channel = session.getChannel(evt.getChannelId());
@@ -138,10 +140,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- queue.getConsumerCount(), // consumerCount
- queue.getMessageCount(), // messageCount
- body.queue); // queue
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ queue.getConsumerCount(), // consumerCount
+ queue.getMessageCount(), // messageCount
+ body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
session.writeFrame(response);
}
@@ -162,24 +164,22 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
final AMQShortString queueName = queue.getName();
- if(body.exclusive && !body.durable)
+ if (body.exclusive && !body.durable)
{
final AMQProtocolSession.Task deleteQueueTask =
- new AMQProtocolSession.Task()
- {
-
- public void doTask(AMQProtocolSession session) throws AMQException
+ new AMQProtocolSession.Task()
{
- if(registry.getQueue(queueName) == queue)
+ public void doTask(AMQProtocolSession session) throws AMQException
{
- queue.delete();
+ if (registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
}
-
- }
- };
+ };
session.addSessionCloseTask(deleteQueueTask);
@@ -190,16 +190,14 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
session.removeSessionCloseTask(deleteQueueTask);
}
});
-
-
- }
+ }// if exclusive and not durable
Configuration virtualHostDefaultQueueConfiguration = VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
if (virtualHostDefaultQueueConfiguration != null)
{
Configurator.configure(queue, virtualHostDefaultQueueConfiguration);
}
-
+
return queue;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
index c89529f2a3..38c9e4950a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,29 +20,174 @@
*/
package org.apache.qpid.server.management;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.MBeanServerForwarder;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.AuthorizeCallback;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
+
+/**
+ * This class starts up an MBeanserver. If out of the box agent is being used then there are no security features
+ * implemented. To use the security features like user authentication, turn off the jmx options in the "QPID_OPTS" env
+ * variable and use JMXMP connector server. If JMXMP connector is not available, then the standard JMXConnector will be
+ * used, which again doesn't have user authentication.
+ */
public class JMXManagedObjectRegistry implements ManagedObjectRegistry
{
private static final Logger _log = Logger.getLogger(JMXManagedObjectRegistry.class);
private final MBeanServer _mbeanServer;
+ private Registry _rmiRegistry;
+ private JMXServiceURL _jmxURL;
- public JMXManagedObjectRegistry()
+ public JMXManagedObjectRegistry() throws AMQException
{
_log.info("Initialising managed object registry using platform MBean server");
- // we use the platform MBean server currently but this must be changed or at least be configuurable
- _mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+ // Retrieve the config parameters
+ boolean platformServer = appRegistry.getConfiguration().getBoolean("management.platform-mbeanserver", true);
+
+ _mbeanServer =
+ platformServer ? ManagementFactory.getPlatformMBeanServer()
+ : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN);
+ }
+
+
+ public void start()
+ {
+ // Check if the "QPID_OPTS" is set to use Out of the Box JMXAgent
+ if (areOutOfTheBoxJMXOptionsSet())
+ {
+ _log.info("JMX: Using the out of the box JMX Agent");
+ return;
+ }
+
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+ boolean security = appRegistry.getConfiguration().getBoolean("management.security-enabled", true);
+ int port = appRegistry.getConfiguration().getInt("management.jmxport", 8999);
+
+ try
+ {
+ if (security)
+ {
+ // For SASL using JMXMP
+ _jmxURL = new JMXServiceURL("jmxmp", null, port);
+
+ Map env = new HashMap();
+ Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases();
+ PrincipalDatabase db = null;
+
+ for (Map.Entry<String, PrincipalDatabase> entry : map.entrySet())
+ {
+ if (entry.getValue() instanceof Base64MD5PasswordFilePrincipalDatabase)
+ {
+ db = entry.getValue();
+ break;
+ }
+ else if (entry.getValue() instanceof PlainPasswordFilePrincipalDatabase)
+ {
+ db = entry.getValue();
+ }
+ }
+
+ if (db instanceof Base64MD5PasswordFilePrincipalDatabase)
+ {
+ env.put("jmx.remote.profiles", "SASL/CRAM-MD5");
+ CRAMMD5HashedInitialiser initialiser = new CRAMMD5HashedInitialiser();
+ initialiser.initialise(db);
+ env.put("jmx.remote.sasl.callback.handler", initialiser.getCallbackHandler());
+ }
+ else if (db instanceof PlainPasswordFilePrincipalDatabase)
+ {
+ env.put("jmx.remote.profiles", "SASL/PLAIN");
+ env.put("jmx.remote.sasl.callback.handler", new UserCallbackHandler(db));
+ }
+
+ // Enable the SSL security and server authentication
+ /*
+ SslRMIClientSocketFactory csf = new SslRMIClientSocketFactory();
+ SslRMIServerSocketFactory ssf = new SslRMIServerSocketFactory();
+ env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, csf);
+ env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, ssf);
+ */
+
+ try
+ {
+ JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, env, _mbeanServer);
+ MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance();
+ cs.setMBeanServerForwarder(mbsf);
+ cs.start();
+ _log.info("JMX: Starting JMXConnector server with SASL");
+ }
+ catch (java.net.MalformedURLException urlException)
+ {
+ // When JMXMPConnector is not available
+ // java.net.MalformedURLException: Unsupported protocol: jmxmp
+ _log.info("JMX: Starting JMXConnector server");
+ startJMXConnectorServer(port);
+ }
+ }
+ else
+ {
+ startJMXConnectorServer(port);
+ }
+ }
+ catch (Exception ex)
+ {
+ _log.error("Error in initialising Managed Object Registry." + ex.getMessage());
+ ex.printStackTrace();
+ }
+ }
+
+ /**
+ * Starts up an RMIRegistry at configured port and attaches a JMXConnectorServer to it.
+ *
+ * @param port
+ *
+ * @throws IOException
+ */
+ private void startJMXConnectorServer(int port) throws IOException
+ {
+ startRMIRegistry(port);
+ _jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi");
+ JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(_jmxURL, null, _mbeanServer);
+ cs.start();
}
public void registerObject(ManagedObject managedObject) throws JMException
{
- _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
+ _mbeanServer.registerMBean(managedObject, managedObject.getObjectName());
}
public void unregisterObject(ManagedObject managedObject) throws JMException
@@ -50,4 +195,105 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
_mbeanServer.unregisterMBean(managedObject.getObjectName());
}
+ /**
+ * Checks is the "QPID_OPTS" env variable is set to use the out of the box JMXAgent.
+ *
+ * @return
+ */
+ private boolean areOutOfTheBoxJMXOptionsSet()
+ {
+ if (System.getProperty("com.sun.management.jmxremote") != null)
+ {
+ return true;
+ }
+
+ if (System.getProperty("com.sun.management.jmxremote.port") != null)
+ {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Starts the rmi registry at given port
+ *
+ * @param port
+ *
+ * @throws RemoteException
+ */
+ private void startRMIRegistry(int port) throws RemoteException
+ {
+ System.setProperty("java.rmi.server.randomIDs", "true");
+ _rmiRegistry = LocateRegistry.createRegistry(port);
+ }
+
+ // stops the RMIRegistry, if it was running and bound to a port
+ public void close() throws RemoteException
+ {
+ if (_rmiRegistry != null)
+ {
+ // Stopping the RMI registry
+ UnicastRemoteObject.unexportObject(_rmiRegistry, true);
+ }
+ }
+
+ /** This class is used for SASL enabled JMXConnector for performing user authentication. */
+ private class UserCallbackHandler implements CallbackHandler
+ {
+ private final PrincipalDatabase _principalDatabase;
+
+ protected UserCallbackHandler(PrincipalDatabase database)
+ {
+ _principalDatabase = database;
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ // Retrieve callbacks
+ NameCallback ncb = null;
+ PasswordCallback pcb = null;
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ if (callbacks[i] instanceof NameCallback)
+ {
+ ncb = (NameCallback) callbacks[i];
+ }
+ else if (callbacks[i] instanceof PasswordCallback)
+ {
+ pcb = (PasswordCallback) callbacks[i];
+ }
+ else if (callbacks[i] instanceof AuthorizeCallback)
+ {
+ ((AuthorizeCallback) callbacks[i]).setAuthorized(true);
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(callbacks[i]);
+ }
+ }
+
+ boolean authorized = false;
+ // Process retrieval of password; can get password if username is available in NameCallback
+ if ((ncb != null) && (pcb != null))
+ {
+ String username = ncb.getDefaultName();
+ try
+ {
+ authorized = _principalDatabase.verifyPassword(username, new String(pcb.getPassword()));
+ }
+ catch (AccountNotFoundException e)
+ {
+ IOException ioe = new IOException("User not authorized. " + e);
+ ioe.initCause(e);
+ throw ioe;
+ }
+ }
+
+ if (!authorized)
+ {
+ throw new IOException("User not authorized.");
+ }
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
new file mode 100644
index 0000000000..a79d993afc
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
@@ -0,0 +1,217 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.management;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import javax.management.remote.MBeanServerForwarder;
+import javax.management.remote.JMXPrincipal;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.JMException;
+import javax.security.auth.Subject;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.security.AccessController;
+import java.security.Principal;
+import java.security.AccessControlContext;
+import java.util.Set;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.io.File;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+
+/**
+ * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. This implements
+ * the logic for allowing the users to invoke MBean operations and implements the restrictions for readOnly, readWrite
+ * and admin users.
+ */
+public class MBeanInvocationHandlerImpl implements InvocationHandler
+{
+ private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class);
+
+ public final static String ADMIN = "admin";
+ public final static String READWRITE = "readwrite";
+ public final static String READONLY = "readonly";
+ private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
+ private MBeanServer mbs;
+ private static Properties _userRoles = new Properties();
+
+ public static MBeanServerForwarder newProxyInstance()
+ {
+ final InvocationHandler handler = new MBeanInvocationHandlerImpl();
+ final Class[] interfaces = new Class[]{MBeanServerForwarder.class};
+
+ Object proxy = Proxy.newProxyInstance(MBeanServerForwarder.class.getClassLoader(), interfaces, handler);
+ return MBeanServerForwarder.class.cast(proxy);
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ final String methodName = method.getName();
+
+ if (methodName.equals("getMBeanServer"))
+ {
+ return mbs;
+ }
+
+ if (methodName.equals("setMBeanServer"))
+ {
+ if (args[0] == null)
+ {
+ throw new IllegalArgumentException("Null MBeanServer");
+ }
+ if (mbs != null)
+ {
+ throw new IllegalArgumentException("MBeanServer object already initialized");
+ }
+ mbs = (MBeanServer) args[0];
+ return null;
+ }
+
+ // Retrieve Subject from current AccessControlContext
+ AccessControlContext acc = AccessController.getContext();
+ Subject subject = Subject.getSubject(acc);
+
+ // Allow operations performed locally on behalf of the connector server itself
+ if (subject == null)
+ {
+ return method.invoke(mbs, args);
+ }
+
+ if (args == null || DELEGATE.equals(args[0]))
+ {
+ return method.invoke(mbs, args);
+ }
+
+ // Restrict access to "createMBean" and "unregisterMBean" to any user
+ if (methodName.equals("createMBean") || methodName.equals("unregisterMBean"))
+ {
+ throw new SecurityException("Access denied");
+ }
+
+ // Retrieve JMXPrincipal from Subject
+ Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class);
+ if (principals == null || principals.isEmpty())
+ {
+ throw new SecurityException("Access denied");
+ }
+
+ Principal principal = principals.iterator().next();
+ String identity = principal.getName();
+
+ // Following users can perform any operation other than "createMBean" and "unregisterMBean"
+ if (isAdmin(identity) || isAllowedToModify(identity))
+ {
+ return method.invoke(mbs, args);
+ }
+
+ // These users can only call "getAttribute" on the MBeanServerDelegate MBean
+ // Here we can add other fine grained permissions like specific method for a particular mbean
+ if (isReadOnlyUser(identity) && isReadOnlyMethod(method, args))
+ {
+ return method.invoke(mbs, args);
+ }
+
+ throw new SecurityException("Access denied");
+ }
+
+ // Initialises the user roles
+ public static void setAccessRights(Properties accessRights)
+ {
+ _userRoles = accessRights;
+ }
+
+ private boolean isAdmin(String userName)
+ {
+ if (ADMIN.equals(_userRoles.getProperty(userName)))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isAllowedToModify(String userName)
+ {
+ if (READWRITE.equals(_userRoles.getProperty(userName)))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isReadOnlyUser(String userName)
+ {
+ if (READONLY.equals(_userRoles.getProperty(userName)))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isReadOnlyMethod(Method method, Object[] args)
+ {
+ String methodName = method.getName();
+ if (methodName.equals("queryMBeans") ||
+ methodName.equals("getDefaultDomain") ||
+ methodName.equals("getMBeanInfo") ||
+ methodName.equals("getAttribute") ||
+ methodName.equals("getAttributes"))
+ {
+ return true;
+ }
+
+ if (args[0] instanceof ObjectName)
+ {
+ String mbeanMethod = (args.length > 1) ? (String) args[1] : null;
+ if (mbeanMethod == null)
+ {
+ return false;
+ }
+
+ try
+ {
+ MBeanInfo mbeanInfo = mbs.getMBeanInfo((ObjectName) args[0]);
+ if (mbeanInfo != null)
+ {
+ MBeanOperationInfo[] opInfos = mbeanInfo.getOperations();
+ for (MBeanOperationInfo opInfo : opInfos)
+ {
+ if (opInfo.getName().equals(mbeanMethod) && (opInfo.getImpact() == MBeanOperationInfo.INFO))
+ {
+ return true;
+ }
+ }
+ }
+ }
+ catch (JMException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
index b2f79b6410..45e2e91ed7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
@@ -52,8 +52,7 @@ public interface ManagedBroker
@MBeanOperation(name="createNewExchange", description="Creates a new Exchange", impact= MBeanOperationInfo.ACTION)
void createNewExchange(@MBeanOperationParameter(name="name", description="Name of the new exchange")String name,
@MBeanOperationParameter(name="ExchangeType", description="Type of the exchange")String type,
- @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable,
- @MBeanOperationParameter(name="passive", description="true of the Exchange should be passive")boolean passive)
+ @MBeanOperationParameter(name="durable", description="true if the Exchang should be durable")boolean durable)
throws IOException, JMException;
/**
@@ -81,8 +80,7 @@ public interface ManagedBroker
@MBeanOperation(name="createNewQueue", description="Create a new Queue on the Broker server", impact= MBeanOperationInfo.ACTION)
void createNewQueue(@MBeanOperationParameter(name="queue name", description="Name of the new queue")String queueName,
@MBeanOperationParameter(name="owner", description="Owner name")String owner,
- @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable,
- @MBeanOperationParameter(name="autoDelete", description="true if the queue should be auto delete") boolean autoDelete)
+ @MBeanOperationParameter(name="durable", description="true if the queue should be durable")boolean durable)
throws IOException, JMException;
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java
index 32298f05e3..5f9bc9ddad 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObjectRegistry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.management;
import javax.management.JMException;
+import java.rmi.RemoteException;
/**
* Handles the registration (and unregistration and so on) of managed objects.
@@ -36,7 +37,11 @@ import javax.management.JMException;
*/
public interface ManagedObjectRegistry
{
+ void start();
+
void registerObject(ManagedObject managedObject) throws JMException;
void unregisterObject(ManagedObject managedObject) throws JMException;
+
+ void close() throws RemoteException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java
index 5b86543ea6..b4fbed6948 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/NoopManagedObjectRegistry.java
@@ -24,6 +24,8 @@ import javax.management.JMException;
import org.apache.log4j.Logger;
+import java.rmi.RemoteException;
+
/**
* This managed object registry does not actually register MBeans. This can be used in tests when management is
* not required or when management has been disabled.
@@ -38,6 +40,11 @@ public class NoopManagedObjectRegistry implements ManagedObjectRegistry
_log.info("Management is disabled");
}
+ public void start()
+ {
+ //no-op
+ }
+
public void registerObject(ManagedObject managedObject) throws JMException
{
}
@@ -45,4 +52,9 @@ public class NoopManagedObjectRegistry implements ManagedObjectRegistry
public void unregisterObject(ManagedObject managedObject) throws JMException
{
}
+
+ public void close() throws RemoteException
+ {
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index fd8fb2d5cb..2e62c2f1e4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.security.Principal;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -108,7 +109,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
private List<Integer> _closingChannelsList = new ArrayList<Integer>();
private ProtocolOutputConverter _protocolOutputConverter;
- private String _authorizedID;
+ private Principal _authorizedID;
public ManagedObject getManagedObject()
@@ -745,12 +746,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _protocolOutputConverter;
}
- public void setAuthorizedID(String authorizedID)
+ public void setAuthorizedID(Principal authorizedID)
{
_authorizedID = authorizedID;
}
- public String getAuthorizedID()
+ public Principal getAuthorizedID()
{
return _authorizedID;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 79421dd497..390117acf6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -31,6 +31,8 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.security.Principal;
+
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
@@ -165,9 +167,9 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
public ProtocolOutputConverter getProtocolOutputConverter();
- void setAuthorizedID(String authorizedID);
+ void setAuthorizedID(Principal authorizedID);
- /** @return a username string that was used to authorized this session */
- String getAuthorizedID();
+ /** @return a Principal that was used to authorized this session */
+ Principal getAuthorizedID();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 5eebd4c524..66f928a70e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,14 +37,15 @@
*/
package org.apache.qpid.server.protocol;
+import java.security.Principal;
import java.util.Date;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
import javax.management.NotCompliantMBeanException;
+import javax.management.Notification;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -56,15 +77,17 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
{
private AMQMinaProtocolSession _session = null;
private String _name = null;
-
- //openmbean data types for representing the channel attributes
- private final static String[] _channelAtttibuteNames = {"Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
- private final static String[] _indexNames = {_channelAtttibuteNames[0]};
- private final static OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
- private static CompositeType _channelType = null; // represents the data type for channel data
- private static TabularType _channelsType = null; // Data type for list of channels type
+
+ // openmbean data types for representing the channel attributes
+ private static final String[] _channelAtttibuteNames =
+ { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count" };
+ private static final String[] _indexNames = { _channelAtttibuteNames[0] };
+ private static final OpenType[] _channelAttributeTypes =
+ { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER };
+ private static CompositeType _channelType = null; // represents the data type for channel data
+ private static TabularType _channelsType = null; // Data type for list of channels type
private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
- new AMQShortString("Broker Management Console has closed the connection.");
+ new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
@@ -72,22 +95,21 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
super(ManagedConnection.class, ManagedConnection.TYPE);
_session = session;
String remote = getRemoteAddress();
- remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
+ remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
init();
}
-
static
{
try
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
- // It should never occur
- System.out.println(ex.getMessage());
+ // This is not expected to ever occur.
+ throw new RuntimeException("Got JMException in static initializer.", ex);
}
}
@@ -96,26 +118,27 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
*/
private static void init() throws OpenDataException
{
- _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
- _channelAtttibuteNames, _channelAttributeTypes);
+ _channelType =
+ new CompositeType("Channel", "Channel Details", _channelAtttibuteNames, _channelAtttibuteNames,
+ _channelAttributeTypes);
_channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
}
public String getClientId()
{
- return _session.getContextKey() == null ? null : _session.getContextKey().toString();
+ return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
}
public String getAuthorizedId()
{
- return _session.getAuthorizedID();
+ return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null;
}
public String getVersion()
{
- return _session.getClientVersion() == null ? null : _session.getClientVersion().toString();
+ return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
}
-
+
public Date getLastIoTime()
{
return new Date(_session.getIOSession().getLastIoTime());
@@ -171,6 +194,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
+
_session.commitTransactions(channel);
}
catch (AMQException ex)
@@ -194,6 +218,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
+
_session.rollbackTransactions(channel);
}
catch (AMQException ex)
@@ -215,9 +240,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
for (AMQChannel channel : list)
{
- Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
+ Object[] itemValues =
+ {
+ channel.getChannelId(), channel.isTransactional(),
(channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName().asString() : null,
- channel.getUnacknowledgedMessageMap().size()};
+ channel.getUnacknowledgedMessageMap().size()
+ };
CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
channelsList.put(channelData);
@@ -232,17 +260,16 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
* @throws JMException
*/
public void closeConnection() throws JMException
- {
+ {
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
- _session.getProtocolMajorVersion(),
- _session.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
+ final AMQFrame response =
+ ConnectionCloseBody.createAMQFrame(0, _session.getProtocolMajorVersion(), _session.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
);
_session.writeFrame(response);
@@ -259,18 +286,19 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
String name = MonitorNotification.class.getName();
String description = "Channel count has reached threshold value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
- return new MBeanNotificationInfo[]{info1};
+ return new MBeanNotificationInfo[] { info1 };
}
public void notifyClients(String notificationMsg)
{
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ Notification n =
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
index 990c4c0794..e6e713ac6d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol;
import java.io.IOException;
import java.util.Date;
+import java.security.Principal;
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
@@ -67,16 +68,17 @@ public interface ManagedConnection
/**
* Tells the total number of bytes written till now.
* @return number of bytes written.
- */
+ *
@MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
Long getWrittenBytes();
-
+ */
/**
* Tells the total number of bytes read till now.
* @return number of bytes read.
- */
+ *
@MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
Long getReadBytes();
+ */
/**
* Threshold high value for no of channels. This is useful in setting notifications or
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d6962d28cd..b2046efee3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -42,6 +43,8 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -78,19 +81,20 @@ public class AMQMessage
private boolean _immediate;
private AtomicBoolean _taken = new AtomicBoolean(false);
-
private TransientMessageData _transientMessageData = new TransientMessageData();
private Subscription _takenBySubcription;
-
private Set<Subscription> _rejectedBy = null;
+ private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+ private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
- public boolean isTaken()
+ public boolean isTaken(AMQQueue queue)
{
return _taken.get();
}
private final int hashcode = System.identityHashCode(this);
+
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
@@ -203,9 +207,10 @@ public class AMQMessage
_transientMessageData.setMessagePublishInfo(info);
_taken = new AtomicBoolean(false);
+
if (_log.isDebugEnabled())
{
- _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
+ _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")");
}
}
@@ -318,8 +323,10 @@ public class AMQMessage
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
+
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
+ _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -356,12 +363,13 @@ public class AMQMessage
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+ * operation.
*/
public AMQMessage takeReference()
{
_referenceCount.incrementAndGet();
- return this;
+ return this;
}
/** Threadsafe. Increment the reference count on the message. */
@@ -378,9 +386,10 @@ public class AMQMessage
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*
+ * @param storeContext
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
- * @param storeContext
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
@@ -451,7 +460,7 @@ public class AMQMessage
}
- public boolean taken(Subscription sub)
+ public boolean taken(AMQQueue queue, Subscription sub)
{
if (_taken.getAndSet(true))
{
@@ -464,7 +473,7 @@ public class AMQMessage
}
}
- public void release()
+ public void release(AMQQueue queue)
{
if (_log.isTraceEnabled())
{
@@ -600,7 +609,7 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
//Increment the references to this message for each queue delivery.
- incrementReference();
+ incrementReference();
//normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
@@ -824,11 +833,14 @@ public class AMQMessage
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken + " by:" + _takenBySubcription;
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ _taken + " by :" + _takenBySubcription;
+
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
- public Subscription getDeliveredSubscription()
+ public Subscription getDeliveredSubscription(AMQQueue queue)
{
return _takenBySubcription;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 7a32848c44..bbaa7379f6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -1,5 +1,25 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/*
+ *
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,11 +37,11 @@
*/
package org.apache.qpid.server.queue;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.Iterator;
import java.util.List;
-import java.util.Date;
-import java.text.SimpleDateFormat;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -41,12 +61,14 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
+
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -73,15 +95,15 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
- private final static String[] _msgAttributeNames = {"AMQ MessageId", "Header", "Size(bytes)", "Redelivered"};
- private static String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+ private static final String[] _msgAttributeNames = { "AMQ MessageId", "Header", "Size(bytes)", "Redelivered" };
+ private static String[] _msgAttributeIndex = { _msgAttributeNames[0] };
private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
- private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
- private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
+ private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
+ private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
- private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
+ private static final String[] _msgContentAttributes = { "AMQ MessageId", "MimeType", "Encoding", "Content" };
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -95,7 +117,6 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
_queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
}
-
public ManagedObject getParentObject()
{
return _queue.getVirtualHost().getManagedObject();
@@ -107,10 +128,10 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
- // It should never occur
- System.out.println(ex.getMessage());
+ // This is not expected to ever occur.
+ throw new RuntimeException("Got JMException in static initializer.", ex);
}
}
@@ -119,19 +140,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
*/
private static void init() throws OpenDataException
{
- _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
- _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
- _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
-
- _msgAttributeTypes[0] = SimpleType.LONG; // For message id
- _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
- _msgAttributeTypes[2] = SimpleType.LONG; // For size
- _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
-
- _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _msgContentAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
+ _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
+ _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
+ _msgContentType =
+ new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, _msgContentAttributes,
+ _msgContentAttributeTypes);
+
+ _msgAttributeTypes[0] = SimpleType.LONG; // For message id
+ _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
+ _msgAttributeTypes[2] = SimpleType.LONG; // For size
+ _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
+
+ _messageDataType =
+ new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
_messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
}
@@ -213,7 +236,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
public Long getMaximumQueueDepth()
{
long queueDepthInBytes = _queue.getMaximumQueueDepth();
- return queueDepthInBytes >> 10 ;
+
+ return queueDepthInBytes >> 10;
}
public void setMaximumQueueDepth(Long value)
@@ -227,7 +251,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
public Long getQueueDepth() throws JMException
{
long queueBytesSize = _queue.getQueueDepth();
- return queueBytesSize >> 10 ;
+
+ return queueBytesSize >> 10;
}
/**
@@ -237,13 +262,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
- for(NotificationCheck check : NotificationCheck.values())
+ for (NotificationCheck check : NotificationCheck.values())
{
- if(check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()]<thresholdTime)
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- if(check.notifyIfNecessary(msg, _queue, this))
+ if (check.notifyIfNecessary(msg, _queue, this))
{
_lastNotificationTimes[check.ordinal()] = currentTime;
}
@@ -260,9 +285,10 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
// important : add log to the log file - monitoring tools may be looking for this
_logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
notificationMsg = notification.name() + " " + notificationMsg;
-
- _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+
+ _lastNotification =
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(_lastNotification);
}
@@ -334,20 +360,25 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
try
{
// Create header attributes list
- CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
String mimeType = null, encoding = null;
if (headerProperties != null)
{
AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = mimeTypeShortSting == null ? null : mimeTypeShortSting.toString();
- encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding().toString();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
}
- Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
+
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
}
catch (AMQException e)
{
- throw new JMException("Error creating header attributes list: " + e);
+ JMException jme = new JMException("Error creating header attributes list: " + e);
+ jme.initCause(e);
+ throw jme;
}
}
@@ -358,8 +389,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
if ((beginIndex > endIndex) || (beginIndex < 1))
{
- throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex +
- "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
+ throw new OperationsException("From Index = " + beginIndex + ", To Index = " + endIndex
+ + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
List<AMQMessage> list = _queue.getMessagesOnTheQueue();
@@ -368,20 +399,22 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
try
{
// Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
AMQMessage msg = list.get(i - 1);
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() };
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
}
}
catch (AMQException e)
{
- throw new JMException("Error creating message contents: " + e);
+ JMException jme = new JMException("Error creating message contents: " + e);
+ jme.initCause(e);
+ throw jme;
}
return _messageList;
@@ -400,11 +433,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
int delMode = headerProperties.getDeliveryMode();
- list.add("JMSDeliveryMode = " + (delMode == 1 ? "Persistent" : "Non_Persistent"));
+ list.add("JMSDeliveryMode = " + ((delMode == 1) ? "Persistent" : "Non_Persistent"));
list.add("JMSPriority = " + headerProperties.getPriority());
list.add("JMSType = " + headerProperties.getType());
-
+
long longDate = headerProperties.getExpiration();
String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
list.add("JMSExpiration = " + strDate);
@@ -425,27 +458,26 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
*/
public void moveMessages(long fromMessageId, long toMessageId, String toQueueName) throws JMException
{
- if (fromMessageId > toMessageId || (fromMessageId < 1))
+ if ((fromMessageId > toMessageId) || (fromMessageId < 1))
{
- throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
+ throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
}
_queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
}
-
/**
* returns Notifications sent by this MBean.
*/
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
String name = MonitorNotification.class.getName();
String description = "Either Message count or Queue depth or Message size has reached threshold high value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
- return new MBeanNotificationInfo[]{info1};
+ return new MBeanNotificationInfo[] { info1 };
}
} // End of AMQQueueMBean class
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index cfa13c87fd..979f692361 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -210,6 +210,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Returns all the messages in the Queue
+ *
* @return List of messages
*/
public List<AMQMessage> getMessages()
@@ -222,14 +223,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
list.add(message);
}
_lock.unlock();
-
+
return list;
}
/**
* Returns messages within the range of given messageIds
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return
*/
public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
@@ -242,7 +245,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
long maxMessageCount = toMessageId - fromMessageId + 1;
_lock.lock();
-
+
List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
for (AMQMessage message : _messages)
@@ -399,7 +402,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
_lock.lock();
-
+
AMQMessage message = _messages.poll();
if (message != null)
{
@@ -432,9 +435,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return count;
}
- /**
- This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
- */
+ /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
@@ -444,8 +445,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
- //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
- while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
+ //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
+ while (message != null
+ && (
+ ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+ || sub == null)
+ && message.taken(_queue, sub))
{
//remove the already taken message
AMQMessage removed = messages.poll();
@@ -506,7 +511,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) +
+ _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -526,7 +531,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() +
+ _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -562,7 +567,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
catch (AMQException e)
{
- message.release();
+ message.release(_queue);
_log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -723,7 +728,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(s);
+ msg.taken(_queue, s);
//Deliver the message
s.send(msg, _queue);
}
@@ -737,7 +742,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- if (!msg.isTaken())
+ if (!msg.isTaken(_queue))
{
if (_log.isInfoEnabled())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index d3578d39e8..e3944954f3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -558,7 +558,7 @@ public class SubscriptionImpl implements Subscription
_logger.trace("Removed for resending:" + resent.debugIdentity());
}
- resent.release();
+ resent.release(_queue);
_queue.subscriberHasPendingResend(false, this, resent);
try
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 14a8063aee..89f0b7b39d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -153,7 +153,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_logger.error("Error configuring application: " + e, e);
//throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
- throw new RuntimeException("Unable to create Application Registry");
+ throw new RuntimeException("Unable to create Application Registry", e);
}
}
else
@@ -168,6 +168,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
virtualHost.close();
}
+
+ // close the rmi registry(if any) started for management
+ if (getInstance().getManagedObjectRegistry() != null)
+ {
+ getInstance().getManagedObjectRegistry().close();
+ }
}
public Configuration getConfiguration()
@@ -187,7 +193,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
catch (Exception e)
{
_logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
}
Configurator.configure(instance);
_configuredObjects.put(instanceType, instance);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 739ed9db42..1cca259a8d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.security.access.AccessManager;
import org.apache.qpid.server.security.access.AccessManagerImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.AMQException;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
@@ -103,6 +104,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
initialiseManagedObjectRegistry();
+
_virtualHostRegistry = new VirtualHostRegistry();
_accessManager = new AccessManagerImpl("default", _configuration);
@@ -111,7 +113,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+ _databaseManager.initialiseManagement(_configuration);
+
+ _managedObjectRegistry.start();
+
initialiseVirtualHosts();
+
}
private void initialiseVirtualHosts() throws Exception
@@ -123,7 +130,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
}
}
- private void initialiseManagedObjectRegistry()
+ private void initialiseManagedObjectRegistry() throws AMQException
{
ManagementConfiguration config = getConfiguredObject(ManagementConfiguration.class);
if (config.enabled)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java
new file mode 100644
index 0000000000..f9e093dba7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security;
+
+import org.apache.commons.codec.binary.Base64;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.DigestException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+
+public class Passwd
+{
+ public static void main(String args[]) throws NoSuchAlgorithmException, DigestException, IOException
+ {
+ if (args.length != 2)
+ {
+ System.out.println("Passwd <username> <password>");
+ System.exit(0);
+ }
+
+ byte[] data = args[1].getBytes("utf-8");
+
+ MessageDigest md = MessageDigest.getInstance("MD5");
+
+ for (byte b : data)
+ {
+ md.update(b);
+ }
+
+ byte[] digest = md.digest();
+
+ Base64 b64 = new Base64();
+
+ byte[] encoded = b64.encode(digest);
+
+ output(args[0], encoded);
+ }
+
+ private static void output(String user, byte[] encoded) throws IOException
+ {
+
+// File passwdFile = new File("qpid.passwd");
+
+ PrintStream ps = new PrintStream(System.out);
+
+ user += ":";
+ ps.write(user.getBytes("utf-8"));
+
+ for (byte b : encoded)
+ {
+ ps.write(b);
+ }
+
+ ps.println();
+
+ ps.flush();
+ ps.close();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
new file mode 100644
index 0000000000..a43474559d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.log4j.Logger;
+import org.apache.commons.configuration.ConfigurationException;
+
+import javax.management.JMException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.security.auth.login.AccountNotFoundException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.FileOutputStream;
+import java.util.Properties;
+import java.util.List;
+import java.util.Enumeration;
+import java.util.concurrent.locks.ReentrantLock;
+import java.security.Principal;
+
+/** MBean class for AMQUserManagementMBean. It implements all the management features exposed for managing users. */
+@MBeanDescription("User Management Interface")
+public class AMQUserManagementMBean extends AMQManagedObject implements UserManagement
+{
+
+ private static final Logger _logger = Logger.getLogger(AMQUserManagementMBean.class);
+
+ private PrincipalDatabase _principalDatabase;
+ private String _accessFileName;
+ private Properties _accessRights;
+ // private File _accessFile;
+ private ReentrantLock _accessRightsUpdate = new ReentrantLock();
+
+ // Setup for the TabularType
+ static TabularType _userlistDataType; // Datatype for representing User Lists
+
+ static CompositeType _userDataType; // Composite type for representing User
+ static String[] _userItemNames = {"Username", "Read", "Write", "Admin"};
+
+ static
+ {
+ String[] userItemDesc = {"Broker Login username", "Management Console Read Permission",
+ "Management Console Write Permission", "Management Console Admin Permission"};
+
+ OpenType[] userItemTypes = new OpenType[4]; // User item types.
+ userItemTypes[0] = SimpleType.STRING; // For Username
+ userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read
+ userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write
+ userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin
+ String[] userDataIndex = {_userItemNames[0]};
+
+ try
+ {
+ _userDataType =
+ new CompositeType("User", "User Data", _userItemNames, userItemDesc, userItemTypes);
+
+ _userlistDataType = new TabularType("Users", "List of users", _userDataType, userDataIndex);
+ }
+ catch (OpenDataException e)
+ {
+ _logger.error("Tabular data setup for viewing users incorrect.");
+ _userlistDataType = null;
+ }
+ }
+
+
+ public AMQUserManagementMBean() throws JMException
+ {
+ super(UserManagement.class, UserManagement.TYPE);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return UserManagement.TYPE;
+ }
+
+ public boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
+ @MBeanOperationParameter(name = "password", description = "Password")String password)
+ {
+ try
+ {
+ //delegate password changes to the Principal Database
+ return _principalDatabase.updatePassword(new UsernamePrincipal(username), password);
+ }
+ catch (AccountNotFoundException e)
+ {
+ _logger.warn("Attempt to set password of non-existant user'" + username + "'");
+ return false;
+ }
+ }
+
+ public boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username,
+ @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
+ @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
+ @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ {
+
+ if (_accessRights.get(username) == null)
+ {
+ // If the user doesn't exist in the user rights file check that they at least have an account.
+ if (_principalDatabase.getUser(username) == null)
+ {
+ return false;
+ }
+ }
+
+ try
+ {
+
+ _accessRightsUpdate.lock();
+
+ // Update the access rights
+ if (admin)
+ {
+ _accessRights.put(username, MBeanInvocationHandlerImpl.ADMIN);
+ }
+ else
+ {
+ if (read | write)
+ {
+ if (read)
+ {
+ _accessRights.put(username, MBeanInvocationHandlerImpl.READONLY);
+ }
+ if (write)
+ {
+ _accessRights.put(username, MBeanInvocationHandlerImpl.READWRITE);
+ }
+ }
+ else
+ {
+ _accessRights.remove(username);
+ }
+ }
+
+ saveAccessFile();
+ }
+ finally
+ {
+ if (_accessRightsUpdate.isHeldByCurrentThread())
+ {
+ _accessRightsUpdate.unlock();
+ }
+ }
+
+ return true;
+ }
+
+ public boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
+ @MBeanOperationParameter(name = "password", description = "Password")String password,
+ @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
+ @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
+ @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ {
+ if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password))
+ {
+ _accessRights.put(username, "");
+
+ return setRights(username, read, write, admin);
+ }
+
+ return false;
+ }
+
+ public boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username)
+ {
+
+ try
+ {
+ if (_principalDatabase.deletePrincipal(new UsernamePrincipal(username)))
+ {
+ try
+ {
+ _accessRightsUpdate.lock();
+
+ _accessRights.remove(username);
+ saveAccessFile();
+ }
+ finally
+ {
+ if (_accessRightsUpdate.isHeldByCurrentThread())
+ {
+ _accessRightsUpdate.unlock();
+ }
+ }
+ return true;
+ }
+ }
+ catch (AccountNotFoundException e)
+ {
+ _logger.warn("Attempt to delete user (" + username + ") that doesn't exist");
+ }
+
+ return false;
+ }
+
+ public boolean reloadData()
+ {
+ try
+ {
+ try
+ {
+ loadAccessFile();
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.info("Reload failed due to:" + e);
+ return false;
+ }
+
+ // Reload successful
+ return true;
+ }
+ catch (IOException e)
+ {
+ _logger.info("Reload failed due to:" + e);
+ // Reload unsuccessful
+ return false;
+ }
+ }
+
+
+ @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.")
+ public TabularData viewUsers()
+ {
+ // Table of users
+ // Username(string), Access rights Read,Write,Admin(bool,bool,bool)
+
+ reloadData();
+
+ if (_userlistDataType == null)
+ {
+ _logger.warn("TabluarData not setup correctly");
+ return null;
+ }
+
+ List<Principal> users = _principalDatabase.getUsers();
+
+ TabularDataSupport userList = new TabularDataSupport(_userlistDataType);
+
+ try
+ {
+ // Create the tabular list of message header contents
+ for (Principal user : users)
+ {
+ // Create header attributes list
+
+ String rights = (String) _accessRights.get(user.getName());
+
+ Boolean read = false;
+ Boolean write = false;
+ Boolean admin = false;
+
+ if (rights != null)
+ {
+ read = rights.equals(MBeanInvocationHandlerImpl.READONLY)
+ || rights.equals(MBeanInvocationHandlerImpl.READWRITE);
+ write = rights.equals(MBeanInvocationHandlerImpl.READWRITE);
+ admin = rights.equals(MBeanInvocationHandlerImpl.ADMIN);
+ }
+
+ Object[] itemData = {user.getName(), read, write, admin};
+ CompositeData messageData = new CompositeDataSupport(_userDataType, _userItemNames, itemData);
+ userList.put(messageData);
+ }
+ }
+ catch (OpenDataException e)
+ {
+ _logger.warn("Unable to create user list due to :" + e);
+ return null;
+ }
+
+ return userList;
+ }
+
+ /*** Broker Methods **/
+
+ /**
+ * setPrincipalDatabase
+ *
+ * @param database set The Database to use for user lookup
+ */
+ public void setPrincipalDatabase(PrincipalDatabase database)
+ {
+ _principalDatabase = database;
+ }
+
+ /**
+ * setAccessFile
+ *
+ * @param accessFile the file to use for updating.
+ *
+ * @throws java.io.IOException If the file cannot be accessed
+ * @throws org.apache.commons.configuration.ConfigurationException
+ * if checks on the file fail.
+ */
+ public void setAccessFile(String accessFile) throws IOException, ConfigurationException
+ {
+ _accessFileName = accessFile;
+
+ if (_accessFileName != null)
+ {
+ loadAccessFile();
+ }
+ else
+ {
+ _logger.warn("Access rights file specified is null. Access rights not changed.");
+ }
+ }
+
+ private void loadAccessFile() throws IOException, ConfigurationException
+ {
+ try
+ {
+ _accessRightsUpdate.lock();
+
+ Properties accessRights = new Properties();
+
+ File accessFile = new File(_accessFileName);
+
+ if (!accessFile.exists())
+ {
+ throw new ConfigurationException("'" + _accessFileName + "' does not exist");
+ }
+
+ if (!accessFile.canRead())
+ {
+ throw new ConfigurationException("Cannot read '" + _accessFileName + "'.");
+ }
+
+ if (!accessFile.canWrite())
+ {
+ _logger.warn("Unable to write to access file '" + _accessFileName + "' changes will not be preserved.");
+ }
+
+ accessRights.load(new FileInputStream(accessFile));
+ checkAccessRights(accessRights);
+ setAccessRights(accessRights);
+ }
+ finally
+ {
+ if (_accessRightsUpdate.isHeldByCurrentThread())
+ {
+ _accessRightsUpdate.unlock();
+ }
+ }
+ }
+
+ private void checkAccessRights(Properties accessRights)
+ {
+ Enumeration values = accessRights.propertyNames();
+
+ while (values.hasMoreElements())
+ {
+ String user = (String) values.nextElement();
+
+ if (_principalDatabase.getUser(user) == null)
+ {
+ _logger.warn("Access rights contains user '" + user + "' but there is no authentication data for that user");
+ }
+ }
+ }
+
+ private void saveAccessFile()
+ {
+ try
+ {
+ _accessRightsUpdate.lock();
+ try
+ {
+ // remove old temporary file
+ File tmp = new File(_accessFileName + ".tmp");
+ if (tmp.exists())
+ {
+ tmp.delete();
+ }
+
+ //remove old backup
+ File old = new File(_accessFileName + ".old");
+ if (old.exists())
+ {
+ old.delete();
+ }
+
+ // Rename current file
+ File rights = new File(_accessFileName);
+ rights.renameTo(old);
+
+ FileOutputStream output = new FileOutputStream(tmp);
+ _accessRights.store(output, "");
+ output.close();
+
+ // Rename new file to main file
+ tmp.renameTo(rights);
+
+ // delete tmp
+ tmp.delete();
+ }
+ catch (IOException e)
+ {
+ _logger.warn("Problem occured saving '" + _accessFileName + "' changes may not be preserved. :" + e);
+ }
+ }
+ finally
+ {
+ if (_accessRightsUpdate.isHeldByCurrentThread())
+ {
+ _accessRightsUpdate.unlock();
+ }
+ }
+ }
+
+ /**
+ * user=read user=write user=readwrite user=admin
+ *
+ * @param accessRights The properties list of access rights to process
+ */
+ private void setAccessRights(Properties accessRights)
+ {
+ _logger.debug("Setting Access Rights:" + accessRights);
+ _accessRights = accessRights;
+ MBeanInvocationHandlerImpl.setAccessRights(_accessRights);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java
index 0c0de88182..d70a6dc8f4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java
@@ -20,8 +20,13 @@
*/
package org.apache.qpid.server.security.access;
+import java.security.Principal;
+
public interface AccessManager
{
+ AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights);
+
+ @Deprecated
AccessResult isAuthorized(Accessable accessObject, String username);
String getName();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java
index 0feb2791da..35d036d20f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java
@@ -23,13 +23,13 @@ package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.configuration.PropertyUtils;
-import org.apache.qpid.configuration.PropertyException;
import org.apache.log4j.Logger;
import java.util.List;
import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
+import java.security.Principal;
public class AccessManagerImpl implements AccessManager
{
@@ -39,8 +39,13 @@ public class AccessManagerImpl implements AccessManager
public AccessManagerImpl(String name, Configuration hostConfig) throws ConfigurationException
{
- String accessClass = hostConfig.getString("security.access.class");
+ if (hostConfig == null)
+ {
+ _logger.warn("No Configuration specified. Using default access controls for VirtualHost:'" + name + "'");
+ return;
+ }
+ String accessClass = hostConfig.getString("security.access.class");
if (accessClass == null)
{
_logger.warn("No access control specified. Using default access controls for VirtualHost:'" + name + "'");
@@ -111,21 +116,35 @@ public class AccessManagerImpl implements AccessManager
}
catch (Exception e)
{
- throw new ConfigurationException(e.getCause());
+ ConfigurationException ce = new ConfigurationException(e.getMessage(), e.getCause());
+ ce.initCause(e);
+ throw ce;
}
}
}
-
public AccessResult isAuthorized(Accessable accessObject, String username)
{
+ return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
+ }
+
+ public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights)
+ {
if (_accessManager == null)
{
- return ApplicationRegistry.getInstance().getAccessManager().isAuthorized(accessObject, username);
+ if (ApplicationRegistry.getInstance().getAccessManager() == this)
+ {
+ _logger.warn("No Default access manager specified DENYING ALL ACCESS");
+ return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
+ }
+ else
+ {
+ return ApplicationRegistry.getInstance().getAccessManager().isAuthorized(accessObject, user, rights);
+ }
}
else
{
- return _accessManager.isAuthorized(accessObject, username);
+ return _accessManager.isAuthorized(accessObject, user, rights);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessRights.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessRights.java
new file mode 100644
index 0000000000..1b79a5a0e0
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessRights.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+public class AccessRights
+{
+ public enum Rights
+ {
+ ANY,
+ READ,
+ WRITE,
+ READWRITE
+ }
+
+ Rights _right;
+
+ public AccessRights(Rights right)
+ {
+ _right = right;
+ }
+
+ public boolean allows(Rights rights)
+ {
+ switch (_right)
+ {
+ case ANY:
+ return (rights.equals(Rights.WRITE)
+ || rights.equals(Rights.READ)
+ || rights.equals(Rights.READWRITE)
+ || rights.equals(Rights.ANY));
+ case READ:
+ return rights.equals(Rights.READ) || rights.equals(Rights.ANY);
+ case WRITE:
+ return rights.equals(Rights.WRITE) || rights.equals(Rights.ANY);
+ case READWRITE:
+ return true;
+ }
+ return false;
+ }
+
+ public Rights getRights()
+ {
+ return _right;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java
index b2e4094edd..1ddca3a64e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java
@@ -20,9 +20,16 @@
*/
package org.apache.qpid.server.security.access;
+import java.security.Principal;
+
public class AllowAll implements AccessManager
{
+ public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights)
+ {
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+
public AccessResult isAuthorized(Accessable accessObject, String username)
{
return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java
index 0e62d2657f..bf40eeba4e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java
@@ -20,8 +20,15 @@
*/
package org.apache.qpid.server.security.access;
+import java.security.Principal;
+
public class DenyAll implements AccessManager
{
+ public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights)
+ {
+ return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
+ }
+
public AccessResult isAuthorized(Accessable accessObject, String username)
{
return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java
new file mode 100644
index 0000000000..291bc714ed
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.FileNotFoundException;
+import java.io.File;
+import java.util.regex.Pattern;
+import java.security.Principal;
+
+/**
+ * Represents a user database where the account information is stored in a simple flat file.
+ *
+ * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn
+ *
+ * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text.
+ */
+public class FileAccessManager implements AccessManager
+{
+ private static final Logger _logger = Logger.getLogger(FileAccessManager.class);
+
+ protected File _accessFile;
+
+ protected Pattern _regexp = Pattern.compile(":");
+
+ private static final short USER_INDEX = 0;
+ private static final short VIRTUALHOST_INDEX = 1;
+
+ public void setAccessFile(String accessFile) throws FileNotFoundException
+ {
+ File f = new File(accessFile);
+ _logger.info("FileAccessManager using file " + f.getAbsolutePath());
+ _accessFile = f;
+ if (!f.exists())
+ {
+ throw new FileNotFoundException("Cannot find access file " + f);
+ }
+ if (!f.canRead())
+ {
+ throw new FileNotFoundException("Cannot read access file " + f +
+ ". Check permissions.");
+ }
+ }
+
+ /**
+ * Looks up the virtual hosts for a specified user in the access file.
+ *
+ * @param user The user to lookup
+ *
+ * @return a list of virtualhosts
+ */
+ private VirtualHostAccess[] lookupVirtualHost(String user)
+ {
+ String[] results = lookup(user, VIRTUALHOST_INDEX);
+ VirtualHostAccess vhosts[] = new VirtualHostAccess[results.length];
+
+ for (int index = 0; index < results.length; index++)
+ {
+ vhosts[index] = new VirtualHostAccess(results[index]);
+ }
+
+ return vhosts;
+ }
+
+
+ private String[] lookup(String user, int index)
+ {
+ try
+ {
+ BufferedReader reader = null;
+ try
+ {
+ reader = new BufferedReader(new FileReader(_accessFile));
+ String line;
+
+ while ((line = reader.readLine()) != null)
+ {
+ String[] result = _regexp.split(line);
+ if (result == null || result.length < (index + 1))
+ {
+ continue;
+ }
+
+ if (user.equals(result[USER_INDEX]))
+ {
+ return result[index].split(",");
+ }
+ }
+ return null;
+ }
+ finally
+ {
+ if (reader != null)
+ {
+ reader.close();
+ }
+ }
+ }
+ catch (IOException ioe)
+ {
+ //ignore
+ }
+ return null;
+ }
+
+ public AccessResult isAuthorized(Accessable accessObject, String username)
+ {
+ return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
+ }
+
+ public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights)
+ {
+ if (accessObject instanceof VirtualHost)
+ {
+ VirtualHostAccess[] hosts = lookupVirtualHost(user.getName());
+
+ if (hosts != null)
+ {
+ for (VirtualHostAccess host : hosts)
+ {
+ if (accessObject.getAccessableName().equals(host.getVirtualHost()))
+ {
+ if (host.getAccessRights().allows(rights))
+ {
+ return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+ }
+ else
+ {
+ return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
+ }
+ }
+ }
+ }
+ }
+// else if (accessObject instanceof AMQQueue)
+// {
+// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost());
+//
+// if (queues != null)
+// {
+// for (String queue : queues)
+// {
+// if (accessObject.getAccessableName().equals(queue))
+// {
+// return new AccessResult(this, AccessResult.AccessStatus.GRANTED);
+// }
+// }
+// }
+// }
+
+ return new AccessResult(this, AccessResult.AccessStatus.REFUSED);
+ }
+
+ public String getName()
+ {
+ return "FileAccessManager";
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java
index 0e447b5744..6ccadb2e7d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java
@@ -22,8 +22,11 @@ package org.apache.qpid.server.security.access;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.log4j.Logger;
+import java.security.Principal;
+
public class PrincipalDatabaseAccessManager implements AccessManager
{
private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAccessManager.class);
@@ -58,15 +61,21 @@ public class PrincipalDatabaseAccessManager implements AccessManager
}
}
+
public AccessResult isAuthorized(Accessable accessObject, String username)
{
+ return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
+ }
+
+ public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights)
+ {
AccessResult result;
if (_database == null)
{
if (_default != null)
{
- result = _default.isAuthorized(accessObject, username);
+ result = _default.isAuthorized(accessObject, username, rights);
}
else
{
@@ -75,7 +84,15 @@ public class PrincipalDatabaseAccessManager implements AccessManager
}
else
{
- result = ((AccessManager) _database).isAuthorized(accessObject, username);
+ if (!(_database instanceof AccessManager))
+ {
+ _logger.warn("Specified PrincipalDatabase is not an AccessManager so using default AccessManager");
+ result = _default.isAuthorized(accessObject, username, rights);
+ }
+ else
+ {
+ result = ((AccessManager) _database).isAuthorized(accessObject, username, rights);
+ }
}
result.addAuthorizer(this);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
new file mode 100644
index 0000000000..6381213398
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+import org.apache.qpid.server.management.MBeanOperation;
+import org.apache.qpid.server.management.MBeanOperationParameter;
+import org.apache.qpid.server.management.MBeanAttribute;
+import org.apache.qpid.AMQException;
+
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.CompositeData;
+import javax.management.JMException;
+import java.io.IOException;
+
+public interface UserManagement
+{
+ String TYPE = "UserManagement";
+
+ //********** Operations *****************//
+ /**
+ * set password for user
+ *
+ * @param username The username to create
+ * @param password The password for the user
+ *
+ * @return The result of the operation
+ */
+ @MBeanOperation(name = "setPassword", description = "Set password for user.")
+ boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
+ @MBeanOperationParameter(name = "password", description = "Password")String password);
+
+ /**
+ * set rights for users with given details
+ *
+ * @param username The username to create
+ * @param read The set of permission to give the new user
+ * @param write The set of permission to give the new user
+ * @param admin The set of permission to give the new user
+ *
+ * @return The result of the operation
+ */
+ @MBeanOperation(name = "setRights", description = "Set access rights for user.")
+ boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username,
+ @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
+ @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
+ @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin);
+
+ /**
+ * Create users with given details
+ *
+ * @param username The username to create
+ * @param password The password for the user
+ * @param read The set of permission to give the new user
+ * @param write The set of permission to give the new user
+ * @param admin The set of permission to give the new user
+ *
+ * @return The result of the operation
+ */
+ @MBeanOperation(name = "createUser", description = "Create new user from system.")
+ boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
+ @MBeanOperationParameter(name = "password", description = "Password")String password,
+ @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
+ @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
+ @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin);
+
+ /**
+ * View users returns all the users that are currently available to the system.
+ *
+ * @param username The user to delete
+ *
+ * @return The result of the operation
+ */
+ @MBeanOperation(name = "deleteUser", description = "Delete user from system.")
+ boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username);
+
+
+ /**
+ * Reload the date from disk
+ *
+ * @return The result of the operation
+ */
+// @MBeanOperation(name = "reloadData", description = "Reload the authentication file from disk.")
+// boolean reloadData();
+
+ /**
+ * View users returns all the users that are currently available to the system.
+ *
+ * @return a table of users data (Username, read, write, admin)
+ */
+ @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.")
+ TabularData viewUsers();
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/VirtualHostAccess.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/VirtualHostAccess.java
new file mode 100644
index 0000000000..13151a66b8
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/VirtualHostAccess.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.access;
+
+public class VirtualHostAccess
+{
+ private String _vhost;
+ private AccessRights _rights;
+
+ public VirtualHostAccess(String vhostaccess)
+ {
+ //format <vhost>(<rights>)
+ int hostend = vhostaccess.indexOf('(');
+
+ if (hostend == -1)
+ {
+ throw new IllegalArgumentException("VirtualHostAccess format string contains no access _rights");
+ }
+
+ _vhost = vhostaccess.substring(0, hostend);
+
+ String rights = vhostaccess.substring(hostend);
+
+ if (rights.indexOf('r') != -1)
+ {
+ if (rights.indexOf('w') != -1)
+ {
+ _rights = new AccessRights(AccessRights.Rights.READWRITE);
+ }
+ else
+ {
+ _rights = new AccessRights(AccessRights.Rights.READ);
+ }
+ }
+ else if (rights.indexOf('w') != -1)
+ {
+ _rights = new AccessRights(AccessRights.Rights.WRITE);
+ }
+ }
+
+ public AccessRights getAccessRights()
+ {
+ return _rights;
+ }
+
+ public String getVirtualHost()
+ {
+ return _vhost;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
new file mode 100644
index 0000000000..956db64d90
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.auth.database;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser;
+import org.apache.qpid.server.security.access.AMQUserManagementMBean;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.EncoderException;
+
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.AccountNotFoundException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.UnsupportedEncodingException;
+import java.io.PrintStream;
+import java.util.regex.Pattern;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.concurrent.locks.ReentrantLock;
+import java.security.Principal;
+import java.security.NoSuchAlgorithmException;
+import java.security.MessageDigest;
+
+/**
+ * Represents a user database where the account information is stored in a simple flat file.
+ *
+ * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn
+ *
+ * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text.
+ */
+public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
+{
+ private static final Logger _logger = Logger.getLogger(Base64MD5PasswordFilePrincipalDatabase.class);
+
+ private File _passwordFile;
+
+ private Pattern _regexp = Pattern.compile(":");
+
+ private Map<String, AuthenticationProviderInitialiser> _saslServers;
+
+ AMQUserManagementMBean _mbean;
+ private static final String DEFAULT_ENCODING = "utf-8";
+ private Map<String, User> _users = new HashMap<String, User>();
+ private ReentrantLock _userUpdate = new ReentrantLock();
+
+ public Base64MD5PasswordFilePrincipalDatabase()
+ {
+ _saslServers = new HashMap<String, AuthenticationProviderInitialiser>();
+
+ /**
+ * Create Authenticators for MD5 Password file.
+ */
+
+ // Accept Plain incomming and hash it for comparison to the file.
+ CRAMMD5HashedInitialiser cram = new CRAMMD5HashedInitialiser();
+ cram.initialise(this);
+ _saslServers.put(cram.getMechanismName(), cram);
+
+ //fixme The PDs should setup a PD Mangement MBean
+// try
+// {
+// _mbean = new AMQUserManagementMBean();
+// _mbean.setPrincipalDatabase(this);
+// }
+// catch (JMException e)
+// {
+// _logger.warn("User management disabled as unable to create MBean:" + e);
+// }
+ }
+
+ public void setPasswordFile(String passwordFile) throws IOException
+ {
+ File f = new File(passwordFile);
+ _logger.info("PasswordFilePrincipalDatabase using file " + f.getAbsolutePath());
+ _passwordFile = f;
+ if (!f.exists())
+ {
+ throw new FileNotFoundException("Cannot find password file " + f);
+ }
+ if (!f.canRead())
+ {
+ throw new FileNotFoundException("Cannot read password file " + f +
+ ". Check permissions.");
+ }
+
+ loadPasswordFile();
+ }
+
+ /**
+ * SASL Callback Mechanism - sets the Password in the PasswordCallback based on the value in the PasswordFile
+ *
+ * @param principal The Principal to set the password for
+ * @param callback The PasswordCallback to call setPassword on
+ *
+ * @throws AccountNotFoundException If the Principal cannont be found in this Database
+ */
+ public void setPassword(Principal principal, PasswordCallback callback) throws AccountNotFoundException
+ {
+ if (_passwordFile == null)
+ {
+ throw new AccountNotFoundException("Unable to locate principal since no password file was specified during initialisation");
+ }
+ if (principal == null)
+ {
+ throw new IllegalArgumentException("principal must not be null");
+ }
+
+ char[] pwd = lookupPassword(principal.getName());
+
+ if (pwd != null)
+ {
+ callback.setPassword(pwd);
+ }
+ else
+ {
+ throw new AccountNotFoundException("No account found for principal " + principal);
+ }
+ }
+
+ /**
+ * Used to verify that the presented Password is correct. Currently only used by Management Console
+ *
+ * @param principal The principal to authenticate
+ * @param password The password to check
+ *
+ * @return true if password is correct
+ *
+ * @throws AccountNotFoundException if the principal cannot be found
+ */
+ public boolean verifyPassword(String principal, String password) throws AccountNotFoundException
+ {
+ try
+ {
+ char[] pwd = lookupPassword(principal);
+ byte[] passwordBytes = password.getBytes(DEFAULT_ENCODING);
+
+ int index = 0;
+ boolean verified = true;
+
+ while (verified & index < passwordBytes.length)
+ {
+ verified = (pwd[index] == (char) passwordBytes[index]);
+ index++;
+ }
+ return verified;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return false;
+ }
+ }
+
+ public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ {
+ User user = _users.get(principal.getName());
+
+ if (user == null)
+ {
+ throw new AccountNotFoundException(principal.getName());
+ }
+
+ try
+ {
+
+ char[] passwd = convertPassword(password);
+
+ try
+ {
+ _userUpdate.lock();
+ user.setPassword(passwd);
+
+ try
+ {
+ savePasswordFile();
+ }
+ catch (IOException e)
+ {
+ _logger.error("Unable to save password file, password change for user'"
+ + principal + "' will revert at restart");
+ return false;
+ }
+ return true;
+ }
+ finally
+ {
+ if (_userUpdate.isHeldByCurrentThread())
+ {
+ _userUpdate.unlock();
+ }
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return false;
+ }
+ }
+
+ private char[] convertPassword(String password) throws UnsupportedEncodingException
+ {
+ byte[] passwdBytes = password.getBytes(DEFAULT_ENCODING);
+
+ char[] passwd = new char[passwdBytes.length];
+
+ int index = 0;
+
+ for (byte b : passwdBytes)
+ {
+ passwd[index++] = (char) b;
+ }
+
+ return passwd;
+ }
+
+ public boolean createPrincipal(Principal principal, String password)
+ {
+ if (_users.get(principal.getName()) != null)
+ {
+ return false;
+ }
+
+ User user;
+ try
+ {
+ user = new User(principal.getName(), convertPassword(password));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ _logger.warn("Unable to encode password:" + e);
+ return false;
+ }
+
+ try
+ {
+ _userUpdate.lock();
+ _users.put(user.getName(), user);
+
+ try
+ {
+ savePasswordFile();
+ return true;
+ }
+ catch (IOException e)
+ {
+ return false;
+ }
+
+ }
+ finally
+ {
+ if (_userUpdate.isHeldByCurrentThread())
+ {
+ _userUpdate.unlock();
+ }
+ }
+ }
+
+ public boolean deletePrincipal(Principal principal) throws AccountNotFoundException
+ {
+ User user = _users.get(principal.getName());
+
+ if (user == null)
+ {
+ throw new AccountNotFoundException(principal.getName());
+ }
+
+ try
+ {
+ _userUpdate.lock();
+ user.delete();
+
+ try
+ {
+ savePasswordFile();
+ }
+ catch (IOException e)
+ {
+ _logger.warn("Unable to remove user '" + user.getName() + "' from password file.");
+ return false;
+ }
+
+ _users.remove(user.getName());
+ }
+ finally
+ {
+ if (_userUpdate.isHeldByCurrentThread())
+ {
+ _userUpdate.unlock();
+ }
+ }
+
+ return true;
+ }
+
+
+ public Map<String, AuthenticationProviderInitialiser> getMechanisms()
+ {
+ return _saslServers;
+ }
+
+ public List<Principal> getUsers()
+ {
+ return new LinkedList<Principal>(_users.values());
+ }
+
+ public Principal getUser(String username)
+ {
+ if (_users.containsKey(username))
+ {
+ return new UsernamePrincipal(username);
+ }
+ return null;
+ }
+
+ /**
+ * Looks up the password for a specified user in the password file. Note this code is <b>not</b> secure since it
+ * creates strings of passwords. It should be modified to create only char arrays which get nulled out.
+ *
+ * @param name The principal name to lookup
+ *
+ * @return a char[] for use in SASL.
+ */
+ private char[] lookupPassword(String name)
+ {
+ User user = _users.get(name);
+ if (user == null)
+ {
+ return null;
+ }
+ else
+ {
+ return user.getPassword();
+ }
+ }
+
+
+ private void loadPasswordFile() throws IOException
+ {
+ try
+ {
+ _userUpdate.lock();
+ _users.clear();
+
+ BufferedReader reader = null;
+ try
+ {
+ reader = new BufferedReader(new FileReader(_passwordFile));
+ String line;
+
+ while ((line = reader.readLine()) != null)
+ {
+ String[] result = _regexp.split(line);
+ if (result == null || result.length < 2 || result[0].startsWith("#"))
+ {
+ continue;
+ }
+
+ User user = new User(result);
+ _logger.info("Created user:" + user);
+ _users.put(user.getName(), user);
+ }
+ }
+ finally
+ {
+ if (reader != null)
+ {
+ reader.close();
+ }
+ }
+ }
+ finally
+ {
+ if (_userUpdate.isHeldByCurrentThread())
+ {
+ _userUpdate.unlock();
+ }
+ }
+ }
+
+ private void savePasswordFile() throws IOException
+ {
+ try
+ {
+ _userUpdate.lock();
+
+ BufferedReader reader = null;
+ PrintStream writer = null;
+ File tmp = new File(_passwordFile.getAbsolutePath() + ".tmp");
+ if (tmp.exists())
+ {
+ tmp.delete();
+ }
+ try
+ {
+ writer = new PrintStream(tmp);
+ reader = new BufferedReader(new FileReader(_passwordFile));
+ String line;
+
+ while ((line = reader.readLine()) != null)
+ {
+ String[] result = _regexp.split(line);
+ if (result == null || result.length < 2 || result[0].startsWith("#"))
+ {
+ writer.write(line.getBytes(DEFAULT_ENCODING));
+ continue;
+ }
+
+ User user = _users.get(result[0]);
+
+ if (user == null)
+ {
+ writer.write(line.getBytes(DEFAULT_ENCODING));
+ writer.println();
+ }
+ else if (!user.isDeleted())
+ {
+ if (!user.isModified())
+ {
+ writer.write(line.getBytes(DEFAULT_ENCODING));
+ writer.println();
+ }
+ else
+ {
+ try
+ {
+ byte[] encodedPassword = user.getEncodePassword();
+
+ writer.write((user.getName() + ":").getBytes(DEFAULT_ENCODING));
+ writer.write(encodedPassword);
+ writer.println();
+
+ user.saved();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Unable to encode new password reverting to old password.");
+ writer.write(line.getBytes(DEFAULT_ENCODING));
+ writer.println();
+ }
+ }
+ }
+ }
+
+ for (User user : _users.values())
+ {
+ if (user.isModified())
+ {
+ byte[] encodedPassword;
+ try
+ {
+ encodedPassword = user.getEncodePassword();
+ writer.write((user.getName() + ":").getBytes(DEFAULT_ENCODING));
+ writer.write(encodedPassword);
+ writer.println();
+ user.saved();
+ }
+ catch (Exception e)
+ {
+ _logger.warn("Unable to get Encoded password for user'" + user.getName() + "' password not saved");
+ }
+ }
+ }
+ }
+ finally
+ {
+ if (reader != null)
+ {
+ reader.close();
+ }
+
+ if (writer != null)
+ {
+ writer.close();
+ }
+
+ // Swap temp file to main password file.
+ File old = new File(_passwordFile.getAbsoluteFile() + ".old");
+ if (old.exists())
+ {
+ old.delete();
+ }
+ _passwordFile.renameTo(old);
+ tmp.renameTo(_passwordFile);
+ tmp.delete();
+ }
+ }
+ finally
+ {
+ if (_userUpdate.isHeldByCurrentThread())
+ {
+ _userUpdate.unlock();
+ }
+ }
+ }
+
+ private class User implements Principal
+ {
+ String _name;
+ char[] _password;
+ byte[] _encodedPassword = null;
+ private boolean _modified = false;
+ private boolean _deleted = false;
+
+ User(String[] data) throws UnsupportedEncodingException
+ {
+ if (data.length != 2)
+ {
+ throw new IllegalArgumentException("User Data should be lenght 2, username, password");
+ }
+
+ _name = data[0];
+
+ byte[] encoded_password = data[1].getBytes(DEFAULT_ENCODING);
+
+ Base64 b64 = new Base64();
+ byte[] decoded = b64.decode(encoded_password);
+
+ _encodedPassword = encoded_password;
+
+ _password = new char[decoded.length];
+
+ int index = 0;
+ for (byte c : decoded)
+ {
+ _password[index++] = (char) c;
+ }
+ }
+
+ public User(String name, char[] password)
+ {
+ _name = name;
+ setPassword(password);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public String toString()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ return getName() + ((_encodedPassword == null) ? "" : ":" + new String(_encodedPassword));
+ }
+ else
+ {
+ return _name;
+ }
+ }
+
+ char[] getPassword()
+ {
+ return _password;
+ }
+
+ void setPassword(char[] password)
+ {
+ _password = password;
+ _modified = true;
+ _encodedPassword = null;
+ }
+
+
+ byte[] getEncodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException
+ {
+ if (_encodedPassword == null)
+ {
+ encodePassword();
+ }
+ return _encodedPassword;
+ }
+
+ private void encodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException
+ {
+ Base64 b64 = new Base64();
+ _encodedPassword = b64.encode(new String(_password).getBytes(DEFAULT_ENCODING));
+ }
+
+ public boolean isModified()
+ {
+ return _modified;
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted;
+ }
+
+ public void delete()
+ {
+ _deleted = true;
+ }
+
+ public void saved()
+ {
+ _modified = false;
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
index 0c35206dd3..2d3f5e5131 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
@@ -1,38 +1,46 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.security.auth.database;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.configuration.PropertyUtils;
+import org.apache.qpid.configuration.PropertyException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.configuration.PropertyUtils;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.access.AMQUserManagementMBean;
+import org.apache.qpid.AMQException;
-import java.util.Map;
-import java.util.List;
-import java.util.HashMap;
-import java.lang.reflect.Method;
-import java.io.FileNotFoundException;
+import javax.management.JMException;
public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatabaseManager
{
@@ -80,18 +88,21 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
initialisePrincipalDatabase((PrincipalDatabase) o, config, i);
String name = databaseNames.get(i);
- if (name == null || name.length() == 0)
+ if ((name == null) || (name.length() == 0))
{
throw new Exception("Principal database names must have length greater than or equal to one character");
}
+
PrincipalDatabase pd = databases.get(name);
if (pd != null)
{
throw new Exception("Duplicate principal database name not provided");
}
+
_logger.info("Initialised principal database '" + name + "' successfully");
databases.put(name, (PrincipalDatabase) o);
}
+
return databases;
}
@@ -104,14 +115,16 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
for (int i = 0; i < argumentNames.size(); i++)
{
String argName = argumentNames.get(i);
- if (argName == null || argName.length() == 0)
+ if ((argName == null) || (argName.length() == 0))
{
throw new ConfigurationException("Argument names must have length >= 1 character");
}
+
if (Character.isLowerCase(argName.charAt(0)))
{
argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1);
}
+
String methodName = "set" + argName;
Method method = null;
try
@@ -125,9 +138,10 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
if (method == null)
{
- throw new ConfigurationException("No method " + methodName + " found in class " + principalDatabase.getClass() +
- " hence unable to configure principal database. The method must be public and " +
- "have a single String argument with a void return type");
+ throw new ConfigurationException("No method " + methodName + " found in class "
+ + principalDatabase.getClass()
+ + " hence unable to configure principal database. The method must be public and "
+ + "have a single String argument with a void return type");
}
try
@@ -136,7 +150,14 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
}
catch (Exception ite)
{
- throw new ConfigurationException(ite.getCause());
+ if (ite instanceof ConfigurationException)
+ {
+ throw(ConfigurationException) ite;
+ }
+ else
+ {
+ throw new ConfigurationException(ite.getMessage(), ite);
+ }
}
}
}
@@ -145,4 +166,71 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab
{
return _databases;
}
+
+ public void initialiseManagement(Configuration config) throws ConfigurationException
+ {
+ try
+ {
+ AMQUserManagementMBean _mbean = new AMQUserManagementMBean();
+
+ String baseSecurity = "security.jmx";
+ List<String> principalDBs = config.getList(baseSecurity + ".principal-database");
+
+ if (principalDBs.size() == 0)
+ {
+ throw new ConfigurationException("No principal-database specified for jmx security(" + baseSecurity + ".principal-database)");
+ }
+
+ String databaseName = principalDBs.get(0);
+
+ PrincipalDatabase database = getDatabases().get(databaseName);
+
+ if (database == null)
+ {
+ throw new ConfigurationException("Principal-database '" + databaseName + "' not found");
+ }
+
+ _mbean.setPrincipalDatabase(database);
+
+ List<String> jmxaccesslist = config.getList(baseSecurity + ".access");
+
+ if (jmxaccesslist.size() == 0)
+ {
+ throw new ConfigurationException("No access control files specified for jmx security(" + baseSecurity + ".access)");
+ }
+
+ String jmxaccesssFile = null;
+
+ try
+ {
+ jmxaccesssFile = PropertyUtils.replaceProperties(jmxaccesslist.get(0));
+ }
+ catch (PropertyException e)
+ {
+ throw new ConfigurationException("Unable to parse access control filename '" + jmxaccesssFile + "'");
+ }
+
+ try
+ {
+ _mbean.setAccessFile(jmxaccesssFile);
+ }
+ catch (IOException e)
+ {
+ _logger.warn("Unable to load access file:" + jmxaccesssFile);
+ }
+
+ try
+ {
+ _mbean.register();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to register user management MBean");
+ }
+ }
+ catch (JMException e)
+ {
+ _logger.warn("User management disabled as unable to create MBean:" + e);
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/MD5PasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/MD5PasswordFilePrincipalDatabase.java
deleted file mode 100644
index c24a5f21e9..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/MD5PasswordFilePrincipalDatabase.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.security.auth.database;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
-import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser;
-import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser;
-
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.login.AccountNotFoundException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.util.regex.Pattern;
-import java.util.Map;
-import java.util.HashMap;
-import java.security.Principal;
-
-/**
- * Represents a user database where the account information is stored in a simple flat file.
- *
- * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn
- *
- * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text.
- */
-public class MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
-{
- private static final Logger _logger = Logger.getLogger(MD5PasswordFilePrincipalDatabase.class);
-
- private File _passwordFile;
-
- private Pattern _regexp = Pattern.compile(":");
-
- private Map<String, AuthenticationProviderInitialiser> _saslServers;
-
- public MD5PasswordFilePrincipalDatabase()
- {
- _saslServers = new HashMap<String, AuthenticationProviderInitialiser>();
-
- /**
- * Create Authenticators for MD5 Password file.
- */
-
- // Accept MD5 incomming and use plain comparison with the file
- PlainInitialiser cram = new PlainInitialiser();
- cram.initialise(this);
- // Accept Plain incomming and hash it for comparison to the file.
- CRAMMD5Initialiser plain = new CRAMMD5Initialiser();
- plain.initialise(this,CRAMMD5Initialiser.HashDirection.INCOMMING);
-
- _saslServers.put(plain.getMechanismName(), cram);
- _saslServers.put(cram.getMechanismName(), plain);
- }
-
- public void setPasswordFile(String passwordFile) throws FileNotFoundException
- {
- File f = new File(passwordFile);
- _logger.info("PasswordFilePrincipalDatabase using file " + f.getAbsolutePath());
- _passwordFile = f;
- if (!f.exists())
- {
- throw new FileNotFoundException("Cannot find password file " + f);
- }
- if (!f.canRead())
- {
- throw new FileNotFoundException("Cannot read password file " + f +
- ". Check permissions.");
- }
- }
-
- public void setPassword(Principal principal, PasswordCallback callback) throws IOException,
- AccountNotFoundException
- {
- if (_passwordFile == null)
- {
- throw new AccountNotFoundException("Unable to locate principal since no password file was specified during initialisation");
- }
- if (principal == null)
- {
- throw new IllegalArgumentException("principal must not be null");
- }
- char[] pwd = lookupPassword(principal.getName());
- if (pwd != null)
- {
- callback.setPassword(pwd);
- }
- else
- {
- throw new AccountNotFoundException("No account found for principal " + principal);
- }
- }
-
- public Map<String, AuthenticationProviderInitialiser> getMechanisms()
- {
- return _saslServers;
- }
-
- /**
- * Looks up the password for a specified user in the password file. Note this code is <b>not</b> secure since it
- * creates strings of passwords. It should be modified to create only char arrays which get nulled out.
- *
- * @param name
- *
- * @return
- *
- * @throws java.io.IOException
- */
- private char[] lookupPassword(String name) throws IOException
- {
- BufferedReader reader = null;
- try
- {
- reader = new BufferedReader(new FileReader(_passwordFile));
- String line;
-
- while ((line = reader.readLine()) != null)
- {
- String[] result = _regexp.split(line);
- if (result == null || result.length < 2)
- {
- continue;
- }
-
- if (name.equals(result[0]))
- {
- return result[1].toCharArray();
- }
- }
- return null;
- }
- finally
- {
- if (reader != null)
- {
- reader.close();
- }
- }
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
index 3abdd9a7ff..3f6794aaaf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.security.auth.database;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainInitialiser;
import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser;
import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser;
@@ -34,9 +34,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.FileReader;
+import java.io.UnsupportedEncodingException;
import java.util.regex.Pattern;
import java.util.Map;
import java.util.HashMap;
+import java.util.List;
import java.security.Principal;
/**
@@ -119,21 +121,103 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase
}
}
+ public boolean verifyPassword(String principal, String password) throws AccountNotFoundException
+ {
+ try
+ {
+ char[] pwd = lookupPassword(principal);
+
+ return compareCharArray(pwd, convertPassword(password));
+ }
+ catch (IOException e)
+ {
+ return false;
+ }
+ }
+
+ private char[] convertPassword(String password) throws UnsupportedEncodingException
+ {
+ byte[] passwdBytes = password.getBytes("utf-8");
+
+ char[] passwd = new char[passwdBytes.length];
+
+ int index = 0;
+
+ for (byte b : passwdBytes)
+ {
+ passwd[index++] = (char) b;
+ }
+
+ return passwd;
+ }
+
+ public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ {
+ return false; // updates denied
+ }
+
+ public boolean createPrincipal(Principal principal, String password)
+ {
+ return false; // updates denied
+ }
+
+ public boolean deletePrincipal(Principal principal) throws AccountNotFoundException
+ {
+ return false; // updates denied
+ }
+
public Map<String, AuthenticationProviderInitialiser> getMechanisms()
{
return _saslServers;
}
+ public List<Principal> getUsers()
+ {
+ return null; //todo
+ }
+
+ public Principal getUser(String username)
+ {
+ try
+ {
+ if (lookupPassword(username) != null)
+ {
+ return new UsernamePrincipal(username);
+ }
+ }
+ catch (IOException e)
+ {
+ //fall through to null return
+ }
+ return null;
+ }
+
+ private boolean compareCharArray(char[] a, char[] b)
+ {
+ boolean equal = false;
+ if (a.length == b.length)
+ {
+ equal = true;
+ int index = 0;
+ while (equal && index < a.length)
+ {
+ equal = a[index] == b[index];
+ index++;
+ }
+ }
+ return equal;
+ }
+
/**
* Looks up the password for a specified user in the password file. Note this code is <b>not</b> secure since it
* creates strings of passwords. It should be modified to create only char arrays which get nulled out.
*
- * @param name
+ * @param name the name of the principal to lookup
*
- * @return
+ * @return char[] of the password
*
- * @throws java.io.IOException
+ * @throws java.io.IOException whilst accessing the file
*/
private char[] lookupPassword(String name) throws IOException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java
index c8318d6e64..598f8f8b4c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java
@@ -20,26 +20,17 @@
*/
package org.apache.qpid.server.security.auth.database;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
-import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser;
-import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser;
import org.apache.qpid.server.security.access.AccessManager;
import org.apache.qpid.server.security.access.AccessResult;
import org.apache.qpid.server.security.access.Accessable;
+import org.apache.qpid.server.security.access.AccessRights;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.log4j.Logger;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.login.AccountNotFoundException;
-import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.FileReader;
-import java.util.regex.Pattern;
-import java.util.Map;
-import java.util.HashMap;
import java.security.Principal;
/**
@@ -103,9 +94,15 @@ public class PlainPasswordVhostFilePrincipalDatabase extends PlainPasswordFilePr
public AccessResult isAuthorized(Accessable accessObject, String username)
{
+ return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ);
+ }
+
+ public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights)
+ {
+
if (accessObject instanceof VirtualHost)
{
- String[] hosts = lookupVirtualHost(username);
+ String[] hosts = lookupVirtualHost(user.getName());
if (hosts != null)
{
@@ -126,4 +123,5 @@ public class PlainPasswordVhostFilePrincipalDatabase extends PlainPasswordFilePr
{
return "PlainPasswordVhostFile";
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
index 6c5a2a44ee..8073fcc3c6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
@@ -23,8 +23,10 @@ package org.apache.qpid.server.security.auth.database;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.security.Principal;
import java.util.Map;
+import java.util.List;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.login.AccountNotFoundException;
@@ -46,5 +48,53 @@ public interface PrincipalDatabase
void setPassword(Principal principal, PasswordCallback callback)
throws IOException, AccountNotFoundException;
+ /**
+ * Used to verify that the presented Password is correct. Currently only used by Management Console
+ * @param principal The principal to authenticate
+ * @param password The password to check
+ * @return true if password is correct
+ * @throws AccountNotFoundException if the principal cannot be found
+ */
+ boolean verifyPassword(String principal, String password)
+ throws AccountNotFoundException;
+
+ /**
+ * Update(Change) the password for the given principal
+ * @param principal Who's password is to be changed
+ * @param password The new password to use
+ * @return True if change was successful
+ * @throws AccountNotFoundException If the given principal doesn't exist in the Database
+ */
+ boolean updatePassword(Principal principal, String password)
+ throws AccountNotFoundException;
+
+ /**
+ * Create a new principal in the database
+ * @param principal The principal to create
+ * @param password The password to set for the principal
+ * @return True on a successful creation
+ */
+ boolean createPrincipal(Principal principal, String password);
+
+ /**
+ * Delete a principal
+ * @param principal The principal to delete
+ * @return True on a successful creation
+ * @throws AccountNotFoundException If the given principal doesn't exist in the Database
+ */
+ boolean deletePrincipal(Principal principal)
+ throws AccountNotFoundException;
+
+ /**
+ * Get the principal from the database with the given username
+ * @param username of the principal to lookup
+ * @return The Principal object for the given username or null if not found.
+ */
+ Principal getUser(String username);
+
+
public Map<String, AuthenticationProviderInitialiser> getMechanisms();
+
+
+ List<Principal> getUsers();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java
index 83f1201bd8..2c553ae76a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java
@@ -21,10 +21,14 @@
package org.apache.qpid.server.security.auth.database;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import java.util.Map;
public interface PrincipalDatabaseManager
{
public Map<String, PrincipalDatabase> getDatabases();
+
+ public void initialiseManagement(Configuration config) throws ConfigurationException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
index 9a58acd98c..b1ac0e1f00 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.security.auth.database;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser;
import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser;
@@ -29,8 +30,10 @@ import javax.security.auth.login.AccountNotFoundException;
import java.util.Properties;
import java.util.Map;
import java.util.HashMap;
+import java.util.List;
import java.security.Principal;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
public class PropertiesPrincipalDatabase implements PrincipalDatabase
{
@@ -76,8 +79,87 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase
}
}
+ public boolean verifyPassword(String principal, String password) throws AccountNotFoundException
+ {
+ char[] pwd = _users.getProperty(principal).toCharArray();
+
+ try
+ {
+ return compareCharArray(pwd, convertPassword(password));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return false;
+ }
+ }
+
+ public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ {
+ return false; // updates denied
+ }
+
+ public boolean createPrincipal(Principal principal, String password)
+ {
+ return false; // updates denied
+ }
+
+ public boolean deletePrincipal(Principal principal) throws AccountNotFoundException
+ {
+ return false; // updates denied
+ }
+
+ private boolean compareCharArray(char[] a, char[] b)
+ {
+ boolean equal = false;
+ if (a.length == b.length)
+ {
+ equal = true;
+ int index = 0;
+ while (equal && index < a.length)
+ {
+ equal = a[index] == b[index];
+ index++;
+ }
+ }
+ return equal;
+ }
+
+ private char[] convertPassword(String password) throws UnsupportedEncodingException
+ {
+ byte[] passwdBytes = password.getBytes("utf-8");
+
+ char[] passwd = new char[passwdBytes.length];
+
+ int index = 0;
+
+ for (byte b : passwdBytes)
+ {
+ passwd[index++] = (char) b;
+ }
+
+ return passwd;
+ }
+
+
public Map<String, AuthenticationProviderInitialiser> getMechanisms()
{
return _saslServers;
}
+
+ public List<Principal> getUsers()
+ {
+ return null; //todo
+ }
+
+ public Principal getUser(String username)
+ {
+ if (_users.getProperty(username) != null)
+ {
+ return new UsernamePrincipal(username);
+ }
+ else
+ {
+ return null;
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
index 89c84e8130..6b86a46bd2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.security.auth.database;
+import org.apache.commons.configuration.Configuration;
+
import java.util.Map;
import java.util.Properties;
import java.util.HashMap;
@@ -38,4 +40,9 @@ public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseMana
{
return _databases;
}
+
+ public void initialiseManagement(Configuration config)
+ {
+ //todo
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index 0546bbb81e..ce5e0cd748 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -71,7 +71,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
Map<String, Class<? extends SaslServerFactory>> providerMap = new TreeMap<String, Class<? extends SaslServerFactory>>();
- if (name == null)
+ if (name == null || hostConfig == null)
{
initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
}
@@ -108,11 +108,15 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
if (providerMap.size() > 0)
{
- Security.addProvider(new JCAProvider(providerMap));
+ // Ensure we are used before the defaults
+ if (Security.insertProviderAt(new JCAProvider(providerMap), 1) == -1)
+ {
+ _logger.warn("Unable to set order of providers.");
+ }
}
else
{
- _logger.warn("No SASL providers availble.");
+ _logger.warn("No additional SASL providers registered.");
}
}
@@ -148,21 +152,20 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
{
if (database == null || database.getMechanisms().size() == 0)
{
- _logger.warn("");
+ _logger.warn("No Database or no mechanisms to initialise authentication");
return;
}
- for (AuthenticationProviderInitialiser mechanism : database.getMechanisms().values())
+ for (Map.Entry<String, AuthenticationProviderInitialiser> mechanism : database.getMechanisms().entrySet())
{
- initialiseAuthenticationMechanism(mechanism, providerMap);
+ initialiseAuthenticationMechanism(mechanism.getKey(), mechanism.getValue(), providerMap);
}
}
- private void initialiseAuthenticationMechanism(AuthenticationProviderInitialiser initialiser,
+ private void initialiseAuthenticationMechanism(String mechanism, AuthenticationProviderInitialiser initialiser,
Map<String, Class<? extends SaslServerFactory>> providerMap)
throws Exception
{
- String mechanism = initialiser.getMechanismName();
if (_mechanisms == null)
{
_mechanisms = mechanism;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
index 8ffcdc4e36..fd4ad86055 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
@@ -33,7 +33,7 @@ public final class JCAProvider extends Provider
super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
"AMQ SASL providers that want to be registered");
register(providerMap);
- Security.addProvider(this);
+ //Security.addProvider(this);
}
private void register(Map<String, Class<? extends SaslServerFactory>> providerMap)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
index 68095de3a0..dd0bd096c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,14 +33,16 @@ import javax.security.auth.login.AccountNotFoundException;
import javax.security.sasl.AuthorizeCallback;
import org.apache.commons.configuration.Configuration;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
public abstract class UsernamePasswordInitialiser implements AuthenticationProviderInitialiser
{
- protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class);
+ protected static final Logger _logger = Logger.getLogger(UsernamePasswordInitialiser.class);
private ServerCallbackHandler _callbackHandler;
@@ -72,7 +74,9 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi
{
// very annoyingly the callback handler does not throw anything more appropriate than
// IOException
- throw new IOException("Error looking up user " + e);
+ IOException ioe = new IOException("Error looking up user " + e);
+ ioe.initCause(e);
+ throw ioe;
}
}
else if (callback instanceof AuthorizeCallback)
@@ -88,7 +92,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi
}
public void initialise(String baseConfigPath, Configuration configuration,
- Map<String, PrincipalDatabase> principalDatabases) throws Exception
+ Map<String, PrincipalDatabase> principalDatabases) throws Exception
{
String principalDatabaseName = configuration.getString(baseConfigPath + ".principal-database");
PrincipalDatabase db = principalDatabases.get(principalDatabaseName);
@@ -102,6 +106,7 @@ public abstract class UsernamePasswordInitialiser implements AuthenticationProvi
{
throw new NullPointerException("Cannot initialise with a null Principal database.");
}
+
_callbackHandler = new ServerCallbackHandler(db);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java
index f9aaabd15a..d7c8383690 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePrincipal.java
@@ -22,10 +22,7 @@ package org.apache.qpid.server.security.auth.sasl;
import java.security.Principal;
-/**
- * A principal that is just a wrapper for a simple username.
- *
- */
+/** A principal that is just a wrapper for a simple username. */
public class UsernamePrincipal implements Principal
{
private String _name;
@@ -39,4 +36,9 @@ public class UsernamePrincipal implements Principal
{
return _name;
}
+
+ public String toString()
+ {
+ return _name;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedInitialiser.java
new file mode 100644
index 0000000000..97f9a4e91a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedInitialiser.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth.sasl.crammd5;
+
+import org.apache.qpid.server.security.auth.sasl.UsernamePasswordInitialiser;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+
+import javax.security.sasl.SaslServerFactory;
+import java.util.Map;
+
+public class CRAMMD5HashedInitialiser extends UsernamePasswordInitialiser
+{
+ public String getMechanismName()
+ {
+ return CRAMMD5HashedSaslServer.MECHANISM;
+ }
+
+ public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration()
+ {
+ return CRAMMD5HashedServerFactory.class;
+ }
+
+ public void initialise(PrincipalDatabase passwordFile)
+ {
+ super.initialise(passwordFile);
+ }
+
+ public Map<String, ?> getProperties()
+ {
+ return null;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedSaslServer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedSaslServer.java
new file mode 100644
index 0000000000..f6cab084ea
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedSaslServer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.auth.sasl.crammd5;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslServerFactory;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Enumeration;
+import java.util.Map;
+
+public class CRAMMD5HashedSaslServer implements SaslServer
+{
+ public static final String MECHANISM = "CRAM-MD5-HASHED";
+
+ private SaslServer _realServer;
+
+ public CRAMMD5HashedSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
+ CallbackHandler cbh) throws SaslException
+ {
+ Enumeration factories = Sasl.getSaslServerFactories();
+
+ while (factories.hasMoreElements())
+ {
+ SaslServerFactory factory = (SaslServerFactory) factories.nextElement();
+
+ if (factory instanceof CRAMMD5HashedServerFactory)
+ {
+ continue;
+ }
+
+ String[] mechs = factory.getMechanismNames(props);
+
+ for (String mech : mechs)
+ {
+ if (mech.equals("CRAM-MD5"))
+ {
+ _realServer = factory.createSaslServer("CRAM-MD5", protocol, serverName, props, cbh);
+ return;
+ }
+ }
+ }
+
+ throw new RuntimeException("No default SaslServer found for mechanism:" + "CRAM-MD5");
+ }
+
+ public String getMechanismName()
+ {
+ return MECHANISM;
+ }
+
+ public byte[] evaluateResponse(byte[] response) throws SaslException
+ {
+ return _realServer.evaluateResponse(response);
+ }
+
+ public boolean isComplete()
+ {
+ return _realServer.isComplete();
+ }
+
+ public String getAuthorizationID()
+ {
+ return _realServer.getAuthorizationID();
+ }
+
+ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+ {
+ return _realServer.unwrap(incoming, offset, len);
+ }
+
+ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+ {
+ return _realServer.wrap(outgoing, offset, len);
+ }
+
+ public Object getNegotiatedProperty(String propName)
+ {
+ return _realServer.getNegotiatedProperty(propName);
+ }
+
+ public void dispose() throws SaslException
+ {
+ _realServer.dispose();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedServerFactory.java
new file mode 100644
index 0000000000..5298b5cc63
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HashedServerFactory.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth.sasl.crammd5;
+
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+public class CRAMMD5HashedServerFactory implements SaslServerFactory
+{
+ public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
+ CallbackHandler cbh) throws SaslException
+ {
+ if (mechanism.equals(CRAMMD5HashedSaslServer.MECHANISM))
+ {
+ return new CRAMMD5HashedSaslServer(mechanism, protocol, serverName, props, cbh);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public String[] getMechanismNames(Map props)
+ {
+ if (props != null)
+ {
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE))
+ {
+ // returned array must be non null according to interface documentation
+ return new String[0];
+ }
+ }
+
+ return new String[]{CRAMMD5HashedSaslServer.MECHANISM};
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
index ff3e87e3a0..f0dd9eeb6d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
@@ -29,7 +29,7 @@ import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
public class PlainSaslServerFactory implements SaslServerFactory
-{
+{
public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map props,
CallbackHandler cbh) throws SaslException
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
index 05d1cd5291..609a85c22f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
@@ -56,18 +56,7 @@ public class CleanupMessageOperation implements TxnOp
public void commit(StoreContext context)
{
- //The routers reference can now be released. This is done
- //here to ensure that it happens after the queues that
- //enqueue it have incremented their counts (which as a
- //memory only operation is done in the commit phase).
- try
- {
- _msg.decrementReference(context);
- }
- catch (AMQException e)
- {
- _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
- }
+
try
{
_msg.checkDeliveredToConsumer();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index cf0da55f2a..6d776eec0f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -89,6 +89,12 @@ public class LocalTransactionalContext implements TransactionalContext
public void rollback() throws AMQException
{
_txnBuffer.rollback(_storeContext);
+ // Hack to deal with uncommitted non-transactional writes
+ if(_messageStore.inTran(_storeContext))
+ {
+ _messageStore.abortTran(_storeContext);
+ _inTran = false;
+ }
_postCommitDeliveryList.clear();
}
@@ -103,6 +109,7 @@ public class LocalTransactionalContext implements TransactionalContext
// message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
+ _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
{
@@ -111,7 +118,7 @@ public class LocalTransactionalContext implements TransactionalContext
}
message.incrementReference();
_messageDelivered = true;
- _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+
*/
}
@@ -195,6 +202,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
_txnBuffer.enlist(new StoreMessageOperation(_messageStore));
}
+ //fixme fail commit here ... QPID-440
try
{
_txnBuffer.commit(_storeContext);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
index 339ca8ae1a..405c233552 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
@@ -41,7 +41,7 @@ public class TxnBuffer
{
if (_log.isDebugEnabled())
{
- _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray());
+ _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops);
}
if (prepare(context))
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index c24d1aa23a..b5c59dbbb7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -181,7 +181,7 @@ public class VirtualHost implements Accessable
catch (Exception e)
{
_logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
- throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor");
+ throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e);
}
Configurator.configure(instance);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 236291968f..0c1da5c278 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -106,6 +106,8 @@ public class AMQQueueAlertTest extends TestCase
/**
* Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
*
+ * Based on FT402 subbmitted by client
+ *
* @throws Exception
*/
public void testQueueDepthAlertNoSubscriber() throws Exception