diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-11-04 15:52:59 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-11-04 15:52:59 +0000 |
| commit | c64d0182543fd9b2b8029fb18f99993a3891977c (patch) | |
| tree | b77207767e0fe87c6b9cdde0272d85a95f2ae5f4 /qpid/java/broker-plugins | |
| parent | 930ffe78c89a3937b38c61bc5c7b324e701e05f6 (diff) | |
| download | qpid-python-c64d0182543fd9b2b8029fb18f99993a3891977c.tar.gz | |
QPID-6207 : [Java Broker] Flow uncommitted messages to disk if combined size greater than threshold
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636617 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
2 files changed, 104 insertions, 2 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index dc5635654e..1ee85a8e26 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -27,6 +27,7 @@ import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -74,6 +75,7 @@ import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -135,7 +137,6 @@ public class ServerSession extends Session private long _blockTime; private long _blockingTimeout; - public static interface MessageDispositionChangeListener { public void onAccept(); @@ -168,6 +169,11 @@ public class ServerSession extends Session private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>(); + private volatile long _uncommittedMessageSize; + private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>(); + private long _maxUncommittedInMemorySize; + + public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { super(connection, delegate, name, expiry); @@ -188,6 +194,8 @@ public class ServerSession extends Session _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT); + _maxUncommittedInMemorySize = getVirtualHost().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); + } protected void setState(final State state) @@ -258,9 +266,46 @@ public class ServerSession extends Session ); getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); incrementOutstandingTxnsIfNecessary(); + incrementUncommittedMessageSize(message.getStoredMessage()); return enqueues; } + private void resetUncommittedMessages() + { + _uncommittedMessageSize = 0l; + _uncommittedMessages.clear(); + } + + private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle) + { + if (isTransactional() && !(_transaction instanceof DistributedTransaction)) + { + _uncommittedMessageSize += handle.getMetaData().getContentSize(); + if (_uncommittedMessageSize > getMaxUncommittedInMemorySize()) + { + handle.flowToDisk(); + if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize()) + { + getVirtualHost().getEventLogger() + .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); + } + + if(!_uncommittedMessages.isEmpty()) + { + for (StoredMessage<MessageMetaData_0_10> uncommittedHandle : _uncommittedMessages) + { + uncommittedHandle.flowToDisk(); + } + _uncommittedMessages.clear(); + } + } + else + { + _uncommittedMessages.add(handle); + } + } + } + public void sendMessage(MessageTransfer xfr, Runnable postIdSettingAction) @@ -620,6 +665,7 @@ public class ServerSession extends Session _txnCommits.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); + resetUncommittedMessages(); } public void rollback() @@ -629,6 +675,7 @@ public class ServerSession extends Session _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); + resetUncommittedMessages(); } @@ -707,7 +754,7 @@ public class ServerSession extends Session return getVirtualHost().getMessageStore(); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return getConnection().getVirtualHost(); } @@ -1082,6 +1129,12 @@ public class ServerSession extends Session } } + + public final long getMaxUncommittedInMemorySize() + { + return _maxUncommittedInMemorySize; + } + @Override public int compareTo(AMQSessionModel o) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 37ac1f84c4..012d7bffd6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -82,6 +82,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -206,6 +207,9 @@ public class AMQChannel private long _blockingTimeout; private boolean _confirmOnPublish; private long _confirmedMessageCounter; + private volatile long _uncommittedMessageSize; + private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); + private long _maxUncommittedInMemorySize; public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { @@ -216,6 +220,7 @@ public class AMQChannel connection.getAuthorizedSubject().getPublicCredentials(), connection.getAuthorizedSubject().getPrivateCredentials()); _subject.getPrincipals().add(new SessionPrincipal(this)); + _maxUncommittedInMemorySize = connection.getVirtualHost().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE); _logSubject = new ChannelLogSubject(this); _messageStore = messageStore; @@ -481,6 +486,7 @@ public class AMQChannel .createBasicAckBody(_confirmedMessageCounter, false); _connection.writeFrame(responseBody.generateFrame(_channelId)); } + incrementUncommittedMessageSize(handle); incrementOutstandingTxnsIfNecessary(); } } @@ -506,6 +512,36 @@ public class AMQChannel } + private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle) + { + if (isTransactional()) + { + _uncommittedMessageSize += handle.getMetaData().getContentSize(); + if (_uncommittedMessageSize > getMaxUncommittedInMemorySize()) + { + handle.flowToDisk(); + if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize()) + { + getVirtualHost().getEventLogger() + .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); + } + + if(!_uncommittedMessages.isEmpty()) + { + for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages) + { + uncommittedHandle.flowToDisk(); + } + _uncommittedMessages.clear(); + } + } + else + { + _uncommittedMessages.add(handle); + } + } + } + /** * Either throws a {@link AMQConnectionException} or returns the message * @@ -1182,6 +1218,13 @@ public class AMQChannel _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } + resetUncommittedMessages(); + } + + private void resetUncommittedMessages() + { + _uncommittedMessageSize = 0l; + _uncommittedMessages.clear(); } public void rollback(Runnable postRollbackTask) @@ -1209,6 +1252,7 @@ public class AMQChannel _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); + resetUncommittedMessages(); } postRollbackTask.run(); @@ -1368,6 +1412,11 @@ public class AMQChannel return _currentMessage != null; } + public long getMaxUncommittedInMemorySize() + { + return _maxUncommittedInMemorySize; + } + private class GetDeliveryMethod implements ClientDeliveryMethod { |
