summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-1-0-protocol
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-19 21:44:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-19 21:44:19 +0000
commit840b1793643b37b9fa8f8352a6851417042301ed (patch)
treeec59ddd96d2758c5621e8b6046478dae760c6eb0 /qpid/java/broker-plugins/amqp-1-0-protocol
parent29a70653f314b99c103c8149cacc4fed2a13898c (diff)
downloadqpid-python-840b1793643b37b9fa8f8352a6851417042301ed.tar.gz
QPID-5567 : Always Use AccessControllerContext to find the current context Subject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java66
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java19
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java109
4 files changed, 179 insertions, 17 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index cd246c081f..5e38442d7e 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -20,28 +20,38 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.net.InetAddress;
+import java.net.SocketAddress;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.Collection;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.security.auth.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
@@ -50,12 +60,14 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private final Port _port;
private final Broker _broker;
+ private final SubjectCreator _subjectCreator;
private VirtualHost _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
private final long _connectionId;
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
+ private final Subject _subject = new Subject();
private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
@@ -91,13 +103,15 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
ConnectionEndpoint conn,
long connectionId,
Port port,
- Transport transport)
+ Transport transport, final SubjectCreator subjectCreator)
{
_broker = broker;
_port = port;
_transport = transport;
_conn = conn;
_connectionId = connectionId;
+ _subject.getPrincipals().add(new ConnectionPrincipal(this));
+ _subjectCreator = subjectCreator;
//_vhost.getConnectionRegistry().registerConnection(this);
}
@@ -109,9 +123,38 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
public void remoteSessionCreation(SessionEndpoint endpoint)
{
- Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
+ final Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
_sessions.add(session);
- endpoint.setSessionEventListener(session);
+ endpoint.setSessionEventListener(new SessionEventListener()
+ {
+ @Override
+ public void remoteLinkCreation(final LinkEndpoint endpoint)
+ {
+ Subject.doAs(session.getSubject(),new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.remoteLinkCreation(endpoint);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void remoteEnd(final End end)
+ {
+ Subject.doAs(session.getSubject(),new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ session.remoteEnd(end);
+ return null;
+ }
+ });
+ }
+ });
}
void sessionEnded(Session_1_0 session)
@@ -209,6 +252,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
return String.valueOf(_conn.getRemoteAddress());
}
+ public SocketAddress getRemoteAddress()
+ {
+ return _conn.getRemoteAddress();
+ }
+
@Override
public String getClientId()
{
@@ -235,7 +283,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
public Principal getAuthorizedPrincipal()
{
- return _conn.getUser();
+ Set<AuthenticatedPrincipal> authPrincipals = _subject.getPrincipals(AuthenticatedPrincipal.class);
+ return authPrincipals.isEmpty() ? null : authPrincipals.iterator().next();
}
@Override
@@ -365,7 +414,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
err.setDescription("Unknown hostname " + _conn.getLocalHostname());
_conn.close(err);
}
+ Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser());
+ _subject.getPrincipals().addAll(authSubject.getPrincipals());
+ _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
+ _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
}
}
+
+ Subject getSubject()
+ {
+ return _subject;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index aa52c6191a..468ba2758e 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -25,10 +25,12 @@ import java.io.PrintWriter;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.Principal;
+import java.security.PrivilegedAction;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
@@ -194,7 +196,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_endpoint.setProperties(serverProperties);
_endpoint.setRemoteAddress(getRemoteAddress());
- _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport);
+ _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
_endpoint.setConnectionEventListener(_connection);
_endpoint.setFrameOutputHandler(this);
_endpoint.setSaslFrameOutput(this);
@@ -256,7 +258,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private final Logger RAW_LOGGER = Logger.getLogger("RAW");
- public synchronized void received(ByteBuffer msg)
+ public synchronized void received(final ByteBuffer msg)
{
try
{
@@ -357,8 +359,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
case FRAME:
if (msg.hasRemaining())
{
- _frameHandler = _frameHandler.parse(msg);
- _connection.frameReceived();
+ Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ _frameHandler = _frameHandler.parse(msg);
+ _connection.frameReceived();
+ return null;
+ }
+ });
+
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index d0e77b4878..50c0693f31 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -216,7 +216,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK);
- queue = _vhost.createQueue(getSession(), attributes);
+ queue = _vhost.createQueue(attributes);
}
else
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index b96738e0f6..dcf9e5d7a1 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -21,12 +21,15 @@
package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
+import java.security.PrivilegedAction;
import java.text.MessageFormat;
import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.*;
@@ -39,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.model.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.QueueExistsException;
+import javax.security.auth.Subject;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,6 +83,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private final Connection_1_0 _connection;
private UUID _id = UUID.randomUUID();
private AtomicBoolean _closed = new AtomicBoolean();
+ private final Subject _subject = new Subject();
+
public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
@@ -86,13 +93,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
_endpoint = endpoint;
_transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
-
+ _subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
+ _subject.getPrincipals().add(new SessionPrincipal(this));
}
public void remoteLinkCreation(final LinkEndpoint endpoint)
{
-
Destination destination;
Link_1_0 link = null;
Error error = null;
@@ -105,7 +112,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
if(endpoint.getRole() == Role.SENDER)
{
- SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
+ final SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName());
if(previousLink == null)
{
@@ -161,7 +168,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
_vhost,
(SendingDestination) destination
);
- sendingLinkEndpoint.setLinkEventListener(sendingLink);
+
+ sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
+
link = sendingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
{
@@ -194,7 +203,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
endpoint.setSource(oldSource);
SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
- sendingLinkEndpoint.setLinkEventListener(previousLink);
+ sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(previousLink));
link = previousLink;
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
}
@@ -237,7 +246,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final TxnCoordinatorLink_1_0 coordinatorLink =
new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
- receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
+ receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink));
link = coordinatorLink;
@@ -296,7 +305,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
(ReceivingDestination) destination);
- receivingLinkEndpoint.setLinkEventListener(receivingLink);
+
+ receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
+
link = receivingLink;
if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
{
@@ -377,7 +388,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
// TODO convert AMQP 1-0 node properties to queue attributes
- final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes );
+ final AMQQueue tempQueue = queue = _vhost.createQueue(attributes );
}
catch (AccessControlException e)
{
@@ -640,4 +651,86 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
{
_taskList.remove(task);
}
+
+ public Subject getSubject()
+ {
+ return _subject;
+ }
+
+
+ private class SubjectSpecificReceivingLinkListener implements ReceivingLinkListener
+ {
+ private final ReceivingLinkListener _linkListener;
+
+ public SubjectSpecificReceivingLinkListener(final ReceivingLinkListener linkListener)
+ {
+ _linkListener = linkListener;
+ }
+
+ @Override
+ public void messageTransfer(final Transfer xfr)
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _linkListener.messageTransfer(xfr);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _linkListener.remoteDetached(endpoint, detach);
+ return null;
+ }
+ });
+ }
+ }
+
+ private class SubjectSpecificSendingLinkListener implements SendingLinkListener
+ {
+ private final SendingLink_1_0 _previousLink;
+
+ public SubjectSpecificSendingLinkListener(final SendingLink_1_0 previousLink)
+ {
+ _previousLink = previousLink;
+ }
+
+ @Override
+ public void flowStateChanged()
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _previousLink.flowStateChanged();
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ Subject.doAs(_subject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _previousLink.remoteDetached(endpoint, detach);
+ return null;
+ }
+ });
+ }
+ }
}