summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/etc/config.xml3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java45
4 files changed, 68 insertions, 19 deletions
diff --git a/qpid/java/broker/etc/config.xml b/qpid/java/broker/etc/config.xml
index 80ee039ee5..a36fe57c06 100644
--- a/qpid/java/broker/etc/config.xml
+++ b/qpid/java/broker/etc/config.xml
@@ -74,6 +74,9 @@
<access>
<class>org.apache.qpid.server.security.access.plugins.AllowAll</class>
</access>
+
+ <msg-auth>false</msg-auth>
+
<jmx>
<access>${conf}/jmxremote.access</access>
<principal-database>passwordfile</principal-database>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 847c8b8459..3cd343e1b2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.UnauthorizedAccessException;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -115,12 +116,7 @@ public class AMQChannel
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
-
- @Configured(path = "advanced.enableJMSXUserID",
- defaultValue = "false")
- public boolean ENABLE_JMSXUserID;
-
+ private boolean _closing;
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -180,16 +176,6 @@ public class AMQChannel
_log.debug("Content header received on channel " + _channelId);
}
- if (ENABLE_JMSXUserID)
- {
- //Set JMSXUserID
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties;
- //fixme: fudge for QPID-677
- properties.getHeaders().keySet();
-
- properties.setUserId(_session.getAuthorizedID().getName());
- }
-
_currentMessage.setContentHeaderBody(contentHeaderBody);
_currentMessage.setExpiration();
@@ -217,6 +203,10 @@ public class AMQChannel
{
_returnMessages.add(e);
}
+ catch(UnauthorizedAccessException ex)
+ {
+ _returnMessages.add(ex);
+ }
finally
{
// callback to allow the context to do any post message processing
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 6287172ce8..fc96aa901a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -52,6 +52,8 @@ public class IncomingMessage implements Filterable<RuntimeException>
private final Long _messageId;
private final TransactionalContext _txnContext;
+ private static final boolean MSG_AUTH =
+ ApplicationRegistry.getInstance().getConfiguration().getBoolean("security.msg-auth", false);
/**
@@ -69,7 +71,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
private AMQProtocolSession _publisher;
private MessageStore _messageStore;
private long _expiration;
-
+
private Exchange _exchange;
@@ -164,12 +166,21 @@ public class IncomingMessage implements Filterable<RuntimeException>
_messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
_messagePublishInfo, getContentHeaderBody());
-
+
+
message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
message.setExpiration(_expiration);
message.setClientIdentifier(_publisher.getSessionIdentifier());
+ AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
+ ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
+
+ if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
+ {
+ throw new UnauthorizedAccessException("Acccess Refused",message);
+ }
+
if ((_destinationQueues == null) || _destinationQueues.size() == 0)
{
@@ -274,7 +285,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 2;
}
-
+
public boolean isRedelivered()
{
return false;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java
new file mode 100644
index 0000000000..295cb266b9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.RequiredDeliveryException;
+
+/**
+ * UnauthorizedAccessException is a {@link RequiredDeliveryException} that represents the failure case where a message
+ * is published with a user id different from the one used when creating the connection .
+ * The AMQP status code, 403, is always used to report this condition.
+ *
+ */
+
+public class UnauthorizedAccessException extends RequiredDeliveryException
+{
+ public UnauthorizedAccessException(String msg, AMQMessage amqMessage)
+ {
+ super(msg, amqMessage);
+ }
+
+ public AMQConstant getReplyCode()
+ {
+ return AMQConstant.ACCESS_REFUSED;
+ }
+}