diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-19 21:44:19 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-19 21:44:19 +0000 |
| commit | 840b1793643b37b9fa8f8352a6851417042301ed (patch) | |
| tree | ec59ddd96d2758c5621e8b6046478dae760c6eb0 /qpid/java/broker-plugins/amqp-1-0-protocol | |
| parent | 29a70653f314b99c103c8149cacc4fed2a13898c (diff) | |
| download | qpid-python-840b1793643b37b9fa8f8352a6851417042301ed.tar.gz | |
QPID-5567 : Always Use AccessControllerContext to find the current context Subject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol')
4 files changed, 179 insertions, 17 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index cd246c081f..5e38442d7e 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -20,28 +20,38 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.net.InetAddress; +import java.net.SocketAddress; import java.security.Principal; +import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.Collection; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.security.auth.Subject; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; @@ -50,12 +60,14 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final Port _port; private final Broker _broker; + private final SubjectCreator _subjectCreator; private VirtualHost _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; private final long _connectionId; private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); private final Object _reference = new Object(); + private final Subject _subject = new Subject(); private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); @@ -91,13 +103,15 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod ConnectionEndpoint conn, long connectionId, Port port, - Transport transport) + Transport transport, final SubjectCreator subjectCreator) { _broker = broker; _port = port; _transport = transport; _conn = conn; _connectionId = connectionId; + _subject.getPrincipals().add(new ConnectionPrincipal(this)); + _subjectCreator = subjectCreator; //_vhost.getConnectionRegistry().registerConnection(this); } @@ -109,9 +123,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod public void remoteSessionCreation(SessionEndpoint endpoint) { - Session_1_0 session = new Session_1_0(_vhost, this, endpoint); + final Session_1_0 session = new Session_1_0(_vhost, this, endpoint); _sessions.add(session); - endpoint.setSessionEventListener(session); + endpoint.setSessionEventListener(new SessionEventListener() + { + @Override + public void remoteLinkCreation(final LinkEndpoint endpoint) + { + Subject.doAs(session.getSubject(),new PrivilegedAction<Object>() + { + @Override + public Object run() + { + session.remoteLinkCreation(endpoint); + return null; + } + }); + } + + @Override + public void remoteEnd(final End end) + { + Subject.doAs(session.getSubject(),new PrivilegedAction<Object>() + { + @Override + public Object run() + { + session.remoteEnd(end); + return null; + } + }); + } + }); } void sessionEnded(Session_1_0 session) @@ -209,6 +252,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return String.valueOf(_conn.getRemoteAddress()); } + public SocketAddress getRemoteAddress() + { + return _conn.getRemoteAddress(); + } + @Override public String getClientId() { @@ -235,7 +283,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod public Principal getAuthorizedPrincipal() { - return _conn.getUser(); + Set<AuthenticatedPrincipal> authPrincipals = _subject.getPrincipals(AuthenticatedPrincipal.class); + return authPrincipals.isEmpty() ? null : authPrincipals.iterator().next(); } @Override @@ -365,7 +414,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod err.setDescription("Unknown hostname " + _conn.getLocalHostname()); _conn.close(err); } + Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser()); + _subject.getPrincipals().addAll(authSubject.getPrincipals()); + _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials()); + _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials()); } } + + Subject getSubject() + { + return _subject; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index aa52c6191a..468ba2758e 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -25,10 +25,12 @@ import java.io.PrintWriter; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.security.Principal; +import java.security.PrivilegedAction; import java.util.LinkedHashMap; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import javax.security.auth.Subject; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.qpid.amqp_1_0.codec.FrameWriter; @@ -194,7 +196,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _endpoint.setProperties(serverProperties); _endpoint.setRemoteAddress(getRemoteAddress()); - _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator); _endpoint.setConnectionEventListener(_connection); _endpoint.setFrameOutputHandler(this); _endpoint.setSaslFrameOutput(this); @@ -256,7 +258,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private final Logger RAW_LOGGER = Logger.getLogger("RAW"); - public synchronized void received(ByteBuffer msg) + public synchronized void received(final ByteBuffer msg) { try { @@ -357,8 +359,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut case FRAME: if (msg.hasRemaining()) { - _frameHandler = _frameHandler.parse(msg); - _connection.frameReceived(); + Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>() + { + @Override + public Void run() + { + _frameHandler = _frameHandler.parse(msg); + _connection.frameReceived(); + return null; + } + }); + } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index d0e77b4878..50c0693f31 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -216,7 +216,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK); - queue = _vhost.createQueue(getSession(), attributes); + queue = _vhost.createQueue(attributes); } else { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index b96738e0f6..dcf9e5d7a1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -21,12 +21,15 @@ package org.apache.qpid.server.protocol.v1_0; import java.security.AccessControlException; +import java.security.PrivilegedAction; import java.text.MessageFormat; import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; @@ -39,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.model.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; @@ -55,6 +59,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; +import javax.security.auth.Subject; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -78,6 +83,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); private AtomicBoolean _closed = new AtomicBoolean(); + private final Subject _subject = new Subject(); + public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) @@ -86,13 +93,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio _endpoint = endpoint; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; - + _subject.getPrincipals().addAll(connection.getSubject().getPrincipals()); + _subject.getPrincipals().add(new SessionPrincipal(this)); } public void remoteLinkCreation(final LinkEndpoint endpoint) { - Destination destination; Link_1_0 link = null; Error error = null; @@ -105,7 +112,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio if(endpoint.getRole() == Role.SENDER) { - SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); + final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); if(previousLink == null) { @@ -161,7 +168,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio _vhost, (SendingDestination) destination ); - sendingLinkEndpoint.setLinkEventListener(sendingLink); + + sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); + link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) { @@ -194,7 +203,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio endpoint.setSource(oldSource); SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint; previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint)); - sendingLinkEndpoint.setLinkEventListener(previousLink); + sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(previousLink)); link = previousLink; endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap()); } @@ -237,7 +246,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final TxnCoordinatorLink_1_0 coordinatorLink = new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions); - receivingLinkEndpoint.setLinkEventListener(coordinatorLink); + receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink)); link = coordinatorLink; @@ -296,7 +305,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint; final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost, (ReceivingDestination) destination); - receivingLinkEndpoint.setLinkEventListener(receivingLink); + + receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink)); + link = receivingLink; if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) { @@ -377,7 +388,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio // TODO convert AMQP 1-0 node properties to queue attributes - final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); + final AMQQueue tempQueue = queue = _vhost.createQueue(attributes ); } catch (AccessControlException e) { @@ -640,4 +651,86 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio { _taskList.remove(task); } + + public Subject getSubject() + { + return _subject; + } + + + private class SubjectSpecificReceivingLinkListener implements ReceivingLinkListener + { + private final ReceivingLinkListener _linkListener; + + public SubjectSpecificReceivingLinkListener(final ReceivingLinkListener linkListener) + { + _linkListener = linkListener; + } + + @Override + public void messageTransfer(final Transfer xfr) + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _linkListener.messageTransfer(xfr); + return null; + } + }); + } + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _linkListener.remoteDetached(endpoint, detach); + return null; + } + }); + } + } + + private class SubjectSpecificSendingLinkListener implements SendingLinkListener + { + private final SendingLink_1_0 _previousLink; + + public SubjectSpecificSendingLinkListener(final SendingLink_1_0 previousLink) + { + _previousLink = previousLink; + } + + @Override + public void flowStateChanged() + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _previousLink.flowStateChanged(); + return null; + } + }); + } + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + Subject.doAs(_subject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + _previousLink.remoteDetached(endpoint, detach); + return null; + } + }); + } + } } |
