diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-11-04 15:52:59 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-11-04 15:52:59 +0000 |
| commit | c64d0182543fd9b2b8029fb18f99993a3891977c (patch) | |
| tree | b77207767e0fe87c6b9cdde0272d85a95f2ae5f4 /qpid/java | |
| parent | 930ffe78c89a3937b38c61bc5c7b324e701e05f6 (diff) | |
| download | qpid-python-c64d0182543fd9b2b8029fb18f99993a3891977c.tar.gz | |
QPID-6207 : [Java Broker] Flow uncommitted messages to disk if combined size greater than threshold
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636617 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
5 files changed, 150 insertions, 7 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java index 6ae1ac4f02..c112dd4292 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java @@ -22,15 +22,14 @@ package org.apache.qpid.server.logging.messages; import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX; -import java.text.MessageFormat; -import java.util.Locale; -import java.util.ResourceBundle; - import org.apache.log4j.Logger; - import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.logging.LogMessage; +import java.text.MessageFormat; +import java.util.Locale; +import java.util.ResourceBundle; + /** * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED. * @@ -51,6 +50,7 @@ public class ChannelMessages public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close"; public static final String PREFETCH_SIZE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.prefetch_size"; public static final String CLOSE_FORCED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close_forced"; + public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.large_transaction_warn"; public static final String DEADLETTERMSG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.deadlettermsg"; public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch"; public static final String IDLE_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn"; @@ -68,6 +68,7 @@ public class ChannelMessages Logger.getLogger(CLOSE_LOG_HIERARCHY); Logger.getLogger(PREFETCH_SIZE_LOG_HIERARCHY); Logger.getLogger(CLOSE_FORCED_LOG_HIERARCHY); + Logger.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY); Logger.getLogger(DEADLETTERMSG_LOG_HIERARCHY); Logger.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY); Logger.getLogger(IDLE_TXN_LOG_HIERARCHY); @@ -263,6 +264,38 @@ public class ChannelMessages /** * Log a Channel message of the Format: + * <pre>CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data.</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage LARGE_TRANSACTION_WARN(Number param1) + { + String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN"); + + final Object[] messageArguments = {param1}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return LARGE_TRANSACTION_WARN_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Channel message of the Format: * <pre>CHN-1011 : Message : {0,number} moved to dead letter queue : {1}</pre> * Optional values are contained in [square brackets] and are numbered * sequentially in the method call. diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index 5c6e066541..4ae8e39d25 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -40,3 +40,6 @@ DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1} FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed. + +LARGE_TRANSACTION_WARN = CHN-1013 : Uncommitted transaction contains {0,number} bytes of incoming message data. + diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java index 5b3965904e..b28441438d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java @@ -44,6 +44,11 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X> String PORT = "port"; + String MAX_UNCOMMITTED_IN_MEMORY_SIZE = "connection.maxUncommittedInMemorySize"; + + @ManagedContextDefault(name = MAX_UNCOMMITTED_IN_MEMORY_SIZE) + long DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE = 10l * 1024l * 1024l; + @DerivedAttribute String getClientId(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index dc5635654e..1ee85a8e26 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -27,6 +27,7 @@ import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -74,6 +75,7 @@ import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -135,7 +137,6 @@ public class ServerSession extends Session private long _blockTime; private long _blockingTimeout; - public static interface MessageDispositionChangeListener { public void onAccept(); @@ -168,6 +169,11 @@ public class ServerSession extends Session private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>(); + private volatile long _uncommittedMessageSize; + private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>(); + private long _maxUncommittedInMemorySize; + + public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { super(connection, delegate, name, expiry); @@ -188,6 +194,8 @@ public class ServerSession extends Session _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT); + _maxUncommittedInMemorySize = getVirtualHost().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); + } protected void setState(final State state) @@ -258,9 +266,46 @@ public class ServerSession extends Session ); getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); incrementOutstandingTxnsIfNecessary(); + incrementUncommittedMessageSize(message.getStoredMessage()); return enqueues; } + private void resetUncommittedMessages() + { + _uncommittedMessageSize = 0l; + _uncommittedMessages.clear(); + } + + private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle) + { + if (isTransactional() && !(_transaction instanceof DistributedTransaction)) + { + _uncommittedMessageSize += handle.getMetaData().getContentSize(); + if (_uncommittedMessageSize > getMaxUncommittedInMemorySize()) + { + handle.flowToDisk(); + if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize()) + { + getVirtualHost().getEventLogger() + .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); + } + + if(!_uncommittedMessages.isEmpty()) + { + for (StoredMessage<MessageMetaData_0_10> uncommittedHandle : _uncommittedMessages) + { + uncommittedHandle.flowToDisk(); + } + _uncommittedMessages.clear(); + } + } + else + { + _uncommittedMessages.add(handle); + } + } + } + public void sendMessage(MessageTransfer xfr, Runnable postIdSettingAction) @@ -620,6 +665,7 @@ public class ServerSession extends Session _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); + resetUncommittedMessages(); } public void rollback() @@ -629,6 +675,7 @@ public class ServerSession extends Session _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); + resetUncommittedMessages(); } @@ -707,7 +754,7 @@ public class ServerSession extends Session return getVirtualHost().getMessageStore(); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return getConnection().getVirtualHost(); } @@ -1082,6 +1129,12 @@ public class ServerSession extends Session } } + + public final long getMaxUncommittedInMemorySize() + { + return _maxUncommittedInMemorySize; + } + @Override public int compareTo(AMQSessionModel o) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 37ac1f84c4..012d7bffd6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -82,6 +82,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -206,6 +207,9 @@ public class AMQChannel private long _blockingTimeout; private boolean _confirmOnPublish; private long _confirmedMessageCounter; + private volatile long _uncommittedMessageSize; + private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); + private long _maxUncommittedInMemorySize; public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { @@ -216,6 +220,7 @@ public class AMQChannel connection.getAuthorizedSubject().getPublicCredentials(), connection.getAuthorizedSubject().getPrivateCredentials()); _subject.getPrincipals().add(new SessionPrincipal(this)); + _maxUncommittedInMemorySize = connection.getVirtualHost().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); _logSubject = new ChannelLogSubject(this); _messageStore = messageStore; @@ -481,6 +486,7 @@ public class AMQChannel .createBasicAckBody(_confirmedMessageCounter, false); _connection.writeFrame(responseBody.generateFrame(_channelId)); } + incrementUncommittedMessageSize(handle); incrementOutstandingTxnsIfNecessary(); } } @@ -506,6 +512,36 @@ public class AMQChannel } + private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle) + { + if (isTransactional()) + { + _uncommittedMessageSize += handle.getMetaData().getContentSize(); + if (_uncommittedMessageSize > getMaxUncommittedInMemorySize()) + { + handle.flowToDisk(); + if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize()) + { + getVirtualHost().getEventLogger() + .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); + } + + if(!_uncommittedMessages.isEmpty()) + { + for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages) + { + uncommittedHandle.flowToDisk(); + } + _uncommittedMessages.clear(); + } + } + else + { + _uncommittedMessages.add(handle); + } + } + } + /** * Either throws a {@link AMQConnectionException} or returns the message * @@ -1182,6 +1218,13 @@ public class AMQChannel _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } + resetUncommittedMessages(); + } + + private void resetUncommittedMessages() + { + _uncommittedMessageSize = 0l; + _uncommittedMessages.clear(); } public void rollback(Runnable postRollbackTask) @@ -1209,6 +1252,7 @@ public class AMQChannel _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); + resetUncommittedMessages(); } postRollbackTask.run(); @@ -1368,6 +1412,11 @@ public class AMQChannel return _currentMessage != null; } + public long getMaxUncommittedInMemorySize() + { + return _maxUncommittedInMemorySize; + } + private class GetDeliveryMethod implements ClientDeliveryMethod { |
