From 90c8a29045f18554fd4c2da5ad01dd00af11cae7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 6 Aug 2014 11:05:05 +0000 Subject: QPID-5965 : [Java Broker] flow transient messages to disk in low memory situations git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616155 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 13 ++++++ .../berkeleydb/BDBHAReplicaVirtualHost.java | 12 ++++++ .../server/message/internal/InternalMessage.java | 27 +++++++++---- .../java/org/apache/qpid/server/model/Broker.java | 6 +++ .../java/org/apache/qpid/server/model/Queue.java | 8 ++++ .../org/apache/qpid/server/model/VirtualHost.java | 5 +++ .../qpid/server/model/adapter/BrokerAdapter.java | 33 +++++++++++++++ .../org/apache/qpid/server/queue/AMQQueue.java | 4 ++ .../apache/qpid/server/queue/AbstractQueue.java | 33 +++++++++++++++ .../server/store/AbstractJDBCMessageStore.java | 25 ++++++++++++ .../qpid/server/store/StoredMemoryMessage.java | 12 ++++++ .../apache/qpid/server/store/StoredMessage.java | 4 ++ .../server/virtualhost/AbstractVirtualHost.java | 47 ++++++++++++++++++++++ .../AsynchronousMessageStoreRecoverer.java | 14 ++++++- .../v0_10/MessageConverter_Internal_to_v0_10.java | 15 ++++++- .../protocol/v0_10/MessageConverter_v0_10.java | 15 ++++++- .../v0_8/MessageConverter_Internal_to_v0_8.java | 13 +++++- .../server/protocol/v0_8/MockStoredMessage.java | 17 ++++++-- .../protocol/v1_0/MessageConverter_to_1_0.java | 15 ++++++- .../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java | 13 +++++- .../v0_8_v0_10/MessageConverter_0_10_to_0_8.java | 14 ++++++- .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java | 14 ++++++- .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 13 +++++- 23 files changed, 348 insertions(+), 24 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 7e5f5bbb3f..338882e6df 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -1424,11 +1424,24 @@ public abstract class AbstractBDBMessageStore implements MessageStore storedSizeChangeOccurred(-delta); } + @Override + public boolean isInMemory() + { + return _messageDataRef.isHardRef(); + } + private boolean stored() { return !_messageDataRef.isHardRef(); } + @Override + public boolean flowToDisk() + { + flushToStore(); + return true; + } + @Override public String toString() { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java index 74242df7c6..55805b5626 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHost.java @@ -337,6 +337,18 @@ public class BDBHAReplicaVirtualHost extends AbstractConfiguredObject { private final Object _messageBody; @@ -239,6 +238,18 @@ public class InternalMessage extends AbstractServerMessageImpl> extends ConfiguredObject, EventL String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay"; String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute"; + String BROKER_FLOW_TO_DISK_THRESHOLD = "broker.flowToDiskThreshold"; String QPID_AMQP_PORT = "qpid.amqp_port"; String QPID_HTTP_PORT = "qpid.http_port"; @@ -74,6 +75,9 @@ public interface Broker> extends ConfiguredObject, EventL @ManagedContextDefault(name = QPID_JMX_PORT) public static final String DEFAULT_JMX_PORT_NUMBER = "9099"; + @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD) + public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory()); + @DerivedAttribute String getBuildVersion(); @@ -190,4 +194,6 @@ public interface Broker> extends ConfiguredObject, EventL AuthenticationProvider getManagementModeAuthenticationProvider(); + void assignTargetSizes(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index e98f6cd19b..4e216925e4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -49,6 +49,14 @@ public interface Queue> extends ConfiguredObject String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; + String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; + @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) + long DEFAULT_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = 102400l; + + String QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = "queue.estimatedMessageMemoryOverhead"; + @ManagedContextDefault( name = QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) + long DEFAULT_ESTIMATED_MESSAGE_MEMORY_OVERHEAD = 1024l; + @ManagedAttribute Exchange getAlternateExchange(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 518141a21d..6714a53e9e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -172,4 +172,9 @@ public interface VirtualHost, Q extends Queue, MessageStore getMessageStore(); String getType(); + + void setTargetSize(long targetSize); + + long getTotalQueueDepthBytes(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 2d8a64b920..67c713e9d9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -384,6 +384,39 @@ public class BrokerAdapter extends AbstractConfiguredObject imple return children; } + @Override + public synchronized void assignTargetSizes() + { + long totalTarget = getContextValue(Long.class,BROKER_FLOW_TO_DISK_THRESHOLD); + long totalSize = 0l; + Collection> vhns = getVirtualHostNodes(); + Map,Long> vhs = new HashMap<>(); + for(VirtualHostNode vhn : vhns) + { + VirtualHost vh = vhn.getVirtualHost(); + if(vh != null) + { + long totalQueueDepthBytes = vh.getTotalQueueDepthBytes(); + vhs.put(vh,totalQueueDepthBytes); + totalSize += totalQueueDepthBytes; + } + } + + for(Map.Entry,Long> entry : vhs.entrySet()) + { + + long size = (long) (entry.getValue().doubleValue() * ((double) totalTarget / (double) totalSize)); + entry.getKey().setTargetSize(size); + } + } + + @Override + protected void onOpen() + { + super.onOpen(); + assignTargetSizes(); + } + public AuthenticationProvider findAuthenticationProviderByName(String authenticationProviderName) { if (isManagementMode()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 03bba43d57..9cfa7dbcf3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -111,4 +111,8 @@ public interface AMQQueue> void completeRecovery(); void recover(ServerMessage message); + + void setTargetSize(long targetSize); + + long getPotentialMemoryFootprint(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 11d5cc733f..c889fa7740 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -79,6 +79,7 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -113,6 +114,8 @@ public abstract class AbstractQueue> } }; + private static final long INITIAL_TARGET_QUEUE_SIZE = 102400l; + private final VirtualHostImpl _virtualHost; private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); @@ -130,6 +133,8 @@ public abstract class AbstractQueue> private final AtomicLong _atomicQueueSize = new AtomicLong(0L); + private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE); + private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); private final AtomicLong _totalMessagesReceived = new AtomicLong(); @@ -924,6 +929,11 @@ public abstract class AbstractQueue> incrementQueueCount(); incrementQueueSize(message); + if((_atomicQueueSize.get() + _atomicQueueCount.get()*1024l) > _targetQueueSize.get() && message.getStoredMessage().isInMemory()) + { + message.getStoredMessage().flowToDisk(); + } + _totalMessagesReceived.incrementAndGet(); if(_recovering.get()) @@ -1206,6 +1216,12 @@ public abstract class AbstractQueue> } } + @Override + public void setTargetSize(final long targetSize) + { + _targetQueueSize.set(targetSize); + } + public long getTotalDequeuedMessages() { return _dequeueCount.get(); @@ -2188,6 +2204,9 @@ public abstract class AbstractQueue> { QueueEntryIterator queueListIterator = getEntries().iterator(); + long totalSize = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages(); + long targetSize = _targetQueueSize.get(); + while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); @@ -2210,8 +2229,15 @@ public abstract class AbstractQueue> // the time the check actually occurs. So verify we // can actually get the message to perform the check. ServerMessage msg = node.getMessage(); + if (msg != null) { + totalSize += msg.getSize(); + StoredMessage storedMessage = msg.getStoredMessage(); + if(totalSize > targetSize && storedMessage.isInMemory()) + { + storedMessage.flowToDisk(); + } checkForNotification(msg); } } @@ -2220,6 +2246,13 @@ public abstract class AbstractQueue> } + @Override + public long getPotentialMemoryFootprint() + { + return Math.max(getContextValue(Long.class,QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT), + getQueueDepthBytes() + getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD) * getQueueDepthMessages()); + } + public long getAlertRepeatGap() { return _alertRepeatGap; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 1a1085339d..bb7a726a0c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -1576,6 +1576,31 @@ public abstract class AbstractJDBCMessageStore implements MessageStore storedSizeChange(-delta); } + @Override + public boolean isInMemory() + { + return _messageDataRef.isHardRef(); + } + + @Override + public boolean flowToDisk() + { + try(Connection conn = newConnection()) + { + store(conn); + conn.commit(); + } + catch (SQLException e) + { + throw new StoreException(e); + } + finally + { + + } + return true; + } + private synchronized Runnable store(final Connection conn) throws SQLException { if (!stored()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index e1043e8807..e8402c9268 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -130,4 +130,16 @@ public class StoredMemoryMessage implements S public void remove() { } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 6beb74f4ae..7561b4a11c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -35,4 +35,8 @@ public interface StoredMessage ByteBuffer getContent(int offsetInMessage, int size); void remove(); + + boolean isInMemory(); + + boolean flowToDisk(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 707be9ed7b..f15f608907 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -130,6 +131,8 @@ public abstract class AbstractVirtualHost> exte private final AtomicBoolean _deleted = new AtomicBoolean(); private final VirtualHostNode _virtualHostNode; + private final AtomicLong _targetSize = new AtomicLong(1024*1024); + private MessageStoreLogSubject _messageStoreLogSubject; @ManagedAttributeField @@ -847,6 +850,10 @@ public abstract class AbstractVirtualHost> exte public void execute() { + VirtualHostNode virtualHostNode = getParent(VirtualHostNode.class); + Broker broker = virtualHostNode.getParent(Broker.class); + broker.assignTargetSizes(); + for (AMQQueue q : getQueues()) { if (q.getState() == State.ACTIVE) @@ -1310,6 +1317,46 @@ public abstract class AbstractVirtualHost> exte return _virtualHostNode.getConfigurationStore(); } + @Override + public void setTargetSize(final long targetSize) + { + _targetSize.set(targetSize); + allocateTargetSizeToQueues(); + } + + private void allocateTargetSizeToQueues() + { + long targetSize = _targetSize.get(); + Collection> queues = getQueues(); + long totalSize = calculateTotalEnqueuedSize(queues); + if(targetSize > 0l) + { + for (AMQQueue q : queues) + { + long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize)) + * (double) targetSize); + + q.setTargetSize(size); + } + } + } + + @Override + public long getTotalQueueDepthBytes() + { + return calculateTotalEnqueuedSize(getQueues()); + } + + private long calculateTotalEnqueuedSize(final Collection> queues) + { + long total = 0; + for(AMQQueue queue : queues) + { + total += queue.getPotentialMemoryFootprint(); + } + return total; + } + @Override protected void onCreate() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index 750efc23ae..899cfdcd6e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -147,6 +149,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer entry.getValue().release(); entry.setValue(null); // free up any memory associated with the reference object } + final List> messagesToDelete = new ArrayList<>(); getStore().visitMessages(new MessageHandler() { @Override @@ -156,12 +159,19 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer long messageNumber = storedMessage.getMessageNumber(); if(!_recoveredMessages.containsKey(messageNumber)) { - _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing...."); - storedMessage.remove(); + messagesToDelete.add(storedMessage); } return messageNumber <_maxMessageId-1; } }); + for(StoredMessage storedMessage : messagesToDelete) + { + + _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } + + messagesToDelete.clear(); _recoveredMessages.clear(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index dfdc4e230c..69abcd7727 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.DeliveryProperties; @@ -106,7 +105,19 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter { private long _messageId; @@ -107,4 +106,16 @@ public class MockStoredMessage implements StoredMessage public void remove() { } + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index f6d849bf79..5b9bdc7244 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -37,7 +37,6 @@ import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -265,7 +264,19 @@ public abstract class MessageConverter_to_1_0 implement { throw new UnsupportedOperationException(); } - }; + + @Override + public boolean isInMemory() + { + return true; + } + + @Override + public boolean flowToDisk() + { + return false; + } + }; } protected Section getBodySection(final M serverMessage, final String mimeType) diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index bc2d3fe375..8d77a8cfaf 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -29,7 +29,6 @@ import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0; import org.apache.qpid.server.protocol.v1_0.Message_1_0; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.DeliveryProperties; @@ -115,6 +114,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter