From 01b4cfa2d93ba951d0983bc4cd4b94dd87ea9400 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 12 Feb 2007 11:03:12 +0000 Subject: Fixed Xmx value ConcurrentSelectorDeliveryManager : Added trace logging. Ensured messages are removed when required, rather than leaking memory. AMQSession moved exceptions in recover to wrap method rather than individual suspend calls. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@506413 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/AppliedPatches.txt | 1 + qpid/java/broker/etc/qpid-server.conf | 2 +- .../queue/ConcurrentSelectorDeliveryManager.java | 12 ++++++++++-- .../qpid/server/store/MemoryMessageStore.java | 12 ++++++++---- .../java/org/apache/qpid/client/AMQSession.java | 21 ++++----------------- qpid/java/distribution/pom.xml | 2 +- 6 files changed, 25 insertions(+), 25 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/AppliedPatches.txt b/qpid/java/AppliedPatches.txt index afc7339731..ac53368036 100644 --- a/qpid/java/AppliedPatches.txt +++ b/qpid/java/AppliedPatches.txt @@ -1,5 +1,6 @@ -Notes- Updated to resolve Rollback/Recover problems. +Changed Xmx to 1024 -Latest Revision- 504112,503646,502576.499979 diff --git a/qpid/java/broker/etc/qpid-server.conf b/qpid/java/broker/etc/qpid-server.conf index 6d31db7fa9..0e98f72a37 100644 --- a/qpid/java/broker/etc/qpid-server.conf +++ b/qpid/java/broker/etc/qpid-server.conf @@ -21,5 +21,5 @@ QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar export JAVA=java \ JAVA_VM=-server \ - JAVA_MEM="-Xmx3160m -Xms512m "\ + JAVA_MEM="-Xmx1024m -Xms512m "\ CLASSPATH=$QPID_LIBS diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 1a26bab011..9b79657575 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -230,6 +230,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager Queue messageQueue = sub.getNextQueue(_messages); + if (_log.isTraceEnabled()) + { + _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + ") from queue (" + System.identityHashCode(messageQueue) + + ") AMQQueue (" + System.identityHashCode(queue) + ")"); + } + if (messageQueue == null) { // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector @@ -276,7 +283,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { //fixme _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages"); - //_messages.remove(message); + //inefficient + _messages.remove(message); } } @@ -353,7 +361,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + - " subscribers to give the message to."); + " subscribers to give the message to. Queued count (" + getMessageCount() + ")"); } for (Subscription sub : _subscriptions.getSubscriptions()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 328aed81d9..5b6cab294c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -32,9 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -/** - * A simple message store that stores the messages in a threadsafe structure in memory. - */ +/** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -85,7 +83,13 @@ public class MemoryMessageStore implements MessageStore { _log.debug("Removing message with id " + messageId); } - _messageMap.remove(messageId); + Object o = _messageMap.remove(messageId); + + if (_log.isDebugEnabled()) + { + _log.debug("Removed message " + System.identityHashCode(o)); + } + } public void createQueue(AMQQueue queue) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8dce5d4494..e475270ecd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -812,15 +812,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (!isSuspended) { - try - { - suspendChannel(true); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } + suspendChannel(true); } + for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); @@ -839,17 +833,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _dispatcher.rollback(); } - + if (!isSuspended) { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - throw new JMSAMQException(e); - } + suspendChannel(false); } } catch (AMQException e) diff --git a/qpid/java/distribution/pom.xml b/qpid/java/distribution/pom.xml index 8079f041c1..4bfc941cd0 100644 --- a/qpid/java/distribution/pom.xml +++ b/qpid/java/distribution/pom.xml @@ -38,7 +38,7 @@ 1.5 ${pom.version} ${project.build.directory} - -r506403 + -r506413 -- cgit v1.2.1