summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-11-07 12:36:36 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-11-07 12:36:36 +0000
commit793a6b31f7ad6b27f0683eef52ae92e05d7f8fee (patch)
tree0bbd157b996c54e27bfbaadf6de6f0153909d630 /java/broker
parentc346e8d12bd8a287695f9a427e22e9c837b866cb (diff)
downloadqpid-python-793a6b31f7ad6b27f0683eef52ae92e05d7f8fee.tar.gz
QPID-160 Addition of JMSXUserID to all messages through the java broker.
As this will cause the headers to be re-encoded it can be disabled in the config.xml. Default is enabled as the sample config.xml should have all features enabled so that testing can observe the interactions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@592729 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/etc/config.xml7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java199
2 files changed, 116 insertions, 90 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index b5b81bbeb0..2257a612b3 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -8,9 +8,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- -
+ -
- http://www.apache.org/licenses/LICENSE-2.0
- -
+ -
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,6 +50,7 @@
<enableDirectBuffers>false</enableDirectBuffers>
<framesize>65535</framesize>
<compressBufferOnQueue>false</compressBufferOnQueue>
+ <enableJMSXUserID>true</enableJMSXUserID>
</advanced>
<security>
@@ -84,7 +85,7 @@
<jmx>
<access>${conf}/jmxremote.access</access>
<principal-database>passwordfile</principal-database>
- </jmx>
+ </jmx>
</security>
<virtualhosts>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index d3b459c48a..17300d6b50 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -43,6 +45,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.configuration.Configurator;
import java.util.Collection;
import java.util.HashMap;
@@ -118,9 +121,17 @@ public class AMQChannel
private final AMQProtocolSession _session;
private boolean _closing;
+ @Configured(path = "advanced.enableJMSXUserID",
+ defaultValue = "true")
+ public boolean ENABLE_JMSXUserID;
+
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws AMQException
{
+ //Set values from configuration
+ Configurator.configure(this);
+
_session = session;
_channelId = channelId;
_storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
@@ -199,7 +210,7 @@ public class AMQChannel
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
- throws AMQException
+ throws AMQException
{
if (_currentMessage == null)
{
@@ -212,6 +223,16 @@ public class AMQChannel
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
+ if (ENABLE_JMSXUserID)
+ {
+ //Set JMSXUserID
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties;
+ //fixme: fudge for QPID-677
+ properties.getHeaders().keySet();
+
+ properties.setUserId(protocolSession.getAuthorizedID().getName());
+ }
+
_currentMessage.setContentHeaderBody(contentHeaderBody);
_currentMessage.setExpiration();
@@ -245,8 +266,8 @@ public class AMQChannel
// returns true iff the message was delivered (i.e. if all data was
// received
if (_currentMessage.addContentBodyFrame(_storeContext,
- protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
- contentBody)))
+ protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+ contentBody)))
{
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
@@ -303,7 +324,7 @@ public class AMQChannel
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -327,18 +348,19 @@ public class AMQChannel
{
_log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
+ {
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(message);
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(message);
- return true;
- }
+ return true;
+ }
- public void visitComplete()
- { }
- });
+ public void visitComplete()
+ {
+ }
+ });
}
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
@@ -418,7 +440,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
- + ") with a queue(" + queue + ") for " + consumerTag);
+ + ") with a queue(" + queue + ") for " + consumerTag);
}
}
}
@@ -464,7 +486,7 @@ public class AMQChannel
// if (_nonTransactedContext == null)
{
_nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -527,7 +549,7 @@ public class AMQChannel
// if (_nonTransactedContext == null)
{
_nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -547,7 +569,7 @@ public class AMQChannel
else
{
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
- + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+ + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
// _log.error("Requested requeue of message:" + deliveryTag +
// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
//
@@ -558,25 +580,26 @@ public class AMQChannel
else
{
_log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
- + _unacknowledgedMessageMap.size());
+ + _unacknowledgedMessageMap.size());
if (_log.isDebugEnabled())
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- int count = 0;
+ {
+ int count = 0;
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(
(count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
- return false; // Continue
- }
+ return false; // Continue
+ }
- public void visitComplete()
- { }
- });
+ public void visitComplete()
+ {
+ }
+ });
}
}
@@ -603,53 +626,54 @@ public class AMQChannel
// Marking messages who still have a consumer for to be resent
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if (consumerTag != null)
{
- AMQShortString consumerTag = message.consumerTag;
- AMQMessage msg = message.message;
- msg.setRedelivered(true);
- if (consumerTag != null)
+ // Consumer exists
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
{
- // Consumer exists
- if (_consumerTag2QueueMap.containsKey(consumerTag))
- {
- msgToResend.add(message);
- }
- else // consumer has gone
- {
- msgToRequeue.add(message);
- }
+ msgToResend.add(message);
}
- else
+ else // consumer has gone
+ {
+ msgToRequeue.add(message);
+ }
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null)
{
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (message.queue != null)
+ if (requeue)
{
- if (requeue)
- {
- msgToRequeue.add(message);
- }
- else
- {
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
+ msgToRequeue.add(message);
}
else
{
- _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
}
-
- // false means continue processing
- return false;
+ else
+ {
+ _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
+ }
}
- public void visitComplete()
- { }
- });
+ // false means continue processing
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
// Process Messages to Resend
if (_log.isDebugEnabled())
@@ -704,7 +728,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug("Subscription(" + System.identityHashCode(sub)
- + ") closed during resend so requeuing message");
+ + ") closed during resend so requeuing message");
}
// move this message to requeue
msgToRequeue.add(message);
@@ -714,7 +738,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
- + System.identityHashCode(sub));
+ + System.identityHashCode(sub));
}
sub.addToResendQueue(msg);
@@ -728,7 +752,7 @@ public class AMQChannel
if (_log.isInfoEnabled())
{
_log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
- + ")to prevent loss");
+ + ")to prevent loss");
}
// move this message to requeue
msgToRequeue.add(message);
@@ -752,7 +776,7 @@ public class AMQChannel
if (_nonTransactedContext == null)
{
_nonTransactedContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
@@ -786,29 +810,30 @@ public class AMQChannel
public void queueDeleted(final AMQQueue queue) throws AMQException
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ if (message.queue == queue)
{
- if (message.queue == queue)
+ try
{
- try
- {
- message.discard(_storeContext);
- message.queue = null;
- }
- catch (AMQException e)
- {
- _log.error(
+ message.discard(_storeContext);
+ message.queue = null;
+ }
+ catch (AMQException e)
+ {
+ _log.error(
"Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
- }
}
-
- return false;
}
- public void visitComplete()
- { }
- });
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
@@ -856,8 +881,8 @@ public class AMQChannel
boolean suspend;
suspend =
- ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
- || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
+ ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
+ || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
setSuspended(suspend);
}
@@ -942,7 +967,7 @@ public class AMQChannel
{
AMQMessage message = bouncedMessage.getAMQMessage();
session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ new AMQShortString(bouncedMessage.getMessage()));
message.decrementReference(_storeContext);
}
@@ -959,7 +984,7 @@ public class AMQChannel
else
{
boolean willSuspend =
- ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
+ ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
if (!willSuspend)
{
final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();