diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-19 21:44:19 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-19 21:44:19 +0000 |
| commit | 840b1793643b37b9fa8f8352a6851417042301ed (patch) | |
| tree | ec59ddd96d2758c5621e8b6046478dae760c6eb0 /qpid/java/broker-plugins | |
| parent | 29a70653f314b99c103c8149cacc4fed2a13898c (diff) | |
| download | qpid-python-840b1793643b37b9fa8f8352a6851417042301ed.tar.gz | |
QPID-5567 : Always Use AccessControllerContext to find the current context Subject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
18 files changed, 588 insertions, 264 deletions
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java index 6beeef2f18..75006ae697 100644 --- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java +++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java @@ -23,6 +23,9 @@ package org.apache.qpid.server.security.access.plugins; import java.net.InetAddress; import java.net.InetSocketAddress; import java.io.File; +import java.net.SocketAddress; +import java.security.AccessController; +import java.util.Set; import javax.security.auth.Subject; @@ -30,8 +33,8 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang.ObjectUtils; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.security.Result; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.AccessControl; import org.apache.qpid.server.security.access.ObjectProperties; import org.apache.qpid.server.security.access.ObjectType; @@ -115,15 +118,22 @@ public class DefaultAccessControl implements AccessControl * Delegate to the {@link #authorise(Operation, ObjectType, ObjectProperties)} method, with * the operation set to ACCESS and no object properties. */ - public Result access(ObjectType objectType, Object inetSocketAddress) + public Result access(ObjectType objectType) { InetAddress addressOfClient = null; - - if(inetSocketAddress != null) + final Subject subject = Subject.getSubject(AccessController.getContext()); + if(subject != null) { - addressOfClient = ((InetSocketAddress) inetSocketAddress).getAddress(); + Set<ConnectionPrincipal> principals = subject.getPrincipals(ConnectionPrincipal.class); + if(!principals.isEmpty()) + { + SocketAddress address = principals.iterator().next().getConnection().getRemoteAddress(); + if(address instanceof InetSocketAddress) + { + addressOfClient = ((InetSocketAddress) address).getAddress(); + } + } } - return authoriseFromAddress(Operation.ACCESS, objectType, ObjectProperties.EMPTY, addressOfClient); } @@ -139,7 +149,7 @@ public class DefaultAccessControl implements AccessControl public Result authoriseFromAddress(Operation operation, ObjectType objectType, ObjectProperties properties, InetAddress addressOfClient) { - final Subject subject = SecurityManager.getThreadSubject(); + final Subject subject = Subject.getSubject(AccessController.getContext()); // Abstain if there is no subject/principal associated with this thread if (subject == null || subject.getPrincipals().size() == 0) { diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java index ea8e469edb..8ac4e0c424 100644 --- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java +++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java @@ -24,17 +24,20 @@ import static org.mockito.Mockito.*; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import javax.security.auth.Subject; import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.UnitTestMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.Result; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.ObjectProperties; import org.apache.qpid.server.security.access.ObjectType; import org.apache.qpid.server.security.access.Operation; @@ -64,7 +67,6 @@ public class DefaultAccessControlTest extends TestCase private void configureAccessControl(final RuleSet rs) throws ConfigurationException { _plugin = new DefaultAccessControl(rs); - SecurityManager.setThreadSubject(null); CurrentActor.set(new TestLogActor(messageLogger)); } @@ -86,7 +88,6 @@ public class DefaultAccessControlTest extends TestCase protected void tearDown() throws Exception { super.tearDown(); - SecurityManager.setThreadSubject(null); } /** @@ -95,8 +96,6 @@ public class DefaultAccessControlTest extends TestCase public void testNoSubjectAlwaysAbstains() throws ConfigurationException { setUpGroupAccessControl(); - SecurityManager.setThreadSubject(null); - final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY); assertEquals(Result.ABSTAIN, result); } @@ -108,10 +107,16 @@ public class DefaultAccessControlTest extends TestCase public void testUsernameAllowsOperation() throws ConfigurationException { setUpGroupAccessControl(); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user1")); - - final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY); - assertEquals(Result.ALLOWED, result); + Subject.doAs(TestPrincipalUtils.createTestSubject("user1"), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY); + assertEquals(Result.ALLOWED, result); + return null; + } + }); } /** @@ -143,14 +148,22 @@ public class DefaultAccessControlTest extends TestCase public void testCatchAllRuleDeniesUnrecognisedUsername() throws ConfigurationException { setUpGroupAccessControl(); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("unknown", "unkgroup1", "unkgroup2")); - - assertEquals("Expecting zero messages before test", 0, messageLogger.getLogMessages().size()); - final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY); - assertEquals(Result.DENIED, result); + Subject.doAs(TestPrincipalUtils.createTestSubject("unknown", "unkgroup1", "unkgroup2"), + new PrivilegedAction<Object>() + { + @Override + public Object run() + { + assertEquals("Expecting zero messages before test", 0, messageLogger.getLogMessages().size()); + final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY); + assertEquals(Result.DENIED, result); + + assertEquals("Expecting one message before test", 1, messageLogger.getLogMessages().size()); + assertTrue("Logged message does not contain expected string", messageLogger.messageContains(0, "ACL-1002")); + return null; + } + }); - assertEquals("Expecting one message before test", 1, messageLogger.getLogMessages().size()); - assertTrue("Logged message does not contain expected string", messageLogger.messageContains(0, "ACL-1002")); } /** @@ -163,13 +176,20 @@ public class DefaultAccessControlTest extends TestCase // grant user4 access right on any method in any component rs.grant(1, "user4", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, new ObjectProperties(ObjectProperties.STAR)); configureAccessControl(rs); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user4")); + Subject.doAs(TestPrincipalUtils.createTestSubject("user4"), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + ObjectProperties actionProperties = new ObjectProperties("getName"); + actionProperties.put(ObjectProperties.Property.COMPONENT, "Test"); + + final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties); + assertEquals(Result.ALLOWED, result); + return null; + } + }); - ObjectProperties actionProperties = new ObjectProperties("getName"); - actionProperties.put(ObjectProperties.Property.COMPONENT, "Test"); - - final Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties); - assertEquals(Result.ALLOWED, result); } /** @@ -184,55 +204,91 @@ public class DefaultAccessControlTest extends TestCase ruleProperties.put(ObjectProperties.Property.COMPONENT, "Test"); rs.grant(1, "user5", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, ruleProperties); configureAccessControl(rs); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user5")); + Subject.doAs(TestPrincipalUtils.createTestSubject("user5"), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + ObjectProperties actionProperties = new ObjectProperties("getName"); + actionProperties.put(ObjectProperties.Property.COMPONENT, "Test"); + Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties); + assertEquals(Result.ALLOWED, result); + + actionProperties.put(ObjectProperties.Property.COMPONENT, "Test2"); + result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties); + assertEquals(Result.DEFER, result); + return null; + } + }); - ObjectProperties actionProperties = new ObjectProperties("getName"); - actionProperties.put(ObjectProperties.Property.COMPONENT, "Test"); - Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties); - assertEquals(Result.ALLOWED, result); - actionProperties.put(ObjectProperties.Property.COMPONENT, "Test2"); - result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, actionProperties); - assertEquals(Result.DEFER, result); } public void testAccess() throws Exception { - Subject subject = TestPrincipalUtils.createTestSubject("user1"); - SecurityManager.setThreadSubject(subject); + final Subject subject = TestPrincipalUtils.createTestSubject("user1"); + final InetAddress inetAddress = InetAddress.getLocalHost(); + final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1); + + AMQConnectionModel connectionModel = mock(AMQConnectionModel.class); + when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress); + + subject.getPrincipals().add(new ConnectionPrincipal(connectionModel)); + + Subject.doAs(subject, new PrivilegedExceptionAction<Object>() + { + @Override + public Object run() throws Exception + { + RuleSet mockRuleSet = mock(RuleSet.class); + - RuleSet mockRuleSet = mock(RuleSet.class); - InetAddress inetAddress = InetAddress.getLocalHost(); - InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1); + DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet); - DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet); + accessControl.access(ObjectType.VIRTUALHOST); - accessControl.access(ObjectType.VIRTUALHOST, inetSocketAddress); + verify(mockRuleSet).check(subject, Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY, inetAddress); + return null; + } + }); - verify(mockRuleSet).check(subject, Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY, inetAddress); } public void testAccessIsDeniedIfRuleThrowsException() throws Exception { - Subject subject = TestPrincipalUtils.createTestSubject("user1"); - SecurityManager.setThreadSubject(subject); + final Subject subject = TestPrincipalUtils.createTestSubject("user1"); + final InetAddress inetAddress = InetAddress.getLocalHost(); + final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1); - InetAddress inetAddress = InetAddress.getLocalHost(); - InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1); + AMQConnectionModel connectionModel = mock(AMQConnectionModel.class); + when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress); - RuleSet mockRuleSet = mock(RuleSet.class); - when(mockRuleSet.check( - subject, - Operation.ACCESS, - ObjectType.VIRTUALHOST, - ObjectProperties.EMPTY, - inetAddress)).thenThrow(new RuntimeException()); + subject.getPrincipals().add(new ConnectionPrincipal(connectionModel)); - DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet); - Result result = accessControl.access(ObjectType.VIRTUALHOST, inetSocketAddress); + Subject.doAs(subject, new PrivilegedExceptionAction<Object>() + { + @Override + public Object run() throws Exception + { + + + RuleSet mockRuleSet = mock(RuleSet.class); + when(mockRuleSet.check( + subject, + Operation.ACCESS, + ObjectType.VIRTUALHOST, + ObjectProperties.EMPTY, + inetAddress)).thenThrow(new RuntimeException()); + + DefaultAccessControl accessControl = new DefaultAccessControl(mockRuleSet); + Result result = accessControl.access(ObjectType.VIRTUALHOST); + + assertEquals(Result.DENIED, result); + return null; + } + }); - assertEquals(Result.DENIED, result); } @@ -248,21 +304,29 @@ public class DefaultAccessControlTest extends TestCase ruleProperties.put(ObjectProperties.Property.COMPONENT, "Test"); rs.grant(1, "user6", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, ruleProperties); configureAccessControl(rs); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user6")); - - ObjectProperties properties = new ObjectProperties("getAttribute"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); + Subject.doAs(TestPrincipalUtils.createTestSubject("user6"), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + ObjectProperties properties = new ObjectProperties("getAttribute"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); + + properties.put(ObjectProperties.Property.COMPONENT, "Test2"); + result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.DEFER, result); + + properties = new ObjectProperties("getAttribute2"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.DEFER, result); + + return null; + } + }); - properties.put(ObjectProperties.Property.COMPONENT, "Test2"); - result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.DEFER, result); - - properties = new ObjectProperties("getAttribute2"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.DEFER, result); } /** @@ -275,25 +339,33 @@ public class DefaultAccessControlTest extends TestCase // grant user8 all rights on method queryNames in all component rs.grant(1, "user8", Permission.ALLOW, Operation.ALL, ObjectType.METHOD, new ObjectProperties("queryNames")); configureAccessControl(rs); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user8")); + Subject.doAs(TestPrincipalUtils.createTestSubject("user8"), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + ObjectProperties properties = new ObjectProperties(); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + properties.put(ObjectProperties.Property.NAME, "queryNames"); + + Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); - ObjectProperties properties = new ObjectProperties(); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - properties.put(ObjectProperties.Property.NAME, "queryNames"); + result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); - Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); + properties = new ObjectProperties("getAttribute"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); + assertEquals(Result.DEFER, result); - result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); + result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.DEFER, result); + return null; + } + }); - properties = new ObjectProperties("getAttribute"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); - assertEquals(Result.DEFER, result); - result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.DEFER, result); } /** @@ -306,24 +378,32 @@ public class DefaultAccessControlTest extends TestCase // grant user9 all rights on any method in all component rs.grant(1, "user9", Permission.ALLOW, Operation.ALL, ObjectType.METHOD, new ObjectProperties()); configureAccessControl(rs); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user9")); + Subject.doAs(TestPrincipalUtils.createTestSubject("user9"), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + ObjectProperties properties = new ObjectProperties("queryNames"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); - ObjectProperties properties = new ObjectProperties("queryNames"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); + Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); - Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); + result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); - result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); + properties = new ObjectProperties("getAttribute"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); + + result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); + return null; + } + }); - properties = new ObjectProperties("getAttribute"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - result = _plugin.authorise(Operation.UPDATE, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); - result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); } /** @@ -340,29 +420,43 @@ public class DefaultAccessControlTest extends TestCase rs.grant(1, "user9", Permission.ALLOW, Operation.ACCESS, ObjectType.METHOD, ruleProperties); configureAccessControl(rs); - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject("user9")); - - ObjectProperties properties = new ObjectProperties("getAttributes"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); - - properties = new ObjectProperties("getAttribute"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.ALLOWED, result); - - properties = new ObjectProperties("getAttribut"); - properties.put(ObjectProperties.Property.COMPONENT, "Test"); - result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); - assertEquals(Result.DEFER, result); + Subject.doAs(TestPrincipalUtils.createTestSubject("user9"), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + ObjectProperties properties = new ObjectProperties("getAttributes"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + Result result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); + + properties = new ObjectProperties("getAttribute"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.ALLOWED, result); + + properties = new ObjectProperties("getAttribut"); + properties.put(ObjectProperties.Property.COMPONENT, "Test"); + result = _plugin.authorise(Operation.ACCESS, ObjectType.METHOD, properties); + assertEquals(Result.DEFER, result); + return null; + } + }); } - private void authoriseAndAssertResult(Result expectedResult, String userName, String... groups) + private void authoriseAndAssertResult(final Result expectedResult, String userName, String... groups) { - SecurityManager.setThreadSubject(TestPrincipalUtils.createTestSubject(userName, groups)); - Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY); - assertEquals(expectedResult, result); + Subject.doAs(TestPrincipalUtils.createTestSubject(userName, groups), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + Result result = _plugin.authorise(Operation.ACCESS, ObjectType.VIRTUALHOST, ObjectProperties.EMPTY); + assertEquals(expectedResult, result); + return null; + } + }); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 9c012eb782..024169326c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import java.net.SocketAddress; import java.security.Principal; +import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPConnectionActor; @@ -65,7 +67,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private AtomicBoolean _logClosed = new AtomicBoolean(false); private LogActor _actor; - private Subject _authorizedSubject = null; + private final Subject _authorizedSubject = new Subject(); private Principal _authorizedPrincipal = null; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _connectionId; @@ -84,6 +86,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S public ServerConnection(final long connectionId, Broker broker) { _connectionId = connectionId; + _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger()); } @@ -249,29 +252,43 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override - public void received(ProtocolEvent event) + public void received(final ProtocolEvent event) { _lastIoTime.set(System.currentTimeMillis()); + Subject subject; if (event.isConnectionControl()) { CurrentActor.set(_actor); + subject = _authorizedSubject; } else { ServerSession channel = (ServerSession) getSession(event.getChannel()); LogActor channelActor = null; - if (channel != null) { + subject = channel.getAuthorizedSubject(); channelActor = channel.getLogActor(); } + else + { + subject = _authorizedSubject; + } CurrentActor.set(channelActor == null ? _actor : channelActor); } try { - super.received(event); + Subject.doAs(subject, new PrivilegedAction<Void>() + { + @Override + public Void run() + { + ServerConnection.super.received(event); + return null; + } + }); } finally { @@ -461,12 +478,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { if (authorizedSubject == null) { - _authorizedSubject = null; _authorizedPrincipal = null; } else { - _authorizedSubject = authorizedSubject; + _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals()); + _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 13c7e7bcd3..ce059db703 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -38,7 +38,6 @@ import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; @@ -190,7 +189,7 @@ public class ServerConnectionDelegate extends ServerDelegate } vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName); - SecurityManager.setThreadSubject(sconn.getAuthorizedSubject()); + if(vhost != null) { @@ -198,7 +197,7 @@ public class ServerConnectionDelegate extends ServerDelegate try { - vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress()); + vhost.getSecurityManager().accessVirtualhost(vhostName); } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 29f9fc549e..12c10240bb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -41,6 +41,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; + +import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; @@ -100,6 +102,7 @@ public class ServerSession extends Session private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; private final UUID _id = UUID.randomUUID(); + private final Subject _subject = new Subject(); private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); @@ -147,6 +150,9 @@ public class ServerSession extends Session _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); _logSubject = new ChannelLogSubject(this); + _subject.getPrincipals().addAll(((ServerConnection) connection).getAuthorizedSubject().getPrincipals()); + _subject.getPrincipals().add(new SessionPrincipal(this)); + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() { @Override @@ -610,7 +616,7 @@ public class ServerSession extends Session public Subject getAuthorizedSubject() { - return getConnection().getAuthorizedSubject(); + return _subject; } public void addDeleteTask(Action<? super ServerSession> task) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d39ca73136..5d36aa5321 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -61,6 +60,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; @@ -89,8 +89,6 @@ public class ServerSessionDelegate extends SessionDelegate { try { - setThreadSubject(session); - if(!session.isClosing()) { Object asyncCommandMark = ((ServerSession)session).getAsyncCommandMark(); @@ -117,6 +115,10 @@ public class ServerSessionDelegate extends SessionDelegate { LOGGER.error("Exception processing command", e); exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Exception processing command: " + e); + if(e instanceof ServerScopedRuntimeException) + { + throw e; + } } } @@ -1222,7 +1224,7 @@ public class ServerSessionDelegate extends SessionDelegate arguments.put(Queue.EXCLUSIVE, exclusivityPolicy); - queue = virtualHost.createQueue((ServerSession)session, arguments); + queue = virtualHost.createQueue(arguments); } catch(QueueExistsException qe) @@ -1437,7 +1439,6 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void closed(Session session) { - setThreadSubject(session); ServerSession serverSession = (ServerSession)session; serverSession.stopSubscriptions(); @@ -1451,12 +1452,6 @@ public class ServerSessionDelegate extends SessionDelegate closed(session); } - private void setThreadSubject(Session session) - { - final ServerConnection scon = (ServerConnection) session.getConnection(); - SecurityManager.setThreadSubject(scon.getAuthorizedSubject()); - } - private static class CommandProcessedAction implements ServerTransaction.Action { private final ServerSession _serverSession; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 0146d066f1..453f3035e1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -31,6 +31,7 @@ import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; +import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; @@ -87,6 +88,8 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; +import javax.security.auth.Subject; + public class AMQChannel<T extends AMQProtocolSession<T>> implements AMQSessionModel<AMQChannel<T>,T>, AsyncAutoCommitTransaction.FutureRecorder @@ -172,6 +175,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); + private Subject _subject; public AMQChannel(T session, int channelId, MessageStore messageStore) @@ -181,6 +185,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _channelId = channelId; _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); + _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(), + session.getAuthorizedSubject().getPublicCredentials(), + session.getAuthorizedSubject().getPrivateCredentials()); + _subject.getPrincipals().add(new SessionPrincipal(this)); _logSubject = new ChannelLogSubject(this); _actor.message(ChannelMessages.CREATE()); @@ -1253,6 +1261,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _taskList.remove(task); } + public Subject getSubject() + { + return _subject; + } + private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>> { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 68f1ad7942..bd2dc9413c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -25,6 +25,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.security.Principal; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -53,6 +54,7 @@ import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.configuration.BrokerProperties; @@ -135,7 +137,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>(); private ProtocolOutputConverter _protocolOutputConverter; - private Subject _authorizedSubject; + private final Subject _authorizedSubject = new Subject(); private MethodDispatcher _dispatcher; private final long _connectionID; @@ -187,6 +189,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _connectionID = connectionId; _actor = new AMQPConnectionActor(this, _broker.getRootMessageLogger()); + _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); _logSubject = new ConnectionLogSubject(this); @@ -247,62 +250,71 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void received(final ByteBuffer msg) { - final long arrivalTime = System.currentTimeMillis(); - _lastReceivedTime = arrivalTime; - _lastIoTime = arrivalTime; - _readBytes += msg.remaining(); - - _receivedLock.lock(); - try + Subject.doAs(_authorizedSubject, new PrivilegedAction<Void>() { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - for (AMQDataBlock dataBlock : dataBlocks) + @Override + public Void run() { + final long arrivalTime = System.currentTimeMillis(); + _lastReceivedTime = arrivalTime; + _lastIoTime = arrivalTime; + _readBytes += msg.remaining(); + + _receivedLock.lock(); try { - dataBlockReceived(dataBlock); - } - catch(AMQConnectionException e) - { - if(_logger.isDebugEnabled()) + final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + for (AMQDataBlock dataBlock : dataBlocks) { - _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e); + try + { + dataBlockReceived(dataBlock); + } + catch(AMQConnectionException e) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e); + } + break; + } + catch (Exception e) + { + _logger.error("Unexpected exception when processing datablock", e); + closeProtocolSession(); + break; + } } - break; + receivedComplete(); } - catch (Exception e) + catch (ConnectionScopedRuntimeException e) { - _logger.error("Unexpected exception when processing datablock", e); + _logger.error("Unexpected exception", e); closeProtocolSession(); - break; } + catch (AMQProtocolVersionException e) + { + _logger.error("Unexpected protocol version", e); + closeProtocolSession(); + } + catch (AMQFrameDecodingException e) + { + _logger.error("Frame decoding", e); + closeProtocolSession(); + } + catch (IOException e) + { + _logger.error("I/O Exception", e); + closeProtocolSession(); + } + finally + { + _receivedLock.unlock(); + } + return null; } - receivedComplete(); - } - catch (ConnectionScopedRuntimeException e) - { - _logger.error("Unexpected exception", e); - closeProtocolSession(); - } - catch (AMQProtocolVersionException e) - { - _logger.error("Unexpected protocol version", e); - closeProtocolSession(); - } - catch (AMQFrameDecodingException e) - { - _logger.error("Frame decoding", e); - closeProtocolSession(); - } - catch (IOException e) - { - _logger.error("I/O Exception", e); - closeProtocolSession(); - } - finally - { - _receivedLock.unlock(); - } + }); + } private void receivedComplete() @@ -1126,7 +1138,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { throw new IllegalArgumentException("authorizedSubject cannot be null"); } - _authorizedSubject = authorizedSubject; + + _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals()); + } public Subject getAuthorizedSubject() @@ -1136,7 +1150,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public Principal getAuthorizedPrincipal() { - return _authorizedSubject == null ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_authorizedSubject); + + return _authorizedSubject.getPrincipals(AuthenticatedPrincipal.class).size() == 0 ? null : AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(_authorizedSubject); } public SocketAddress getRemoteAddress() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java index 1c992012da..582f951342 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java @@ -83,7 +83,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con // Check virtualhost access try { - virtualHost.getSecurityManager().accessVirtualhost(virtualHostName, session.getRemoteAddress()); + virtualHost.getSecurityManager().accessVirtualhost(virtualHostName); } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 26302dbd72..4a3bd1e921 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -215,7 +215,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); - final AMQQueue queue = virtualHost.createQueue(channel, attributes); + final AMQQueue queue = virtualHost.createQueue(attributes); return queue; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java index c3cbb5e26f..93c7c2e778 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java @@ -32,11 +32,17 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import javax.security.auth.Subject; +import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.concurrent.CopyOnWriteArraySet; /** @@ -85,12 +91,13 @@ public class AMQStateManager implements AMQMethodListener public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { - MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher(); + final MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher(); final int channelId = evt.getChannelId(); - B body = evt.getMethod(); + final B body = evt.getMethod(); - if(channelId != 0 && _protocolSession.getChannel(channelId)== null) + final AMQChannel channel = _protocolSession.getChannel(channelId); + if(channelId != 0 && channel == null) { if(! ((body instanceof ChannelOpenBody) @@ -101,8 +108,37 @@ public class AMQStateManager implements AMQMethodListener } } + if(channel == null) + { + return body.execute(dispatcher, channelId); + } + else + { + try + { + return Subject.doAs(channel.getSubject(), new PrivilegedExceptionAction<Boolean>() + { + @Override + public Boolean run() throws AMQException + { + return body.execute(dispatcher, channelId); + } + }); + } + catch (PrivilegedActionException e) + { + if(e.getCause() instanceof AMQException) + { + throw (AMQException) e.getCause(); + } + else + { + throw new ServerScopedRuntimeException(e.getCause()); + } + } - return body.execute(dispatcher, channelId); + + } } @@ -113,7 +149,6 @@ public class AMQStateManager implements AMQMethodListener public AMQProtocolSession getProtocolSession() { - SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject()); return _protocolSession; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index cd246c081f..5e38442d7e 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -20,28 +20,38 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.net.InetAddress; +import java.net.SocketAddress; import java.security.Principal; +import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.Collection; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.security.auth.Subject; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; @@ -50,12 +60,14 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final Port _port; private final Broker _broker; + private final SubjectCreator _subjectCreator; private VirtualHost _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; private final long _connectionId; private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); private final Object _reference = new Object(); + private final Subject _subject = new Subject(); private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); @@ -91,13 +103,15 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod ConnectionEndpoint conn, long connectionId, Port port, - Transport transport) + Transport transport, final SubjectCreator subjectCreator) { _broker = broker; _port = port; _transport = transport; _conn = conn; _connectionId = connectionId; + _subject.getPrincipals().add(new ConnectionPrincipal(this)); + _subjectCreator = subjectCreator; //_vhost.getConnectionRegistry().registerConnection(this); } @@ -109,9 +123,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod public void remoteSessionCreation(SessionEndpoint endpoint) { - Session_1_0 session = new Session_1_0(_vhost, this, endpoint); + final Session_1_0 session = new Session_1_0(_vhost, this, endpoint); _sessions.add(session); - endpoint.setSessionEventListener(session); + endpoint.setSessionEventListener(new SessionEventListener() + { + @Override + public void remoteLinkCreation(final LinkEndpoint endpoint) + { + Subject.doAs(session.getSubject(),new PrivilegedAction<Object>() + { + @Override + public Object run() + { + session.remoteLinkCreation(endpoint); + return null; + } + }); + } + + @Override + public void remoteEnd(final End end) + { + Subject.doAs(session.getSubject(),new PrivilegedAction<Object>() + { + @Override + public Object run() + { + session.remoteEnd(end); + return null; + } + }); + } + }); } void sessionEnded(Session_1_0 session) @@ -209,6 +252,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return String.valueOf(_conn.getRemoteAddress()); } + public SocketAddress getRemoteAddress() + { + return _conn.getRemoteAddress(); + } + @Override public String getClientId() { @@ -235,7 +283,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod public Principal getAuthorizedPrincipal() { - return _conn.getUser(); + Set<AuthenticatedPrincipal> authPrincipals = _subject.getPrincipals(AuthenticatedPrincipal.class); + return authPrincipals.isEmpty() ? null : authPrincipals.iterator().next(); } @Override @@ -365,7 +414,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod err.setDescription("Unknown hostname " + _conn.getLocalHostname()); _conn.close(err); } + Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser()); + _subject.getPrincipals().addAll(authSubject.getPrincipals()); + _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials()); + _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials()); } } + + Subject getSubject() + { + return _subject; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index aa52c6191a..468ba2758e 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -25,10 +25,12 @@ import java.io.PrintWriter; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.security.Principal; +import java.security.PrivilegedAction; import java.util.LinkedHashMap; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import javax.security.auth.Subject; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.qpid.amqp_1_0.codec.FrameWriter; @@ -194,7 +196,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _endpoint.setProperties(serverProperties); _endpoint.setRemoteAddress(getRemoteAddress()); - _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator); _endpoint.setConnectionEventListener(_connection); _endpoint.setFrameOutputHandler(this); _endpoint.setSaslFrameOutput(this); @@ -256,7 +258,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private final Logger RAW_LOGGER = Logger.getLogger("RAW"); - public synchronized void received(ByteBuffer msg) + public synchronized void received(final ByteBuffer msg) { try { @@ -357,8 +359,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut case FRAME: if (msg.hasRemaining()) { - _frameHandler = _frameHandler.parse(msg); - _connection.frameReceived(); + Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + _frameHandler = _frameHandler.parse(msg); + _connection.frameReceived(); + return null; + } + }); + } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index d0e77b4878..50c0693f31 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -216,7 +216,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK); - queue = _vhost.createQueue(getSession(), attributes); + queue = _vhost.createQueue(attributes); } else { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index b96738e0f6..dcf9e5d7a1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -21,12 +21,15 @@ package org.apache.qpid.server.protocol.v1_0; import java.security.AccessControlException; +import java.security.PrivilegedAction; import java.text.MessageFormat; import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; @@ -39,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.model.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; @@ -55,6 +59,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; +import javax.security.auth.Subject; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -78,6 +83,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); private AtomicBoolean _closed = new AtomicBoolean(); + private final Subject _subject = new Subject(); + public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) @@ -86,13 +93,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio _endpoint = endpoint; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; - + _subject.getPrincipals().addAll(connection.getSubject().getPrincipals()); + _subject.getPrincipals().add(new SessionPrincipal(this)); } public void remoteLinkCreation(final LinkEndpoint endpoint) { - Destination destination; Link_1_0 link = null; Error error = null; @@ -105,7 +112,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio if(endpoint.getRole() == Role.SENDER) { - SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); + final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); if(previousLink == null) { @@ -161,7 +168,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio _vhost, (SendingDestination) destination ); - sendingLinkEndpoint.setLinkEventListener(sendingLink); + + sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); + link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) { @@ -194,7 +203,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio endpoint.setSource(oldSource); SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); - sendingLinkEndpoint.setLinkEventListener(previousLink); + sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(previousLink)); link = previousLink; endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); } @@ -237,7 +246,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final TxnCoordinatorLink_1_0 coordinatorLink = new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions); - receivingLinkEndpoint.setLinkEventListener(coordinatorLink); + receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink)); link = coordinatorLink; @@ -296,7 +305,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost, (ReceivingDestination) destination); - receivingLinkEndpoint.setLinkEventListener(receivingLink); + + receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink)); + link = receivingLink; if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) { @@ -377,7 +388,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio // TODO convert AMQP 1-0 node properties to queue attributes - final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); + final AMQQueue tempQueue = queue = _vhost.createQueue(attributes ); } catch (AccessControlException e) { @@ -640,4 +651,86 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio { _taskList.remove(task); } + + public Subject getSubject() + { + return _subject; + } + + + private class SubjectSpecificReceivingLinkListener implements ReceivingLinkListener + { + private final ReceivingLinkListener _linkListener; + + public SubjectSpecificReceivingLinkListener(final ReceivingLinkListener linkListener) + { + _linkListener = linkListener; + } + + @Override + public void messageTransfer(final Transfer xfr) + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _linkListener.messageTransfer(xfr); + return null; + } + }); + } + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _linkListener.remoteDetached(endpoint, detach); + return null; + } + }); + } + } + + private class SubjectSpecificSendingLinkListener implements SendingLinkListener + { + private final SendingLink_1_0 _previousLink; + + public SubjectSpecificSendingLinkListener(final SendingLink_1_0 previousLink) + { + _previousLink = previousLink; + } + + @Override + public void flowStateChanged() + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _previousLink.flowStateChanged(); + return null; + } + }); + } + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _previousLink.remoteDetached(endpoint, detach); + return null; + } + }); + } + } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java index 674ff71232..0efb76a241 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.AccessControlException; import java.security.Principal; +import java.security.PrivilegedAction; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.security.cert.X509Certificate; @@ -119,37 +120,22 @@ public class HttpManagementUtil public static void assertManagementAccess(final SecurityManager securityManager, Subject subject, LogActor actor) { // TODO: We should eliminate SecurityManager.setThreadSubject in favour of Subject.doAs - SecurityManager.setThreadSubject(subject); // Required for accessManagement check CurrentActor.set(actor); try { - try + Subject.doAs(subject, new PrivilegedAction<Void>() { - Subject.doAs(subject, new PrivilegedExceptionAction<Void>() + @Override + public Void run() { - @Override - public Void run() - { - securityManager.accessManagement(); - return null; - } - }); - } - catch (PrivilegedActionException e) - { - throw new ServerScopedRuntimeException("Unable to perform access check", e); - } + securityManager.accessManagement(); + return null; + } + }); } finally { - try - { - CurrentActor.remove(); - } - finally - { - SecurityManager.setThreadSubject(null); - } + CurrentActor.remove(); } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java index 0381b711bc..1a13733ff5 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java @@ -198,43 +198,36 @@ public abstract class AbstractServlet extends HttpServlet return; } - SecurityManager.setThreadSubject(subject); + HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker); + CurrentActor.set(logActor); try { - HttpManagementActor logActor = HttpManagementUtil.getOrCreateAndCacheLogActor(request, _broker); - CurrentActor.set(logActor); - try - { - Subject.doAs(subject, privilegedExceptionAction); - } - catch(RuntimeException e) - { - LOGGER.error("Unable to perform action", e); - throw e; - } - catch (PrivilegedActionException e) + Subject.doAs(subject, privilegedExceptionAction); + } + catch(RuntimeException e) + { + LOGGER.error("Unable to perform action", e); + throw e; + } + catch (PrivilegedActionException e) + { + LOGGER.error("Unable to perform action", e); + Throwable cause = e.getCause(); + if(cause instanceof RuntimeException) { - LOGGER.error("Unable to perform action", e); - Throwable cause = e.getCause(); - if(cause instanceof RuntimeException) - { - throw (RuntimeException)cause; - } - if(cause instanceof Error) - { - throw (Error)cause; - } - throw new ConnectionScopedRuntimeException(e.getCause()); + throw (RuntimeException)cause; } - finally + if(cause instanceof Error) { - CurrentActor.remove(); + throw (Error)cause; } + throw new ConnectionScopedRuntimeException(e.getCause()); } finally { - SecurityManager.setThreadSubject(null); + CurrentActor.remove(); } + } protected Subject getAuthorisedSubject(HttpServletRequest request) diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java index e9716ab775..e78e59b7fc 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/MBeanInvocationHandlerImpl.java @@ -162,7 +162,6 @@ public class MBeanInvocationHandlerImpl implements InvocationHandler } // Save the subject - SecurityManager.setThreadSubject(subject); CurrentActor.set(_logActor); try { |
