summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-11-04 15:52:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-11-04 15:52:59 +0000
commitc64d0182543fd9b2b8029fb18f99993a3891977c (patch)
treeb77207767e0fe87c6b9cdde0272d85a95f2ae5f4 /qpid/java/broker-plugins
parent930ffe78c89a3937b38c61bc5c7b324e701e05f6 (diff)
downloadqpid-python-c64d0182543fd9b2b8029fb18f99993a3891977c.tar.gz
QPID-6207 : [Java Broker] Flow uncommitted messages to disk if combined size greater than threshold
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636617 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java57
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java49
2 files changed, 104 insertions, 2 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index dc5635654e..1ee85a8e26 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -27,6 +27,7 @@ import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -74,6 +75,7 @@ import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -135,7 +137,6 @@ public class ServerSession extends Session
private long _blockTime;
private long _blockingTimeout;
-
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -168,6 +169,11 @@ public class ServerSession extends Session
private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
+ private volatile long _uncommittedMessageSize;
+ private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
+ private long _maxUncommittedInMemorySize;
+
+
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
super(connection, delegate, name, expiry);
@@ -188,6 +194,8 @@ public class ServerSession extends Session
_blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class,
Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
+ _maxUncommittedInMemorySize = getVirtualHost().getContextValue(Long.class, org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+
}
protected void setState(final State state)
@@ -258,9 +266,46 @@ public class ServerSession extends Session
);
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
incrementOutstandingTxnsIfNecessary();
+ incrementUncommittedMessageSize(message.getStoredMessage());
return enqueues;
}
+ private void resetUncommittedMessages()
+ {
+ _uncommittedMessageSize = 0l;
+ _uncommittedMessages.clear();
+ }
+
+ private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle)
+ {
+ if (isTransactional() && !(_transaction instanceof DistributedTransaction))
+ {
+ _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+ {
+ handle.flowToDisk();
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+ {
+ getVirtualHost().getEventLogger()
+ .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ }
+
+ if(!_uncommittedMessages.isEmpty())
+ {
+ for (StoredMessage<MessageMetaData_0_10> uncommittedHandle : _uncommittedMessages)
+ {
+ uncommittedHandle.flowToDisk();
+ }
+ _uncommittedMessages.clear();
+ }
+ }
+ else
+ {
+ _uncommittedMessages.add(handle);
+ }
+ }
+ }
+
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
@@ -620,6 +665,7 @@ public class ServerSession extends Session
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
public void rollback()
@@ -629,6 +675,7 @@ public class ServerSession extends Session
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
@@ -707,7 +754,7 @@ public class ServerSession extends Session
return getVirtualHost().getMessageStore();
}
- public VirtualHostImpl getVirtualHost()
+ public VirtualHostImpl<?,?,?> getVirtualHost()
{
return getConnection().getVirtualHost();
}
@@ -1082,6 +1129,12 @@ public class ServerSession extends Session
}
}
+
+ public final long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
@Override
public int compareTo(AMQSessionModel o)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 37ac1f84c4..012d7bffd6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -82,6 +82,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -206,6 +207,9 @@ public class AMQChannel
private long _blockingTimeout;
private boolean _confirmOnPublish;
private long _confirmedMessageCounter;
+ private volatile long _uncommittedMessageSize;
+ private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
+ private long _maxUncommittedInMemorySize;
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
@@ -216,6 +220,7 @@ public class AMQChannel
connection.getAuthorizedSubject().getPublicCredentials(),
connection.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
+ _maxUncommittedInMemorySize = connection.getVirtualHost().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_logSubject = new ChannelLogSubject(this);
_messageStore = messageStore;
@@ -481,6 +486,7 @@ public class AMQChannel
.createBasicAckBody(_confirmedMessageCounter, false);
_connection.writeFrame(responseBody.generateFrame(_channelId));
}
+ incrementUncommittedMessageSize(handle);
incrementOutstandingTxnsIfNecessary();
}
}
@@ -506,6 +512,36 @@ public class AMQChannel
}
+ private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle)
+ {
+ if (isTransactional())
+ {
+ _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+ {
+ handle.flowToDisk();
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+ {
+ getVirtualHost().getEventLogger()
+ .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ }
+
+ if(!_uncommittedMessages.isEmpty())
+ {
+ for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages)
+ {
+ uncommittedHandle.flowToDisk();
+ }
+ _uncommittedMessages.clear();
+ }
+ }
+ else
+ {
+ _uncommittedMessages.add(handle);
+ }
+ }
+ }
+
/**
* Either throws a {@link AMQConnectionException} or returns the message
*
@@ -1182,6 +1218,13 @@ public class AMQChannel
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
+ resetUncommittedMessages();
+ }
+
+ private void resetUncommittedMessages()
+ {
+ _uncommittedMessageSize = 0l;
+ _uncommittedMessages.clear();
}
public void rollback(Runnable postRollbackTask)
@@ -1209,6 +1252,7 @@ public class AMQChannel
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
postRollbackTask.run();
@@ -1368,6 +1412,11 @@ public class AMQChannel
return _currentMessage != null;
}
+ public long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
private class GetDeliveryMethod implements ClientDeliveryMethod
{