diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-11-07 12:36:36 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-11-07 12:36:36 +0000 |
| commit | 793a6b31f7ad6b27f0683eef52ae92e05d7f8fee (patch) | |
| tree | 0bbd157b996c54e27bfbaadf6de6f0153909d630 /java/broker | |
| parent | c346e8d12bd8a287695f9a427e22e9c837b866cb (diff) | |
| download | qpid-python-793a6b31f7ad6b27f0683eef52ae92e05d7f8fee.tar.gz | |
QPID-160 Addition of JMSXUserID to all messages through the java broker.
As this will cause the headers to be re-encoded it can be disabled in the config.xml. Default is enabled as the sample config.xml should have all features enabled so that testing can observe the interactions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@592729 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
| -rw-r--r-- | java/broker/etc/config.xml | 7 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 199 |
2 files changed, 116 insertions, 90 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index b5b81bbeb0..2257a612b3 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -8,9 +8,9 @@ - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - + - - http://www.apache.org/licenses/LICENSE-2.0 - - + - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,6 +50,7 @@ <enableDirectBuffers>false</enableDirectBuffers> <framesize>65535</framesize> <compressBufferOnQueue>false</compressBufferOnQueue> + <enableJMSXUserID>true</enableJMSXUserID> </advanced> <security> @@ -84,7 +85,7 @@ <jmx> <access>${conf}/jmxremote.access</access> <principal-database>passwordfile</principal-database> - </jmx> + </jmx> </security> <virtualhosts> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index d3b459c48a..17300d6b50 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -23,7 +23,9 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; @@ -43,6 +45,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.configuration.Configurator; import java.util.Collection; import java.util.HashMap; @@ -118,9 +121,17 @@ public class AMQChannel private final AMQProtocolSession _session; private boolean _closing; + @Configured(path = "advanced.enableJMSXUserID", + defaultValue = "true") + public boolean ENABLE_JMSXUserID; + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws AMQException { + //Set values from configuration + Configurator.configure(this); + _session = session; _channelId = channelId; _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); @@ -199,7 +210,7 @@ public class AMQChannel } public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) - throws AMQException + throws AMQException { if (_currentMessage == null) { @@ -212,6 +223,16 @@ public class AMQChannel _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } + if (ENABLE_JMSXUserID) + { + //Set JMSXUserID + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties; + //fixme: fudge for QPID-677 + properties.getHeaders().keySet(); + + properties.setUserId(protocolSession.getAuthorizedID().getName()); + } + _currentMessage.setContentHeaderBody(contentHeaderBody); _currentMessage.setExpiration(); @@ -245,8 +266,8 @@ public class AMQChannel // returns true iff the message was delivered (i.e. if all data was // received if (_currentMessage.addContentBodyFrame(_storeContext, - protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk( - contentBody))) + protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk( + contentBody))) { // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case @@ -303,7 +324,7 @@ public class AMQChannel * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -327,18 +348,19 @@ public class AMQChannel { _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size()); _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { + { - public boolean callback(UnacknowledgedMessage message) throws AMQException - { - _log.debug(message); + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug(message); - return true; - } + return true; + } - public void visitComplete() - { } - }); + public void visitComplete() + { + } + }); } AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); @@ -418,7 +440,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag - + ") with a queue(" + queue + ") for " + consumerTag); + + ") with a queue(" + queue + ") for " + consumerTag); } } } @@ -464,7 +486,7 @@ public class AMQChannel // if (_nonTransactedContext == null) { _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -527,7 +549,7 @@ public class AMQChannel // if (_nonTransactedContext == null) { _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -547,7 +569,7 @@ public class AMQChannel else { _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() - + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); + + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); // _log.error("Requested requeue of message:" + deliveryTag + // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); // @@ -558,25 +580,26 @@ public class AMQChannel else { _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." - + _unacknowledgedMessageMap.size()); + + _unacknowledgedMessageMap.size()); if (_log.isDebugEnabled()) { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - int count = 0; + { + int count = 0; - public boolean callback(UnacknowledgedMessage message) throws AMQException - { - _log.debug( + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug( (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]"); - return false; // Continue - } + return false; // Continue + } - public void visitComplete() - { } - }); + public void visitComplete() + { + } + }); } } @@ -603,53 +626,54 @@ public class AMQChannel // Marking messages who still have a consumer for to be resent // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException { - public boolean callback(UnacknowledgedMessage message) throws AMQException + AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.message; + msg.setRedelivered(true); + if (consumerTag != null) { - AMQShortString consumerTag = message.consumerTag; - AMQMessage msg = message.message; - msg.setRedelivered(true); - if (consumerTag != null) + // Consumer exists + if (_consumerTag2QueueMap.containsKey(consumerTag)) { - // Consumer exists - if (_consumerTag2QueueMap.containsKey(consumerTag)) - { - msgToResend.add(message); - } - else // consumer has gone - { - msgToRequeue.add(message); - } + msgToResend.add(message); } - else + else // consumer has gone + { + msgToRequeue.add(message); + } + } + else + { + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null) { - // Message has no consumer tag, so was "delivered" to a GET - // or consumer no longer registered - // cannot resend, so re-queue. - if (message.queue != null) + if (requeue) { - if (requeue) - { - msgToRequeue.add(message); - } - else - { - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); - } + msgToRequeue.add(message); } else { - _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); } } - - // false means continue processing - return false; + else + { + _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + } } - public void visitComplete() - { } - }); + // false means continue processing + return false; + } + + public void visitComplete() + { + } + }); // Process Messages to Resend if (_log.isDebugEnabled()) @@ -704,7 +728,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug("Subscription(" + System.identityHashCode(sub) - + ") closed during resend so requeuing message"); + + ") closed during resend so requeuing message"); } // move this message to requeue msgToRequeue.add(message); @@ -714,7 +738,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" - + System.identityHashCode(sub)); + + System.identityHashCode(sub)); } sub.addToResendQueue(msg); @@ -728,7 +752,7 @@ public class AMQChannel if (_log.isInfoEnabled()) { _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() - + ")to prevent loss"); + + ")to prevent loss"); } // move this message to requeue msgToRequeue.add(message); @@ -752,7 +776,7 @@ public class AMQChannel if (_nonTransactedContext == null) { _nonTransactedContext = - new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; @@ -786,29 +810,30 @@ public class AMQChannel public void queueDeleted(final AMQQueue queue) throws AMQException { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException { - public boolean callback(UnacknowledgedMessage message) throws AMQException + if (message.queue == queue) { - if (message.queue == queue) + try { - try - { - message.discard(_storeContext); - message.queue = null; - } - catch (AMQException e) - { - _log.error( + message.discard(_storeContext); + message.queue = null; + } + catch (AMQException e) + { + _log.error( "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e); - } } - - return false; } - public void visitComplete() - { } - }); + return false; + } + + public void visitComplete() + { + } + }); } /** @@ -856,8 +881,8 @@ public class AMQChannel boolean suspend; suspend = - ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)) - || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes())); + ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)) + || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes())); setSuspended(suspend); } @@ -942,7 +967,7 @@ public class AMQChannel { AMQMessage message = bouncedMessage.getAMQMessage(); session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + new AMQShortString(bouncedMessage.getMessage())); message.decrementReference(_storeContext); } @@ -959,7 +984,7 @@ public class AMQChannel else { boolean willSuspend = - ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark)); + ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark)); if (!willSuspend) { final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); |
