summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-11-04 15:52:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-11-04 15:52:59 +0000
commitc64d0182543fd9b2b8029fb18f99993a3891977c (patch)
treeb77207767e0fe87c6b9cdde0272d85a95f2ae5f4 /qpid/java
parent930ffe78c89a3937b38c61bc5c7b324e701e05f6 (diff)
downloadqpid-python-c64d0182543fd9b2b8029fb18f99993a3891977c.tar.gz
QPID-6207 : [Java Broker] Flow uncommitted messages to disk if combined size greater than threshold
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636617 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java43
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java57
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java49
5 files changed, 150 insertions, 7 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
index 6ae1ac4f02..c112dd4292 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
@@ -22,15 +22,14 @@ package org.apache.qpid.server.logging.messages;
import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
-import java.text.MessageFormat;
-import java.util.Locale;
-import java.util.ResourceBundle;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.logging.LogMessage;
+import java.text.MessageFormat;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
/**
* DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
*
@@ -51,6 +50,7 @@ public class ChannelMessages
public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close";
public static final String PREFETCH_SIZE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.prefetch_size";
public static final String CLOSE_FORCED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close_forced";
+ public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.large_transaction_warn";
public static final String DEADLETTERMSG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.deadlettermsg";
public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch";
public static final String IDLE_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn";
@@ -68,6 +68,7 @@ public class ChannelMessages
Logger.getLogger(CLOSE_LOG_HIERARCHY);
Logger.getLogger(PREFETCH_SIZE_LOG_HIERARCHY);
Logger.getLogger(CLOSE_FORCED_LOG_HIERARCHY);
+ Logger.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY);
Logger.getLogger(DEADLETTERMSG_LOG_HIERARCHY);
Logger.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY);
Logger.getLogger(IDLE_TXN_LOG_HIERARCHY);
@@ -263,6 +264,38 @@ public class ChannelMessages
/**
* Log a Channel message of the Format:
+ * <pre>CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage LARGE_TRANSACTION_WARN(Number param1)
+ {
+ String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return LARGE_TRANSACTION_WARN_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Channel message of the Format:
* <pre>CHN-1011 : Message : {0,number} moved to dead letter queue : {1}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index 5c6e066541..4ae8e39d25 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -40,3 +40,6 @@ DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on
DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.
+
+LARGE_TRANSACTION_WARN = CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data.
+
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 5b3965904e..b28441438d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -44,6 +44,11 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
String PORT = "port";
+ String MAX_UNCOMMITTED_IN_MEMORY_SIZE = "connection.maxUncommittedInMemorySize";
+
+ @ManagedContextDefault(name = MAX_UNCOMMITTED_IN_MEMORY_SIZE)
+ long DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE = 10l * 1024l * 1024l;
+
@DerivedAttribute
String getClientId();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index dc5635654e..1ee85a8e26 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -27,6 +27,7 @@ import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -74,6 +75,7 @@ import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -135,7 +137,6 @@ public class ServerSession extends Session
private long _blockTime;
private long _blockingTimeout;
-
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -168,6 +169,11 @@ public class ServerSession extends Session
private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
+ private volatile long _uncommittedMessageSize;
+ private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
+ private long _maxUncommittedInMemorySize;
+
+
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
super(connection, delegate, name, expiry);
@@ -188,6 +194,8 @@ public class ServerSession extends Session
_blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class,
Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
+ _maxUncommittedInMemorySize = getVirtualHost().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+
}
protected void setState(final State state)
@@ -258,9 +266,46 @@ public class ServerSession extends Session
);
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
incrementOutstandingTxnsIfNecessary();
+ incrementUncommittedMessageSize(message.getStoredMessage());
return enqueues;
}
+ private void resetUncommittedMessages()
+ {
+ _uncommittedMessageSize = 0l;
+ _uncommittedMessages.clear();
+ }
+
+ private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle)
+ {
+ if (isTransactional() && !(_transaction instanceof DistributedTransaction))
+ {
+ _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+ {
+ handle.flowToDisk();
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+ {
+ getVirtualHost().getEventLogger()
+ .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ }
+
+ if(!_uncommittedMessages.isEmpty())
+ {
+ for (StoredMessage<MessageMetaData_0_10> uncommittedHandle : _uncommittedMessages)
+ {
+ uncommittedHandle.flowToDisk();
+ }
+ _uncommittedMessages.clear();
+ }
+ }
+ else
+ {
+ _uncommittedMessages.add(handle);
+ }
+ }
+ }
+
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
@@ -620,6 +665,7 @@ public class ServerSession extends Session
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
public void rollback()
@@ -629,6 +675,7 @@ public class ServerSession extends Session
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
@@ -707,7 +754,7 @@ public class ServerSession extends Session
return getVirtualHost().getMessageStore();
}
- public VirtualHostImpl getVirtualHost()
+ public VirtualHostImpl<?,?,?> getVirtualHost()
{
return getConnection().getVirtualHost();
}
@@ -1082,6 +1129,12 @@ public class ServerSession extends Session
}
}
+
+ public final long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
@Override
public int compareTo(AMQSessionModel o)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 37ac1f84c4..012d7bffd6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -82,6 +82,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -206,6 +207,9 @@ public class AMQChannel
private long _blockingTimeout;
private boolean _confirmOnPublish;
private long _confirmedMessageCounter;
+ private volatile long _uncommittedMessageSize;
+ private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
+ private long _maxUncommittedInMemorySize;
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
@@ -216,6 +220,7 @@ public class AMQChannel
connection.getAuthorizedSubject().getPublicCredentials(),
connection.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
+ _maxUncommittedInMemorySize = connection.getVirtualHost().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_logSubject = new ChannelLogSubject(this);
_messageStore = messageStore;
@@ -481,6 +486,7 @@ public class AMQChannel
.createBasicAckBody(_confirmedMessageCounter, false);
_connection.writeFrame(responseBody.generateFrame(_channelId));
}
+ incrementUncommittedMessageSize(handle);
incrementOutstandingTxnsIfNecessary();
}
}
@@ -506,6 +512,36 @@ public class AMQChannel
}
+ private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle)
+ {
+ if (isTransactional())
+ {
+ _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+ {
+ handle.flowToDisk();
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+ {
+ getVirtualHost().getEventLogger()
+ .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ }
+
+ if(!_uncommittedMessages.isEmpty())
+ {
+ for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages)
+ {
+ uncommittedHandle.flowToDisk();
+ }
+ _uncommittedMessages.clear();
+ }
+ }
+ else
+ {
+ _uncommittedMessages.add(handle);
+ }
+ }
+ }
+
/**
* Either throws a {@link AMQConnectionException} or returns the message
*
@@ -1182,6 +1218,13 @@ public class AMQChannel
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
+ resetUncommittedMessages();
+ }
+
+ private void resetUncommittedMessages()
+ {
+ _uncommittedMessageSize = 0l;
+ _uncommittedMessages.clear();
}
public void rollback(Runnable postRollbackTask)
@@ -1209,6 +1252,7 @@ public class AMQChannel
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
postRollbackTask.run();
@@ -1368,6 +1412,11 @@ public class AMQChannel
return _currentMessage != null;
}
+ public long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
private class GetDeliveryMethod implements ClientDeliveryMethod
{