summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-19 21:44:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-19 21:44:19 +0000
commit840b1793643b37b9fa8f8352a6851417042301ed (patch)
treeec59ddd96d2758c5621e8b6046478dae760c6eb0 /qpid/java
parent29a70653f314b99c103c8149cacc4fed2a13898c (diff)
downloadqpid-python-840b1793643b37b9fa8f8352a6851417042301ed.tar.gz
QPID-5567 : Always Use AccessControllerContext to find the current context Subject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java74
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/SessionPrincipal.java74
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java22
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java6
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java4
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java20
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java27
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/TestPrincipalUtils.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java3
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java24
-rw-r--r--qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java320
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java29
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java17
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java111
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java45
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java66
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java19
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java109
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java32
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java47
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java3
54 files changed, 869 insertions, 408 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index 3104df6b00..61738c0655 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration.updater;
import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
@@ -211,14 +212,12 @@ public class TaskExecutor
private class CallableWrapper<T> implements Task<T>
{
private Task<T> _userTask;
- private Subject _securityManagerSubject;
private LogActor _actor;
private Subject _contextSubject;
public CallableWrapper(Task<T> userWork)
{
_userTask = userWork;
- _securityManagerSubject = SecurityManager.getThreadSubject();
_actor = CurrentActor.get();
_contextSubject = Subject.getSubject(AccessController.getContext());
}
@@ -226,35 +225,21 @@ public class TaskExecutor
@Override
public T call()
{
- SecurityManager.setThreadSubject(_securityManagerSubject);
CurrentActor.set(_actor);
try
{
T result = null;
- try
- {
- result = Subject.doAs(_contextSubject, new PrivilegedExceptionAction<T>()
+ result = Subject.doAs(_contextSubject, new PrivilegedAction<T>()
{
@Override
- public T run() throws Exception
+ public T run()
{
return executeTask(_userTask);
}
});
- }
- catch (PrivilegedActionException e)
- {
- Exception underlying = e.getException();
- if(underlying instanceof RuntimeException)
- {
- throw (RuntimeException)underlying;
- }
- else
- {
- throw new ServerScopedRuntimeException(e);
- }
- }
+
+
return result;
}
finally
@@ -267,14 +252,6 @@ public class TaskExecutor
{
LOGGER.warn("Unexpected exception on current actor removal", e);
}
- try
- {
- SecurityManager.setThreadSubject(null);
- }
- catch (Exception e)
- {
- LOGGER.warn("Unexpected exception on nullifying of subject for a security manager", e);
- }
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
new file mode 100644
index 0000000000..e459b5df80
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.connection;
+
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+
+import java.security.Principal;
+
+public class ConnectionPrincipal implements Principal
+{
+ private final AMQConnectionModel _connection;
+
+ public ConnectionPrincipal(final AMQConnectionModel connection)
+ {
+ _connection = connection;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _connection.getRemoteAddressString();
+ }
+
+ public AMQConnectionModel getConnection()
+ {
+ return _connection;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final ConnectionPrincipal that = (ConnectionPrincipal) o;
+
+ if (!_connection.equals(that._connection))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _connection.hashCode();
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/SessionPrincipal.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/SessionPrincipal.java
new file mode 100644
index 0000000000..d6dff4f937
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/SessionPrincipal.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.connection;
+
+import org.apache.qpid.server.protocol.AMQSessionModel;
+
+import java.security.Principal;
+
+public class SessionPrincipal implements Principal
+{
+ private final AMQSessionModel _session;
+
+ public SessionPrincipal(final AMQSessionModel session)
+ {
+ _session = session;
+ }
+
+ public AMQSessionModel getSession()
+ {
+ return _session;
+ }
+
+ @Override
+ public String getName()
+ {
+ return "session:"+_session.getId();
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final SessionPrincipal that = (SessionPrincipal) o;
+
+ if (!_session.equals(that._session))
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _session.hashCode();
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index e76b9f15fc..d3a42bddc4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -414,7 +414,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
try
{
- AMQQueue queue = _virtualHost.createQueue(null, attributes);
+ AMQQueue queue = _virtualHost.createQueue(attributes);
synchronized (_queueAdapters)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 623cf62472..d978705841 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.util.Deletable;
+import java.net.SocketAddress;
import java.security.Principal;
import java.util.List;
import java.util.UUID;
@@ -72,6 +73,8 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
String getRemoteAddressString();
+ SocketAddress getRemoteAddress();
+
String getClientId();
String getClientVersion();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 399586fcff..11d2bccb1e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -27,7 +27,6 @@ import java.util.UUID;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -65,18 +64,17 @@ public class AMQQueueFactory implements QueueFactory
@Override
public AMQQueue restoreQueue(Map<String, Object> attributes)
{
- return createOrRestoreQueue(null, attributes, false);
+ return createOrRestoreQueue(attributes, false);
}
@Override
- public AMQQueue createQueue(final AMQSessionModel creatingSession,
- Map<String, Object> attributes)
+ public AMQQueue createQueue(Map<String, Object> attributes)
{
- return createOrRestoreQueue(creatingSession, attributes, true);
+ return createOrRestoreQueue(attributes, true);
}
- private AMQQueue createOrRestoreQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes,
+ private AMQQueue createOrRestoreQueue(Map<String, Object> attributes,
boolean createInStore)
{
@@ -129,19 +127,19 @@ public class AMQQueueFactory implements QueueFactory
if(attributes.containsKey(Queue.SORT_KEY))
{
- queue = new SortedQueue(_virtualHost, creatingSession, attributes);
+ queue = new SortedQueue(_virtualHost, attributes);
}
else if(attributes.containsKey(Queue.LVQ_KEY))
{
- queue = new ConflationQueue(_virtualHost, creatingSession, attributes);
+ queue = new ConflationQueue(_virtualHost, attributes);
}
else if(attributes.containsKey(Queue.PRIORITIES))
{
- queue = new PriorityQueue(_virtualHost, creatingSession, attributes);
+ queue = new PriorityQueue(_virtualHost, attributes);
}
else
{
- queue = new StandardQueue(_virtualHost, creatingSession, attributes);
+ queue = new StandardQueue(_virtualHost, attributes);
}
//Register the new queue
@@ -232,7 +230,7 @@ public class AMQQueueFactory implements QueueFactory
args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()));
args.put(Queue.NAME, dlQueueName);
args.put(Queue.DURABLE, true);
- dlQueue = _virtualHost.createQueue(null, args);
+ dlQueue = _virtualHost.createQueue(args);
}
catch (QueueExistsException e)
{
@@ -260,7 +258,7 @@ public class AMQQueueFactory implements QueueFactory
Map<String, Object> arguments = createQueueAttributesFromConfig(_virtualHost, config);
- AMQQueue q = createOrRestoreQueue(null, arguments, false);
+ AMQQueue q = createOrRestoreQueue(arguments, false);
return q;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
index f350368634..d9c130953a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
@@ -34,9 +34,9 @@ public class ConflationQueue extends SimpleAMQQueue<ConflationQueueList.Conflati
protected ConflationQueue(VirtualHost virtualHost,
- final AMQSessionModel creatingSession, Map<String, Object> attributes)
+ Map<String, Object> attributes)
{
- super(virtualHost, creatingSession, attributes, entryList(attributes));
+ super(virtualHost, attributes, entryList(attributes));
}
private static ConflationQueueList.Factory entryList(final Map<String, Object> attributes)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
index c9ac3f3621..20e7edda92 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -29,11 +29,10 @@ public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends
{
protected OutOfOrderQueue(VirtualHost virtualHost,
- final AMQSessionModel creatingSession,
Map<String, Object> attributes,
QueueEntryListFactory<E, Q, L> entryListFactory)
{
- super(virtualHost, creatingSession, attributes, entryListFactory);
+ super(virtualHost, attributes, entryListFactory);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
index 1e41a0fb3e..b86d8efe87 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
@@ -33,10 +33,9 @@ public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQue
public static final int DEFAULT_PRIORITY_LEVELS = 10;
protected PriorityQueue(VirtualHost virtualHost,
- final AMQSessionModel creatingSession,
Map<String, Object> attributes)
{
- super(virtualHost, creatingSession, attributes, entryList(attributes));
+ super(virtualHost, attributes, entryList(attributes));
}
private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
index c80018799b..5b13c4c574 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -26,8 +26,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
public interface QueueFactory
{
- AMQQueue createQueue(final AMQSessionModel creatingSession,
- Map<String, Object> arguments);
+ AMQQueue createQueue(Map<String, Object> arguments);
AMQQueue restoreQueue(Map<String, Object> arguments);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ef3eea19b3..6cbbbf0cb2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.server.queue;
+import java.security.AccessControlContext;
+import java.security.AccessController;
import java.security.Principal;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -29,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -63,6 +66,8 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.security.auth.Subject;
+
abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
MessageGroupManager.ConsumerResetHelper<E,Q,L>
@@ -184,7 +189,6 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
protected SimpleAMQQueue(VirtualHost virtualHost,
- final AMQSessionModel<?,?> creatingSession,
Map<String, Object> attributes,
QueueEntryListFactory<E, Q, L> entryListFactory)
{
@@ -201,25 +205,39 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
Queue.LIFETIME_POLICY,
attributes,
LifetimePolicy.PERMANENT);
- if(creatingSession != null)
+
+ Subject activeSubject = Subject.getSubject(AccessController.getContext());
+ Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
+ AMQSessionModel<?,?> sessionModel;
+ if(sessionPrincipals.isEmpty())
+ {
+ sessionModel = null;
+ }
+ else
+ {
+ final SessionPrincipal sessionPrincipal = sessionPrincipals.iterator().next();
+ sessionModel = sessionPrincipal.getSession();
+ }
+
+ if(sessionModel != null)
{
switch(_exclusivityPolicy)
{
case PRINCIPAL:
- _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal();
+ _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal();
break;
case CONTAINER:
- _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName();
+ _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName();
break;
case CONNECTION:
- _exclusiveOwner = creatingSession.getConnectionModel();
- addExclusivityConstraint(creatingSession.getConnectionModel());
+ _exclusiveOwner = sessionModel.getConnectionModel();
+ addExclusivityConstraint(sessionModel.getConnectionModel());
break;
case SESSION:
- _exclusiveOwner = creatingSession;
- addExclusivityConstraint(creatingSession);
+ _exclusiveOwner = sessionModel;
+ addExclusivityConstraint(sessionModel);
break;
case NONE:
case LINK:
@@ -251,9 +269,9 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
{
- if(creatingSession != null)
+ if(sessionModel != null)
{
- addLifetimeConstraint(creatingSession.getConnectionModel());
+ addLifetimeConstraint(sessionModel.getConnectionModel());
}
else
{
@@ -264,9 +282,9 @@ abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleA
}
else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END)
{
- if(creatingSession != null)
+ if(sessionModel != null)
{
- addLifetimeConstraint(creatingSession);
+ addLifetimeConstraint(sessionModel);
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index d1da7feddc..c35c41b4f7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -38,20 +38,19 @@ public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue,
private final String _sortedPropertyName;
protected SortedQueue(VirtualHost virtualHost,
- final AMQSessionModel creatingSession,
Map<String, Object> attributes,
QueueEntryListFactory<SortedQueueEntry, SortedQueue, SortedQueueEntryList> factory)
{
- super(virtualHost, creatingSession, attributes, factory);
+ super(virtualHost, attributes, factory);
_sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes);
}
protected SortedQueue(VirtualHost virtualHost,
- final AMQSessionModel creatingSession, Map<String, Object> attributes)
+ Map<String, Object> attributes)
{
this(virtualHost,
- creatingSession, attributes,
+ attributes,
new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes)));
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
index 8a0570eb4e..e5050eca86 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
@@ -28,8 +28,8 @@ import java.util.Map;
public class StandardQueue extends SimpleAMQQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList>
{
public StandardQueue(final VirtualHost virtualHost,
- final AMQSessionModel creatingSession, final Map<String, Object> arguments)
+ final Map<String, Object> arguments)
{
- super(virtualHost, creatingSession, arguments, new StandardQueueEntryList.Factory());
+ super(virtualHost, arguments, new StandardQueueEntryList.Factory());
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
index 61e928a38c..87946590ec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
@@ -23,20 +23,20 @@ import org.apache.qpid.server.security.access.ObjectType;
import org.apache.qpid.server.security.access.Operation;
/**
- * The two methods, {@link #access(ObjectType, Object)} and {@link #authorise(Operation, ObjectType, ObjectProperties)},
+ * The two methods, {@link #access(org.apache.qpid.server.security.access.ObjectType)} and {@link #authorise(Operation, ObjectType, ObjectProperties)},
* return the {@link Result} of the security decision, which may be to {@link Result#ABSTAIN} if no decision is made.
*/
public interface AccessControl
{
/**
- * Default result for {@link #access(ObjectType, Object)} or {@link #authorise(Operation, ObjectType, ObjectProperties)}.
+ * Default result for {@link #access(org.apache.qpid.server.security.access.ObjectType)} or {@link #authorise(Operation, ObjectType, ObjectProperties)}.
*/
Result getDefault();
/**
* Authorise access granted to an object instance.
*/
- Result access(ObjectType objectType, Object instance);
+ Result access(ObjectType objectType);
/**
* Authorise an operation on an object defined by a set of properties.
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 5af035c6b3..e5911d268b 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -54,7 +54,6 @@ import static org.apache.qpid.server.security.access.Operation.PURGE;
import static org.apache.qpid.server.security.access.Operation.UNBIND;
import static org.apache.qpid.server.security.access.Operation.UPDATE;
-import javax.security.auth.Subject;
import java.net.SocketAddress;
import java.security.AccessControlException;
import java.util.Collection;
@@ -68,9 +67,6 @@ public class SecurityManager implements ConfigurationChangeListener
{
private static final Logger _logger = Logger.getLogger(SecurityManager.class);
- /** Container for the {@link java.security.Principal} that is using to this thread. */
- private static final ThreadLocal<Subject> _subject = new ThreadLocal<Subject>();
-
public static final ThreadLocal<Boolean> _accessChecksDisabled = new ClearingThreadLocal(false);
private ConcurrentHashMap<String, AccessControl> _globalPlugins = new ConcurrentHashMap<String, AccessControl>();
@@ -189,16 +185,6 @@ public class SecurityManager implements ConfigurationChangeListener
return accessControl.getClass().getName();
}
- public static Subject getThreadSubject()
- {
- return _subject.get();
- }
-
- public static void setThreadSubject(final Subject subject)
- {
- _subject.set(subject);
- }
-
public static Logger getLogger()
{
return _logger;
@@ -335,7 +321,7 @@ public class SecurityManager implements ConfigurationChangeListener
{
Result allowed(AccessControl plugin)
{
- return plugin.access(ObjectType.MANAGEMENT, null);
+ return plugin.access(ObjectType.MANAGEMENT);
}
}))
{
@@ -343,13 +329,13 @@ public class SecurityManager implements ConfigurationChangeListener
}
}
- public void accessVirtualhost(final String vhostname, final SocketAddress remoteAddress)
+ public void accessVirtualhost(final String vhostname)
{
if(!checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
{
- return plugin.access(VIRTUALHOST, remoteAddress);
+ return plugin.access(VIRTUALHOST);
}
}))
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java
index 4e61e4b80b..1a23de0821 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/jmx/JMXPasswordAuthenticator.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.security.auth.jmx;
import java.net.SocketAddress;
+import java.security.PrivilegedAction;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.security.SecurityManager;
@@ -117,15 +118,16 @@ public class JMXPasswordAuthenticator implements JMXAuthenticator
private void doManagementAuthorisation(Subject authenticatedSubject)
{
- SecurityManager.setThreadSubject(authenticatedSubject);
- try
+ Subject.doAs(authenticatedSubject, new PrivilegedAction<Object>()
{
- _broker.getSecurityManager().accessManagement();
- }
- finally
- {
- SecurityManager.setThreadSubject(null);
- }
+ @Override
+ public Object run()
+ {
+ _broker.getSecurityManager().accessManagement();
+ return null;
+ }
+ });
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 31481721d6..feb619f01a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -536,7 +536,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
}
- public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QueueExistsException
+ public AMQQueue createQueue(Map<String, Object> attributes) throws QueueExistsException
{
// make a copy as we may augment (with an ID for example)
attributes = new LinkedHashMap<String, Object>(attributes);
@@ -585,7 +585,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
- return _queueFactory.createQueue(creatingSession, attributes);
+ return _queueFactory.createQueue(attributes);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 61b2265d89..ab5a406cff 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -59,7 +59,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
int removeQueue(AMQQueue queue);
- AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QueueExistsException;
+ AMQQueue createQueue(Map<String, Object> arguments) throws QueueExistsException;
Exchange createExchange(UUID id,
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
index 9dc4fb768e..a54ba06c53 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.configuration.updater;
+import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.BlockingQueue;
@@ -216,27 +217,32 @@ public class TaskExecutorTest extends TestCase
_executor.start();
LogActor actor = new TestLogActor(new NullRootMessageLogger());
Subject subject = new Subject();
- Subject currentSecurityManagerSubject = SecurityManager.getThreadSubject();
final AtomicReference<LogActor> taskLogActor = new AtomicReference<LogActor>();
final AtomicReference<Subject> taskSubject = new AtomicReference<Subject>();
try
{
CurrentActor.set(actor);
- SecurityManager.setThreadSubject(subject);
- _executor.submitAndWait(new TaskExecutor.Task<Object>()
+ Subject.doAs(subject, new PrivilegedAction<Object>()
{
@Override
- public Void call()
+ public Object run()
+ { _executor.submitAndWait(new TaskExecutor.Task<Object>()
{
- taskLogActor.set(CurrentActor.get());
- taskSubject.set(SecurityManager.getThreadSubject());
+ @Override
+ public Void call()
+ {
+ taskLogActor.set(CurrentActor.get());
+ taskSubject.set(Subject.getSubject(AccessController.getContext()));
+ return null;
+ }
+ });
return null;
}
});
+
}
finally
{
- SecurityManager.setThreadSubject(currentSecurityManagerSubject);
CurrentActor.remove();
}
assertEquals("Unexpected task log actor", actor, taskLogActor.get());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 1358798a38..eeb83a7148 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
+import java.net.SocketAddress;
import java.security.Principal;
import java.util.ArrayList;
import java.util.List;
@@ -456,6 +457,11 @@ public class MockConsumer implements ConsumerTarget
return "remoteAddress:1234";
}
+ public SocketAddress getRemoteAddress()
+ {
+ return null;
+ }
+
@Override
public String getClientId()
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index adb024257d..c900b72ae5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -80,7 +80,7 @@ public class TopicExchangeTest extends QpidTestCase
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
attributes.put(Queue.NAME, name);
- return _vhost.createQueue(null, attributes);
+ return _vhost.createQueue(attributes);
}
public void testNoRoute() throws Exception
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 4b6a51f84b..5f1e88dba9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -43,7 +43,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -97,13 +96,13 @@ public class AMQQueueFactoryTest extends QpidTestCase
final ArgumentCaptor<Map> attributes = ArgumentCaptor.forClass(Map.class);
- when(_virtualHost.createQueue(any(AMQSessionModel.class), attributes.capture())).then(
+ when(_virtualHost.createQueue(attributes.capture())).then(
new Answer<AMQQueue>()
{
@Override
public AMQQueue answer(InvocationOnMock invocation) throws Throwable
{
- return _queueFactory.createQueue(null, attributes.getValue());
+ return _queueFactory.createQueue(attributes.getValue());
}
}
);
@@ -206,7 +205,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.PRIORITIES, 5);
- AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ AMQQueue queue = _queueFactory.createQueue(attributes);
assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
@@ -224,7 +223,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.NAME, queueName);
- AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ AMQQueue queue = _queueFactory.createQueue(attributes);
assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass());
verifyQueueRegistered(queueName);
@@ -256,7 +255,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
- AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ AMQQueue queue = _queueFactory.createQueue(attributes);
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
@@ -297,7 +296,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ AMQQueue queue = _queueFactory.createQueue(attributes);
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
Exchange altExchange = queue.getAlternateExchange();
@@ -338,7 +337,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.CREATE_DLQ_ON_CREATION, false);
- AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ AMQQueue queue = _queueFactory.createQueue(attributes);
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
@@ -372,7 +371,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
//create an autodelete queue
- AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ AMQQueue queue = _queueFactory.createQueue(attributes);
assertEquals("Queue should be autodelete",
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS,
queue.getLifetimePolicy());
@@ -398,7 +397,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
- final AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ final AMQQueue queue = _queueFactory.createQueue(attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
@@ -416,7 +415,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault");
- final AMQQueue queue = _queueFactory.createQueue(null, attributes);
+ final AMQQueue queue = _queueFactory.createQueue(attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
@@ -434,7 +433,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
- _queueFactory.createQueue(null, attributes);
+ _queueFactory.createQueue(attributes);
fail("queue with null name can not be created!");
}
catch (Exception e)
@@ -463,7 +462,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
- _queueFactory.createQueue(null, attributes);
+ _queueFactory.createQueue(attributes);
fail("queue with DLQ name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -493,7 +492,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
- _queueFactory.createQueue(null, attributes);
+ _queueFactory.createQueue(attributes);
fail("queue with DLE name having more than 255 characters can not be created!");
}
catch (Exception e)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
index 42b83d8c47..e51b575895 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
@@ -52,7 +52,7 @@ public class ConflationQueueListTest extends TestCase
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY);
- _queue = new ConflationQueue(mock(VirtualHost.class), null, queueAttributes);
+ _queue = new ConflationQueue(mock(VirtualHost.class), queueAttributes);
_list = _queue.getEntries();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index 0ac4eb8f78..8b2ee70b26 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -51,7 +51,7 @@ public class PriorityQueueListTest extends QpidTestCase
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.PRIORITIES, 10);
- PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), null, queueAttributes);
+ PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), queueAttributes);
_list = queue.getEntries();
for (int i = 0; i < PRIORITIES.length; i++)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 139e32420f..dc91be9cdb 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -204,7 +204,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
+ StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
index c74c7bfa8b..8f4ebf1b2e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
@@ -78,7 +78,7 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends
attributes.put(Queue.NAME, _qname);
attributes.put(Queue.OWNER, _owner);
- _queue = (Q) _virtualHost.createQueue(null, attributes);
+ _queue = (Q) _virtualHost.createQueue(attributes);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
}
@@ -106,7 +106,7 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends
Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
- _queue = (Q) _virtualHost.createQueue(null, attributes);
+ _queue = (Q) _virtualHost.createQueue(attributes);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -118,7 +118,7 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends
Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
attributes.put(Queue.NAME, "differentName");
- _queue = (Q) _virtualHost.createQueue(null, attributes);
+ _queue = (Q) _virtualHost.createQueue(attributes);
assertNotNull("Queue was not created", _queue);
}
@@ -1105,7 +1105,7 @@ abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends
{
public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost)
{
- super(vhost, null, attributes(), factory);
+ super(vhost, attributes(), factory);
}
private static Map<String,Object> attributes()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 286c47fdf6..1d4bfa9960 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -57,7 +57,7 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
attributes.put(Queue.NAME, "test");
- AMQQueue queue = test.createQueue(null, attributes);
+ AMQQueue queue = test.createQueue(attributes);
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index 7aa546e1b3..9813ab98b5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -44,7 +44,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest");
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
+ StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
queueEntryList = queue.getEntries();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index cfa58f11c4..be2948c18d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -86,7 +86,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueue
attributes.put(Queue.SORT_KEY, "KEY");
// Create test list
- _testQueue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ _testQueue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index e55875135a..2f1122fa9a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -54,7 +54,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
attributes.put(Queue.SORT_KEY, "KEY");
- SortedQueue queue = new SortedQueue(mock(VirtualHost.class), null, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ SortedQueue queue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index fe9273dbc8..5da991b8dd 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -50,7 +50,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- _testQueue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
+ _testQueue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
_sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
@@ -93,7 +93,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQ
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
+ StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
return queue.getEntries();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 15e0303e61..4dd0423c27 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -52,7 +52,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry
queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
queueAttributes.put(Queue.OWNER, "testOwner");
- setQueue(new StandardQueue(null, null, queueAttributes));
+ setQueue(new StandardQueue(null, queueAttributes));
assertNull("Queue was created", getQueue());
}
catch (IllegalArgumentException e)
@@ -70,7 +70,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getQname());
queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
- final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes);
+ final StandardQueue queue = new StandardQueue(getVirtualHost(), queueAttributes);
setQueue(queue);
@@ -93,7 +93,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
queueAttributes.put(Queue.OWNER, "testOwner");
- final StandardQueue queue = new StandardQueue(getVirtualHost(), null, queueAttributes);
+ final StandardQueue queue = new StandardQueue(getVirtualHost(), queueAttributes);
//verify adding an active consumer increases the count
final MockConsumer consumer1 = new MockConsumer();
@@ -200,7 +200,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "test");
// create queue with overridden method deliverAsync
- StandardQueue testQueue = new StandardQueue(getVirtualHost(), null, queueAttributes)
+ StandardQueue testQueue = new StandardQueue(getVirtualHost(), queueAttributes)
{
@Override
public void deliverAsync(QueueConsumer sub)
@@ -269,7 +269,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry
public DequeuedQueue(VirtualHost virtualHost)
{
- super(virtualHost, null, attributes(), new DequeuedQueueEntryListFactory());
+ super(virtualHost, attributes(), new DequeuedQueueEntryListFactory());
}
private static Map<String,Object> attributes()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/TestPrincipalUtils.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/TestPrincipalUtils.java
index ea6b40e3de..f5bac17843 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/TestPrincipalUtils.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/TestPrincipalUtils.java
@@ -43,7 +43,7 @@ public class TestPrincipalUtils
principals.add(new GroupPrincipal(group));
}
- return new Subject(true, principals, Collections.EMPTY_SET, Collections.EMPTY_SET);
+ return new Subject(false, principals, Collections.EMPTY_SET, Collections.EMPTY_SET);
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 641fc7cb35..8f57c52299 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -186,7 +186,7 @@ public class BrokerTestHelper
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
attributes.put(Queue.NAME, queueName);
- AMQQueue queue = virtualHost.createQueue(null, attributes);
+ AMQQueue queue = virtualHost.createQueue(attributes);
return queue;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index cee7edf1b7..e68d5bb2ba 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -153,7 +152,7 @@ public class MockVirtualHost implements VirtualHost
}
@Override
- public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments)
+ public AMQQueue createQueue(Map<String, Object> arguments)
{
return null;
}
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
index 6beeef2f18..75006ae697 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.security.access.plugins;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.io.File;
+import java.net.SocketAddress;
+import java.security.AccessController;
+import java.util.Set;
import javax.security.auth.Subject;
@@ -30,8 +33,8 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.ObjectUtils;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.security.Result;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.ObjectType;
@@ -115,15 +118,22 @@ public class DefaultAccessControl implements AccessControl
* Delegate to the {@link #authorise(Operation, ObjectType, ObjectProperties)} method, with
* the operation set to ACCESS and no object properties.
*/
- public Result access(ObjectType objectType, Object inetSocketAddress)
+ public Result access(ObjectType objectType)
{
InetAddress addressOfClient = null;
-
- if(inetSocketAddress != null)
+ final Subject subject = Subject.getSubject(AccessController.getContext());
+ if(subject != null)
{
- addressOfClient = ((InetSocketAddress) inetSocketAddress).getAddress();
+ Set<ConnectionPrincipal> principals = subject.getPrincipals(ConnectionPrincipal.class);
+ if(!principals.isEmpty())
+ {
+ SocketAddress address = principals.iterator().next().getConnection().getRemoteAddress();
+ if(address instanceof InetSocketAddress)
+ {
+ addressOfClient = ((InetSocketAddress) address).getAddress();
+ }
+ }
}
-
return authoriseFromAddress(Operation.ACCESS, objectType, ObjectProperties.EMPTY, addressOfClient);
}
@@ -139,7 +149,7 @@ public class DefaultAccessControl implements AccessControl
public Result authoriseFromAddress(Operation operation, ObjectType objectType, ObjectProperties properties, InetAddress addressOfClient)
{
- final Subject subject = SecurityManager.getThreadSubject();
+ final Subject subject = Subject.getSubject(AccessController.getContext());
// Abstain if there is no subject/principal associated with this thread
if (subject == null || subject.getPrincipals().size() == 0)
{
diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
index ea8e469edb..8ac4e0c424 100644
--- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
+++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
@@ -24,17 +24,20 @@ import static org.mockito.Mockito.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;
import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.Result;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.ObjectType;
import org.apache.qpid.server.security.access.Operation;
@@ -64,7 +67,6 @@ public class DefaultAccessControlTest extends TestCase
private void configureAccessControl(final RuleSet rs) throws ConfigurationException
{
_plugin = new DefaultAccessControl(rs);
- SecurityManager.setThreadSubject(null);
CurrentActor.set(new TestLogActor(messageLogger));
}
@@ -86,7 +88,6 @@ public class DefaultAccessControlTest extends TestCase
protected void tearDown() throws Exception
{
super.tearDown();
- SecurityManager.setThreadSubject(null);
}
/**
@@ -95,8 +96,6 @@ public class DefaultAccessControlTest extends TestCase
public void testNoSubjectAlwaysAbstains() throws ConfigurationException
{
setUpGroupAccessControl();
- SecurityManager.setThreadSubject(null);
-
final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY);
assertEquals(Result.ABSTAIN, result);
}
@@ -108,10 +107,16 @@ public class DefaultAccessControlTest extends TestCase
public void testUsernameAllowsOperation() throws ConfigurationException
{
setUpGroupAccessControl();
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user1"));
-
- final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY);
- assertEquals(Result.ALLOWED, result);
+ Subject.doAs(TestPrincipalUtils.createTestSubject("user1"), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY);
+ assertEquals(Result.ALLOWED, result);
+ return null;
+ }
+ });
}
/**
@@ -143,14 +148,22 @@ public class DefaultAccessControlTest extends TestCase
public void testCatchAllRuleDeniesUnrecognisedUsername() throws ConfigurationException
{
setUpGroupAccessControl();
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("unknown", "unkgroup1", "unkgroup2"));
-
- assertEquals("Expecting zero messages before test", 0, messageLogger.getLogMessages().size());
- final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY);
- assertEquals(Result.DENIED, result);
+ Subject.doAs(TestPrincipalUtils.createTestSubject("unknown", "unkgroup1", "unkgroup2"),
+ new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ assertEquals("Expecting zero messages before test", 0, messageLogger.getLogMessages().size());
+ final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY);
+ assertEquals(Result.DENIED, result);
+
+ assertEquals("Expecting one message before test", 1, messageLogger.getLogMessages().size());
+ assertTrue("Logged message does not contain expected string", messageLogger.messageContains(0, "ACL-1002"));
+ return null;
+ }
+ });
- assertEquals("Expecting one message before test", 1, messageLogger.getLogMessages().size());
- assertTrue("Logged message does not contain expected string", messageLogger.messageContains(0, "ACL-1002"));
}
/**
@@ -163,13 +176,20 @@ public class DefaultAccessControlTest extends TestCase
// grant user4 access right on any method in any component
rs.grant(1, "user4", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, new ObjectProperties(ObjectProperties.STAR));
configureAccessControl(rs);
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user4"));
+ Subject.doAs(TestPrincipalUtils.createTestSubject("user4"), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ ObjectProperties actionProperties = new ObjectProperties("getName");
+ actionProperties.put(ObjectProperties.Property.COMPONENT, "Test");
+
+ final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties);
+ assertEquals(Result.ALLOWED, result);
+ return null;
+ }
+ });
- ObjectProperties actionProperties = new ObjectProperties("getName");
- actionProperties.put(ObjectProperties.Property.COMPONENT, "Test");
-
- final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties);
- assertEquals(Result.ALLOWED, result);
}
/**
@@ -184,55 +204,91 @@ public class DefaultAccessControlTest extends TestCase
ruleProperties.put(ObjectProperties.Property.COMPONENT, "Test");
rs.grant(1, "user5", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, ruleProperties);
configureAccessControl(rs);
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user5"));
+ Subject.doAs(TestPrincipalUtils.createTestSubject("user5"), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ ObjectProperties actionProperties = new ObjectProperties("getName");
+ actionProperties.put(ObjectProperties.Property.COMPONENT, "Test");
+ Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties);
+ assertEquals(Result.ALLOWED, result);
+
+ actionProperties.put(ObjectProperties.Property.COMPONENT, "Test2");
+ result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties);
+ assertEquals(Result.DEFER, result);
+ return null;
+ }
+ });
- ObjectProperties actionProperties = new ObjectProperties("getName");
- actionProperties.put(ObjectProperties.Property.COMPONENT, "Test");
- Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties);
- assertEquals(Result.ALLOWED, result);
- actionProperties.put(ObjectProperties.Property.COMPONENT, "Test2");
- result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties);
- assertEquals(Result.DEFER, result);
}
public void testAccess() throws Exception
{
- Subject subject = TestPrincipalUtils.createTestSubject("user1");
- SecurityManager.setThreadSubject(subject);
+ final Subject subject = TestPrincipalUtils.createTestSubject("user1");
+ final InetAddress inetAddress = InetAddress.getLocalHost();
+ final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
+
+ AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
+ when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress);
+
+ subject.getPrincipals().add(new ConnectionPrincipal(connectionModel));
+
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
+ {
+ @Override
+ public Object run() throws Exception
+ {
+ RuleSet mockRuleSet = mock(RuleSet.class);
+
- RuleSet mockRuleSet = mock(RuleSet.class);
- InetAddress inetAddress = InetAddress.getLocalHost();
- InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
+ DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet);
- DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet);
+ accessControl.access(ObjectType.VIRTUALHOST);
- accessControl.access(ObjectType.VIRTUALHOST, inetSocketAddress);
+ verify(mockRuleSet).check(subject, Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY, inetAddress);
+ return null;
+ }
+ });
- verify(mockRuleSet).check(subject, Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY, inetAddress);
}
public void testAccessIsDeniedIfRuleThrowsException() throws Exception
{
- Subject subject = TestPrincipalUtils.createTestSubject("user1");
- SecurityManager.setThreadSubject(subject);
+ final Subject subject = TestPrincipalUtils.createTestSubject("user1");
+ final InetAddress inetAddress = InetAddress.getLocalHost();
+ final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
- InetAddress inetAddress = InetAddress.getLocalHost();
- InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
+ AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
+ when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress);
- RuleSet mockRuleSet = mock(RuleSet.class);
- when(mockRuleSet.check(
- subject,
- Operation.ACCESS,
- ObjectType.VIRTUALHOST,
- ObjectProperties.EMPTY,
- inetAddress)).thenThrow(new RuntimeException());
+ subject.getPrincipals().add(new ConnectionPrincipal(connectionModel));
- DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet);
- Result result = accessControl.access(ObjectType.VIRTUALHOST, inetSocketAddress);
+ Subject.doAs(subject, new PrivilegedExceptionAction<Object>()
+ {
+ @Override
+ public Object run() throws Exception
+ {
+
+
+ RuleSet mockRuleSet = mock(RuleSet.class);
+ when(mockRuleSet.check(
+ subject,
+ Operation.ACCESS,
+ ObjectType.VIRTUALHOST,
+ ObjectProperties.EMPTY,
+ inetAddress)).thenThrow(new RuntimeException());
+
+ DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet);
+ Result result = accessControl.access(ObjectType.VIRTUALHOST);
+
+ assertEquals(Result.DENIED, result);
+ return null;
+ }
+ });
- assertEquals(Result.DENIED, result);
}
@@ -248,21 +304,29 @@ public class DefaultAccessControlTest extends TestCase
ruleProperties.put(ObjectProperties.Property.COMPONENT, "Test");
rs.grant(1, "user6", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, ruleProperties);
configureAccessControl(rs);
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user6"));
-
- ObjectProperties properties = new ObjectProperties("getAttribute");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
+ Subject.doAs(TestPrincipalUtils.createTestSubject("user6"), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ ObjectProperties properties = new ObjectProperties("getAttribute");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
+
+ properties.put(ObjectProperties.Property.COMPONENT, "Test2");
+ result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.DEFER, result);
+
+ properties = new ObjectProperties("getAttribute2");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.DEFER, result);
+
+ return null;
+ }
+ });
- properties.put(ObjectProperties.Property.COMPONENT, "Test2");
- result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.DEFER, result);
-
- properties = new ObjectProperties("getAttribute2");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.DEFER, result);
}
/**
@@ -275,25 +339,33 @@ public class DefaultAccessControlTest extends TestCase
// grant user8 all rights on method queryNames in all component
rs.grant(1, "user8", Permission.ALLOW, Operation.ALL, ObjectType.METHOD, new ObjectProperties("queryNames"));
configureAccessControl(rs);
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user8"));
+ Subject.doAs(TestPrincipalUtils.createTestSubject("user8"), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ ObjectProperties properties = new ObjectProperties();
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ properties.put(ObjectProperties.Property.NAME, "queryNames");
+
+ Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
- ObjectProperties properties = new ObjectProperties();
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- properties.put(ObjectProperties.Property.NAME, "queryNames");
+ result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
- Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
+ properties = new ObjectProperties("getAttribute");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
+ assertEquals(Result.DEFER, result);
- result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
+ result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.DEFER, result);
+ return null;
+ }
+ });
- properties = new ObjectProperties("getAttribute");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
- assertEquals(Result.DEFER, result);
- result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.DEFER, result);
}
/**
@@ -306,24 +378,32 @@ public class DefaultAccessControlTest extends TestCase
// grant user9 all rights on any method in all component
rs.grant(1, "user9", Permission.ALLOW, Operation.ALL, ObjectType.METHOD, new ObjectProperties());
configureAccessControl(rs);
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user9"));
+ Subject.doAs(TestPrincipalUtils.createTestSubject("user9"), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ ObjectProperties properties = new ObjectProperties("queryNames");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
- ObjectProperties properties = new ObjectProperties("queryNames");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
- Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
+ result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
- result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
+ properties = new ObjectProperties("getAttribute");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
+
+ result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
+ return null;
+ }
+ });
- properties = new ObjectProperties("getAttribute");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
- result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
}
/**
@@ -340,29 +420,43 @@ public class DefaultAccessControlTest extends TestCase
rs.grant(1, "user9", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, ruleProperties);
configureAccessControl(rs);
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user9"));
-
- ObjectProperties properties = new ObjectProperties("getAttributes");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
-
- properties = new ObjectProperties("getAttribute");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.ALLOWED, result);
-
- properties = new ObjectProperties("getAttribut");
- properties.put(ObjectProperties.Property.COMPONENT, "Test");
- result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
- assertEquals(Result.DEFER, result);
+ Subject.doAs(TestPrincipalUtils.createTestSubject("user9"), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ ObjectProperties properties = new ObjectProperties("getAttributes");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
+
+ properties = new ObjectProperties("getAttribute");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.ALLOWED, result);
+
+ properties = new ObjectProperties("getAttribut");
+ properties.put(ObjectProperties.Property.COMPONENT, "Test");
+ result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties);
+ assertEquals(Result.DEFER, result);
+ return null;
+ }
+ });
}
- private void authoriseAndAssertResult(Result expectedResult, String userName, String... groups)
+ private void authoriseAndAssertResult(final Result expectedResult, String userName, String... groups)
{
- SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject(userName, groups));
- Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY);
- assertEquals(expectedResult, result);
+ Subject.doAs(TestPrincipalUtils.createTestSubject(userName, groups), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY);
+ assertEquals(expectedResult, result);
+ return null;
+ }
+ });
+
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 9c012eb782..024169326c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import java.net.SocketAddress;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
@@ -65,7 +67,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor;
- private Subject _authorizedSubject = null;
+ private final Subject _authorizedSubject = new Subject();
private Principal _authorizedPrincipal = null;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
@@ -84,6 +86,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
public ServerConnection(final long connectionId, Broker broker)
{
_connectionId = connectionId;
+ _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
_actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
}
@@ -249,29 +252,43 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
- public void received(ProtocolEvent event)
+ public void received(final ProtocolEvent event)
{
_lastIoTime.set(System.currentTimeMillis());
+ Subject subject;
if (event.isConnectionControl())
{
CurrentActor.set(_actor);
+ subject = _authorizedSubject;
}
else
{
ServerSession channel = (ServerSession) getSession(event.getChannel());
LogActor channelActor = null;
-
if (channel != null)
{
+ subject = channel.getAuthorizedSubject();
channelActor = channel.getLogActor();
}
+ else
+ {
+ subject = _authorizedSubject;
+ }
CurrentActor.set(channelActor == null ? _actor : channelActor);
}
try
{
- super.received(event);
+ Subject.doAs(subject, new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ ServerConnection.super.received(event);
+ return null;
+ }
+ });
}
finally
{
@@ -461,12 +478,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
if (authorizedSubject == null)
{
- _authorizedSubject = null;
_authorizedPrincipal = null;
}
else
{
- _authorizedSubject = authorizedSubject;
+ _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
+
_authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 13c7e7bcd3..ce059db703 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -38,7 +38,6 @@ import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
@@ -190,7 +189,7 @@ public class ServerConnectionDelegate extends ServerDelegate
}
vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName);
- SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
+
if(vhost != null)
{
@@ -198,7 +197,7 @@ public class ServerConnectionDelegate extends ServerDelegate
try
{
- vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress());
+ vhost.getSecurityManager().accessVirtualhost(vhostName);
}
catch (AccessControlException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 29f9fc549e..12c10240bb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -41,6 +41,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+
+import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
@@ -100,6 +102,7 @@ public class ServerSession extends Session
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
private final UUID _id = UUID.randomUUID();
+ private final Subject _subject = new Subject();
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
@@ -147,6 +150,9 @@ public class ServerSession extends Session
_transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
+ _subject.getPrincipals().addAll(((ServerConnection) connection).getAuthorizedSubject().getPrincipals());
+ _subject.getPrincipals().add(new SessionPrincipal(this));
+
_transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
{
@Override
@@ -610,7 +616,7 @@ public class ServerSession extends Session
public Subject getAuthorizedSubject()
{
- return getConnection().getAuthorizedSubject();
+ return _subject;
}
public void addDeleteTask(Action<? super ServerSession> task)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index d39ca73136..5d36aa5321 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -61,6 +60,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -89,8 +89,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
try
{
- setThreadSubject(session);
-
if(!session.isClosing())
{
Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark();
@@ -117,6 +115,10 @@ public class ServerSessionDelegate extends SessionDelegate
{
LOGGER.error("Exception processing command", e);
exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e);
+ if(e instanceof ServerScopedRuntimeException)
+ {
+ throw e;
+ }
}
}
@@ -1222,7 +1224,7 @@ public class ServerSessionDelegate extends SessionDelegate
arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
- queue = virtualHost.createQueue((ServerSession)session, arguments);
+ queue = virtualHost.createQueue(arguments);
}
catch(QueueExistsException qe)
@@ -1437,7 +1439,6 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void closed(Session session)
{
- setThreadSubject(session);
ServerSession serverSession = (ServerSession)session;
serverSession.stopSubscriptions();
@@ -1451,12 +1452,6 @@ public class ServerSessionDelegate extends SessionDelegate
closed(session);
}
- private void setThreadSubject(Session session)
- {
- final ServerConnection scon = (ServerConnection) session.getConnection();
- SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
- }
-
private static class CommandProcessedAction implements ServerTransaction.Action
{
private final ServerSession _serverSession;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 0146d066f1..453f3035e1 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
@@ -87,6 +88,8 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
+import javax.security.auth.Subject;
+
public class AMQChannel<T extends AMQProtocolSession<T>>
implements AMQSessionModel<AMQChannel<T>,T>,
AsyncAutoCommitTransaction.FutureRecorder
@@ -172,6 +175,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
+ private Subject _subject;
public AMQChannel(T session, int channelId, MessageStore messageStore)
@@ -181,6 +185,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_channelId = channelId;
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
+ _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
+ session.getAuthorizedSubject().getPublicCredentials(),
+ session.getAuthorizedSubject().getPrivateCredentials());
+ _subject.getPrincipals().add(new SessionPrincipal(this));
_logSubject = new ChannelLogSubject(this);
_actor.message(ChannelMessages.CREATE());
@@ -1253,6 +1261,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_taskList.remove(task);
}
+ public Subject getSubject()
+ {
+ return _subject;
+ }
+
private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 68f1ad7942..bd2dc9413c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -53,6 +54,7 @@ import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -135,7 +137,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
- private Subject _authorizedSubject;
+ private final Subject _authorizedSubject = new Subject();
private MethodDispatcher _dispatcher;
private final long _connectionID;
@@ -187,6 +189,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_connectionID = connectionId;
_actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger());
+ _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
_logSubject = new ConnectionLogSubject(this);
@@ -247,62 +250,71 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void received(final ByteBuffer msg)
{
- final long arrivalTime = System.currentTimeMillis();
- _lastReceivedTime = arrivalTime;
- _lastIoTime = arrivalTime;
- _readBytes += msg.remaining();
-
- _receivedLock.lock();
- try
+ Subject.doAs(_authorizedSubject, new PrivilegedAction<Void>()
{
- final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ @Override
+ public Void run()
{
+ final long arrivalTime = System.currentTimeMillis();
+ _lastReceivedTime = arrivalTime;
+ _lastIoTime = arrivalTime;
+ _readBytes += msg.remaining();
+
+ _receivedLock.lock();
try
{
- dataBlockReceived(dataBlock);
- }
- catch(AMQConnectionException e)
- {
- if(_logger.isDebugEnabled())
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ for (AMQDataBlock dataBlock : dataBlocks)
{
- _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch(AMQConnectionException e)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
+ }
+ break;
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ break;
+ }
}
- break;
+ receivedComplete();
}
- catch (Exception e)
+ catch (ConnectionScopedRuntimeException e)
{
- _logger.error("Unexpected exception when processing datablock", e);
+ _logger.error("Unexpected exception", e);
closeProtocolSession();
- break;
}
+ catch (AMQProtocolVersionException e)
+ {
+ _logger.error("Unexpected protocol version", e);
+ closeProtocolSession();
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ _logger.error("Frame decoding", e);
+ closeProtocolSession();
+ }
+ catch (IOException e)
+ {
+ _logger.error("I/O Exception", e);
+ closeProtocolSession();
+ }
+ finally
+ {
+ _receivedLock.unlock();
+ }
+ return null;
}
- receivedComplete();
- }
- catch (ConnectionScopedRuntimeException e)
- {
- _logger.error("Unexpected exception", e);
- closeProtocolSession();
- }
- catch (AMQProtocolVersionException e)
- {
- _logger.error("Unexpected protocol version", e);
- closeProtocolSession();
- }
- catch (AMQFrameDecodingException e)
- {
- _logger.error("Frame decoding", e);
- closeProtocolSession();
- }
- catch (IOException e)
- {
- _logger.error("I/O Exception", e);
- closeProtocolSession();
- }
- finally
- {
- _receivedLock.unlock();
- }
+ });
+
}
private void receivedComplete()
@@ -1126,7 +1138,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
throw new IllegalArgumentException("authorizedSubject cannot be null");
}
- _authorizedSubject = authorizedSubject;
+
+ _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
+
}
public Subject getAuthorizedSubject()
@@ -1136,7 +1150,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public Principal getAuthorizedPrincipal()
{
- return _authorizedSubject == null ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_authorizedSubject);
+
+ return _authorizedSubject.getPrincipals(AuthenticatedPrincipal.class).size() == 0 ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_authorizedSubject);
}
public SocketAddress getRemoteAddress()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
index 1c992012da..582f951342 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
@@ -83,7 +83,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
// Check virtualhost access
try
{
- virtualHost.getSecurityManager().accessVirtualhost(virtualHostName, session.getRemoteAddress());
+ virtualHost.getSecurityManager().accessVirtualhost(virtualHostName);
}
catch (AccessControlException e)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 26302dbd72..4a3bd1e921 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -215,7 +215,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
- final AMQQueue queue = virtualHost.createQueue(channel, attributes);
+ final AMQQueue queue = virtualHost.createQueue(attributes);
return queue;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
index c3cbb5e26f..93c7c2e778 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
@@ -32,11 +32,17 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import javax.security.auth.Subject;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CopyOnWriteArraySet;
/**
@@ -85,12 +91,13 @@ public class AMQStateManager implements AMQMethodListener
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
- MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher();
+ final MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher();
final int channelId = evt.getChannelId();
- B body = evt.getMethod();
+ final B body = evt.getMethod();
- if(channelId != 0 && _protocolSession.getChannel(channelId)== null)
+ final AMQChannel channel = _protocolSession.getChannel(channelId);
+ if(channelId != 0 && channel == null)
{
if(! ((body instanceof ChannelOpenBody)
@@ -101,8 +108,37 @@ public class AMQStateManager implements AMQMethodListener
}
}
+ if(channel == null)
+ {
+ return body.execute(dispatcher, channelId);
+ }
+ else
+ {
+ try
+ {
+ return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>()
+ {
+ @Override
+ public Boolean run() throws AMQException
+ {
+ return body.execute(dispatcher, channelId);
+ }
+ });
+ }
+ catch (PrivilegedActionException e)
+ {
+ if(e.getCause() instanceof AMQException)
+ {
+ throw (AMQException) e.getCause();
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(e.getCause());
+ }
+ }
- return body.execute(dispatcher, channelId);
+
+ }
}
@@ -113,7 +149,6 @@ public class AMQStateManager implements AMQMethodListener
public AMQProtocolSession getProtocolSession()
{
- SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject());
return _protocolSession;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index cd246c081f..5e38442d7e 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -20,28 +20,38 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.net.InetAddress;
+import java.net.SocketAddress;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.Collection;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.security.auth.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
@@ -50,12 +60,14 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final Port _port;
private final Broker _broker;
+ private final SubjectCreator _subjectCreator;
private VirtualHost _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
private final long _connectionId;
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
+ private final Subject _subject = new Subject();
private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
@@ -91,13 +103,15 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
ConnectionEndpoint conn,
long connectionId,
Port port,
- Transport transport)
+ Transport transport, final SubjectCreator subjectCreator)
{
_broker = broker;
_port = port;
_transport = transport;
_conn = conn;
_connectionId = connectionId;
+ _subject.getPrincipals().add(new ConnectionPrincipal(this));
+ _subjectCreator = subjectCreator;
//_vhost.getConnectionRegistry().registerConnection(this);
}
@@ -109,9 +123,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
public void remoteSessionCreation(SessionEndpoint endpoint)
{
- Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
+ final Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
_sessions.add(session);
- endpoint.setSessionEventListener(session);
+ endpoint.setSessionEventListener(new SessionEventListener()
+ {
+ @Override
+ public void remoteLinkCreation(final LinkEndpoint endpoint)
+ {
+ Subject.doAs(session.getSubject(),new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.remoteLinkCreation(endpoint);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void remoteEnd(final End end)
+ {
+ Subject.doAs(session.getSubject(),new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.remoteEnd(end);
+ return null;
+ }
+ });
+ }
+ });
}
void sessionEnded(Session_1_0 session)
@@ -209,6 +252,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
return String.valueOf(_conn.getRemoteAddress());
}
+ public SocketAddress getRemoteAddress()
+ {
+ return _conn.getRemoteAddress();
+ }
+
@Override
public String getClientId()
{
@@ -235,7 +283,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
public Principal getAuthorizedPrincipal()
{
- return _conn.getUser();
+ Set<AuthenticatedPrincipal> authPrincipals = _subject.getPrincipals(AuthenticatedPrincipal.class);
+ return authPrincipals.isEmpty() ? null : authPrincipals.iterator().next();
}
@Override
@@ -365,7 +414,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
err.setDescription("Unknown hostname " + _conn.getLocalHostname());
_conn.close(err);
}
+ Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser());
+ _subject.getPrincipals().addAll(authSubject.getPrincipals());
+ _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
+ _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
}
}
+
+ Subject getSubject()
+ {
+ return _subject;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index aa52c6191a..468ba2758e 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -25,10 +25,12 @@ import java.io.PrintWriter;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
@@ -194,7 +196,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_endpoint.setProperties(serverProperties);
_endpoint.setRemoteAddress(getRemoteAddress());
- _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport);
+ _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
_endpoint.setConnectionEventListener(_connection);
_endpoint.setFrameOutputHandler(this);
_endpoint.setSaslFrameOutput(this);
@@ -256,7 +258,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private final Logger RAW_LOGGER = Logger.getLogger("RAW");
- public synchronized void received(ByteBuffer msg)
+ public synchronized void received(final ByteBuffer msg)
{
try
{
@@ -357,8 +359,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
case FRAME:
if (msg.hasRemaining())
{
- _frameHandler = _frameHandler.parse(msg);
- _connection.frameReceived();
+ Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ _frameHandler = _frameHandler.parse(msg);
+ _connection.frameReceived();
+ return null;
+ }
+ });
+
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index d0e77b4878..50c0693f31 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -216,7 +216,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK);
- queue = _vhost.createQueue(getSession(), attributes);
+ queue = _vhost.createQueue(attributes);
}
else
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index b96738e0f6..dcf9e5d7a1 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -21,12 +21,15 @@
package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.*;
@@ -39,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.model.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.QueueExistsException;
+import javax.security.auth.Subject;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,6 +83,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private final Connection_1_0 _connection;
private UUID _id = UUID.randomUUID();
private AtomicBoolean _closed = new AtomicBoolean();
+ private final Subject _subject = new Subject();
+
public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
@@ -86,13 +93,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
_endpoint = endpoint;
_transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
-
+ _subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
+ _subject.getPrincipals().add(new SessionPrincipal(this));
}
public void remoteLinkCreation(final LinkEndpoint endpoint)
{
-
Destination destination;
Link_1_0 link = null;
Error error = null;
@@ -105,7 +112,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
if(endpoint.getRole() == Role.SENDER)
{
- SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+ final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
if(previousLink == null)
{
@@ -161,7 +168,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
_vhost,
(SendingDestination) destination
);
- sendingLinkEndpoint.setLinkEventListener(sendingLink);
+
+ sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
+
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
{
@@ -194,7 +203,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
endpoint.setSource(oldSource);
SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
- sendingLinkEndpoint.setLinkEventListener(previousLink);
+ sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(previousLink));
link = previousLink;
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
}
@@ -237,7 +246,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final TxnCoordinatorLink_1_0 coordinatorLink =
new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
- receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
+ receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink));
link = coordinatorLink;
@@ -296,7 +305,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
(ReceivingDestination) destination);
- receivingLinkEndpoint.setLinkEventListener(receivingLink);
+
+ receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
+
link = receivingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
{
@@ -377,7 +388,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
// TODO convert AMQP 1-0 node properties to queue attributes
- final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes );
+ final AMQQueue tempQueue = queue = _vhost.createQueue(attributes );
}
catch (AccessControlException e)
{
@@ -640,4 +651,86 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
{
_taskList.remove(task);
}
+
+ public Subject getSubject()
+ {
+ return _subject;
+ }
+
+
+ private class SubjectSpecificReceivingLinkListener implements ReceivingLinkListener
+ {
+ private final ReceivingLinkListener _linkListener;
+
+ public SubjectSpecificReceivingLinkListener(final ReceivingLinkListener linkListener)
+ {
+ _linkListener = linkListener;
+ }
+
+ @Override
+ public void messageTransfer(final Transfer xfr)
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _linkListener.messageTransfer(xfr);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _linkListener.remoteDetached(endpoint, detach);
+ return null;
+ }
+ });
+ }
+ }
+
+ private class SubjectSpecificSendingLinkListener implements SendingLinkListener
+ {
+ private final SendingLink_1_0 _previousLink;
+
+ public SubjectSpecificSendingLinkListener(final SendingLink_1_0 previousLink)
+ {
+ _previousLink = previousLink;
+ }
+
+ @Override
+ public void flowStateChanged()
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _previousLink.flowStateChanged();
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _previousLink.remoteDetached(endpoint, detach);
+ return null;
+ }
+ });
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
index 674ff71232..0efb76a241 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.AccessControlException;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.security.cert.X509Certificate;
@@ -119,37 +120,22 @@ public class HttpManagementUtil
public static void assertManagementAccess(final SecurityManager securityManager, Subject subject, LogActor actor)
{
// TODO: We should eliminate SecurityManager.setThreadSubject in favour of Subject.doAs
- SecurityManager.setThreadSubject(subject); // Required for accessManagement check
CurrentActor.set(actor);
try
{
- try
+ Subject.doAs(subject, new PrivilegedAction<Void>()
{
- Subject.doAs(subject, new PrivilegedExceptionAction<Void>()
+ @Override
+ public Void run()
{
- @Override
- public Void run()
- {
- securityManager.accessManagement();
- return null;
- }
- });
- }
- catch (PrivilegedActionException e)
- {
- throw new ServerScopedRuntimeException("Unable to perform access check", e);
- }
+ securityManager.accessManagement();
+ return null;
+ }
+ });
}
finally
{
- try
- {
- CurrentActor.remove();
- }
- finally
- {
- SecurityManager.setThreadSubject(null);
- }
+ CurrentActor.remove();
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
index 0381b711bc..1a13733ff5 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
@@ -198,43 +198,36 @@ public abstract class AbstractServlet extends HttpServlet
return;
}
- SecurityManager.setThreadSubject(subject);
+ HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker);
+ CurrentActor.set(logActor);
try
{
- HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker);
- CurrentActor.set(logActor);
- try
- {
- Subject.doAs(subject, privilegedExceptionAction);
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Unable to perform action", e);
- throw e;
- }
- catch (PrivilegedActionException e)
+ Subject.doAs(subject, privilegedExceptionAction);
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Unable to perform action", e);
+ throw e;
+ }
+ catch (PrivilegedActionException e)
+ {
+ LOGGER.error("Unable to perform action", e);
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
{
- LOGGER.error("Unable to perform action", e);
- Throwable cause = e.getCause();
- if(cause instanceof RuntimeException)
- {
- throw (RuntimeException)cause;
- }
- if(cause instanceof Error)
- {
- throw (Error)cause;
- }
- throw new ConnectionScopedRuntimeException(e.getCause());
+ throw (RuntimeException)cause;
}
- finally
+ if(cause instanceof Error)
{
- CurrentActor.remove();
+ throw (Error)cause;
}
+ throw new ConnectionScopedRuntimeException(e.getCause());
}
finally
{
- SecurityManager.setThreadSubject(null);
+ CurrentActor.remove();
}
+
}
protected Subject getAuthorisedSubject(HttpServletRequest request)
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
index e9716ab775..e78e59b7fc 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java
@@ -162,7 +162,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler
}
// Save the subject
- SecurityManager.setThreadSubject(subject);
CurrentActor.set(_logActor);
try
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 709d8ae34a..e19b15461a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -27,7 +27,6 @@ import java.util.Collection;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -701,7 +700,7 @@ public class MessageStoreTest extends QpidTestCase
AMQQueue queue = null;
//Ideally we would be able to use the QueueDeclareHandler here.
- queue = getVirtualHost().createQueue(null, queueArguments);
+ queue = getVirtualHost().createQueue(queueArguments);
validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);