From b877fef68695499fc63c1d0aef19cd1415981052 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 28 Jul 2014 10:41:14 +0000 Subject: QPID-5930 : [Java Broker] Minimize memory footprint for persistent messages git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613950 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/AbstractBDBMessageStore.java | 376 +++++++++++++++---- .../StandardEnvironmentFacadeFactory.java | 8 +- .../ReplicatedEnvironmentFacadeFactory.java | 4 +- .../virtualhost/berkeleydb/BDBVirtualHost.java | 7 + .../berkeleydb/BDBHAVirtualHostNodeImpl.java | 2 +- .../berkeleydb/BDBMessageStoreQuotaEventsTest.java | 13 +- .../server/message/AbstractServerMessageImpl.java | 32 +- .../qpid/server/message/InstanceProperties.java | 2 + .../server/model/AbstractConfiguredObject.java | 8 + .../apache/qpid/server/model/ConfiguredObject.java | 3 + .../apache/qpid/server/queue/QueueEntryImpl.java | 87 +++-- .../server/store/AbstractJDBCMessageStore.java | 399 +++++++++++++++++---- .../AbstractStandardVirtualHostNode.java | 2 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 2 - .../qpid/server/protocol/v0_8/AMQMessage.java | 15 +- .../derby/DerbyMessageStoreQuotaEventsTest.java | 7 +- .../store/jdbc/GenericJDBCConfigurationStore.java | 12 +- .../server/store/jdbc/GenericJDBCMessageStore.java | 17 +- .../apache/qpid/server/store/jdbc/JDBCDetails.java | 76 +++- 19 files changed, 852 insertions(+), 220 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index be592a0d42..7e5f5bbb3f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -558,6 +558,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + byte[] getAllContent(long messageId) throws StoreException + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding contentTupleBinding = ContentBinding.getInstance(); + + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Message Id: " + messageId + " Getting content body"); + } + + try + { + + int written = 0; + OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); + if (status == OperationStatus.SUCCESS) + { + return contentTupleBinding.entryToObject(value); + } + else + { + throw new StoreException("Unable to find message with id " + messageId); + } + + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + + messageId + + " to database: " + + e.getMessage(), e); + } + } + private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) { Cursor cursor = null; @@ -810,12 +847,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private void recordXid(Transaction txn, - long format, - byte[] globalId, - byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + private List recordXid(Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + org.apache.qpid.server.store.Transaction.Record[] enqueues, + org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException { DatabaseEntry key = new DatabaseEntry(); Xid xid = new Xid(format, globalId, branchId); @@ -826,10 +863,20 @@ public abstract class AbstractBDBMessageStore implements MessageStore PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); valueBinding.objectToEntry(preparedTransaction, value); + List postActions = new ArrayList<>(); + for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + { + StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); + if(storedMessage instanceof StoredBDBMessage) + { + postActions.add(((StoredBDBMessage) storedMessage).store(txn)); + } + } try { getXidDb().put(txn, key, value); + return postActions; } catch (DatabaseException e) { @@ -1041,17 +1088,127 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - class StoredBDBMessage implements StoredMessage + static interface MessageDataRef { + T getMetaData(); + byte[] getData(); + void setData(byte[] data); + boolean isHardRef(); + } - private final long _messageId; - private final boolean _isRecovered; + private static final class MessageDataHardRef implements MessageDataRef + { + private final T _metaData; + private byte[] _data; + + private MessageDataHardRef(final T metaData) + { + _metaData = metaData; + } + + @Override + public T getMetaData() + { + return _metaData; + } + + @Override + public byte[] getData() + { + return _data; + } + + @Override + public void setData(final byte[] data) + { + _data = data; + } + @Override + public boolean isHardRef() + { + return true; + } + } + + private static final class MessageData + { private T _metaData; - private volatile SoftReference _metaDataRef; + private SoftReference _data; - private byte[] _data; - private volatile SoftReference _dataRef; + private MessageData(final T metaData, final byte[] data) + { + _metaData = metaData; + + if(data != null) + { + _data = new SoftReference<>(data); + } + } + + public T getMetaData() + { + return _metaData; + } + + public byte[] getData() + { + return _data == null ? null : _data.get(); + } + + public void setData(final byte[] data) + { + _data = new SoftReference<>(data); + } + + + } + private static final class MessageDataSoftRef extends SoftReference> implements MessageDataRef + { + + public MessageDataSoftRef(final T metadata, byte[] data) + { + super(new MessageData(metadata, data)); + } + + @Override + public T getMetaData() + { + MessageData ref = get(); + return ref == null ? null : ref.getMetaData(); + } + + @Override + public byte[] getData() + { + MessageData ref = get(); + + return ref == null ? null : ref.getData(); + } + + @Override + public void setData(final byte[] data) + { + MessageData ref = get(); + if(ref != null) + { + ref.setData(data); + } + } + + @Override + public boolean isHardRef() + { + return false; + } + } + + final class StoredBDBMessage implements StoredMessage + { + + private final long _messageId; + + private volatile MessageDataRef _messageDataRef; StoredBDBMessage(long messageId, T metaData) { @@ -1061,27 +1218,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore StoredBDBMessage(long messageId, T metaData, boolean isRecovered) { _messageId = messageId; - _isRecovered = isRecovered; - if(!_isRecovered) + if(!isRecovered) { - _metaData = metaData; + _messageDataRef = new MessageDataHardRef<>(metaData); + } + else + { + _messageDataRef = new MessageDataSoftRef<>(metaData, null); } - _metaDataRef = new SoftReference(metaData); } @Override public T getMetaData() { - T metaData = _metaDataRef.get(); + T metaData = _messageDataRef.getMetaData(); + if(metaData == null) { checkMessageStoreOpen(); - metaData = (T) getMessageMetaData(_messageId); - _metaDataRef = new SoftReference(metaData); + _messageDataRef = new MessageDataSoftRef<>(metaData,null); } - return metaData; } @@ -1095,21 +1253,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void addContent(int offsetInMessage, ByteBuffer src) { src = src.slice(); - - if(_data == null) + byte[] data = _messageDataRef.getData(); + if(data == null) { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference(_data); - src.duplicate().get(_data); + data = new byte[src.remaining()]; + src.duplicate().get(data); + _messageDataRef.setData(data); } else { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference(_data); + byte[] oldData = data; + data = new byte[oldData.length + src.remaining()]; + + + System.arraycopy(oldData, 0, data, 0, oldData.length); + src.duplicate().get(data, oldData.length, src.remaining()); - System.arraycopy(oldData, 0, _data, 0, oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); + _messageDataRef.setData(data); } } @@ -1117,55 +1277,116 @@ public abstract class AbstractBDBMessageStore implements MessageStore @Override public int getContent(int offsetInMessage, ByteBuffer dst) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) + byte[] data = _messageDataRef.getData(); + if(data == null) { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; + if(stored()) + { + checkMessageStoreOpen(); + data = AbstractBDBMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + metaData = (T) getMessageMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } - else - { - checkMessageStoreOpen(); - return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; } @Override public ByteBuffer getContent(int offsetInMessage, int size) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - return ByteBuffer.wrap(data,offsetInMessage,size); - } - else + byte[] data = _messageDataRef.getData(); + if(data == null) { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.limit(length); - buf.position(0); - return buf; + if(stored()) + { + checkMessageStoreOpen(); + data = AbstractBDBMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + metaData = (T) getMessageMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } + return ByteBuffer.wrap(data,offsetInMessage,size); + } - synchronized void store(Transaction txn) + synchronized Runnable store(Transaction txn) { if (!stored()) { - try + + AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData()); + AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, + _messageDataRef.getData() == null + ? ByteBuffer.allocate(0) + : ByteBuffer.wrap(_messageDataRef.getData())); + + + MessageDataRef hardRef = _messageDataRef; + MessageDataSoftRef messageDataSoftRef; + MessageData ref; + do { - _dataRef = new SoftReference(_data); - AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); - AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData()); + ref = messageDataSoftRef.get(); } - finally + while (ref == null); + + _messageDataRef = messageDataSoftRef; + + class Pointer implements Runnable { - _metaData = null; - _data = null; + private MessageData _ref; + + Pointer(final MessageData ref) + { + _ref = ref; + } + + @Override + public void run() + { + _ref = null; + } } + return new Pointer(ref); + } + else + { + return new Runnable() + { + + @Override + public void run() + { + } + }; } } @@ -1205,7 +1426,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore private boolean stored() { - return _metaData == null || _isRecovered; + return !_messageDataRef.isHardRef(); } @Override @@ -1220,7 +1441,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; - private final List _onCommitActions = new ArrayList<>(); + private final List _preCommitActions = new ArrayList<>(); + private final List _postCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1242,13 +1464,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - _onCommitActions.add(new Runnable() + final long contentSize = storedMessage.getMetaData().getContentSize(); + _preCommitActions.add(new Runnable() { @Override public void run() { - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _postCommitActions.add(storedMessage.store(_txn)); + _storeSizeIncrease += contentSize; } }); @@ -1271,16 +1494,26 @@ public abstract class AbstractBDBMessageStore implements MessageStore checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + doPostCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } private void doPreCommitActions() { - for(Runnable action : _onCommitActions) + for(Runnable action : _preCommitActions) + { + action.run(); + } + _preCommitActions.clear(); + } + + private void doPostCommitActions() + { + for(Runnable action : _postCommitActions) { action.run(); } - _onCommitActions.clear(); + _postCommitActions.clear(); } @Override @@ -1289,14 +1522,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + doPostCommitActions(); + return storeFuture; } @Override public void abortTran() throws StoreException { checkMessageStoreOpen(); - _onCommitActions.clear(); + _preCommitActions.clear(); + _postCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } @@ -1314,7 +1550,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + _postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues)); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index ab0d1ab9ef..4c3b72a29b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import com.sleepycat.je.config.ConfigParam; import com.sleepycat.je.config.EnvironmentParams; @@ -65,16 +66,15 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor private Map buildEnvironmentConfiguration(ConfiguredObject parent) { - final Map context = parent.getContext(); Map envConfigMap = new HashMap<>(); for (ConfigParam cp : EnvironmentParams.SUPPORTED_PARAMS.values()) { final String parameterName = cp.getName(); - if (context.containsKey(parameterName) && !cp.isForReplication()) + Set contextKeys = parent.getContextKeys(); + if (!cp.isForReplication() && contextKeys.contains(parameterName)) { - String contextValue = context.get(parameterName); - envConfigMap.put(parameterName, contextValue); + envConfigMap.put(parameterName, parent.getContextValue(String.class, parameterName)); } } return Collections.unmodifiableMap(envConfigMap); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 4f21baf42f..37d53319b5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -126,11 +126,11 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact private Map buildConfig(ConfiguredObject parent, Pattern paramName) { Map targetMap = new HashMap<>(); - for (String name : parent.getContext().keySet()) + for (String name : parent.getContextKeys()) { if (paramName.matcher(name).matches()) { - String contextValue = parent.getContext().get(name); + String contextValue = parent.getContextValue(String.class,name); targetMap.put(name, contextValue); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java index cc0032845f..12511ad9e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.virtualhost.berkeleydb; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.SizeMonitoringSettings; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -31,6 +32,12 @@ public interface BDBVirtualHost> extends VirtualHost String STORE_PATH = "storePath"; + // Default the JE cache to 5% of total memory, but no less than 10Mb and no more than 200Mb + @ManagedContextDefault(name="je.maxMemory") + long DEFAULT_JE_CACHE_SIZE = Math.max(10l*1024l*1024l, + Math.min(200l*1024l*1024l, + Runtime.getRuntime().maxMemory()/20l)); + @ManagedAttribute(mandatory = true) String getStorePath(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 388e2e7608..061fa12768 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -418,7 +418,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode contextMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); + when(parent.getContext()).thenReturn(contextMap); + when(parent.getContextKeys()).thenReturn(contextMap.keySet()); + when(parent.getContextValue(eq(String.class),eq("je.log.fileMax"))).thenReturn(MAX_BDB_LOG_SIZE); when(parent.getStorePath()).thenReturn(storeLocation); when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE); when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index 556cfcb266..198c0a1cb9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.server.message; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - public abstract class AbstractServerMessageImpl, T extends StorableMessageMetaData> implements ServerMessage { private static final AtomicIntegerFieldUpdater _refCountUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount"); + private volatile int _referenceCount = 0; private final StoredMessage _handle; private final Object _connectionReference; @@ -113,7 +113,7 @@ public abstract class AbstractServerMessageImpl newReference() { - return new Reference(); + return new Reference(this); } @Override @@ -148,26 +148,32 @@ public abstract class AbstractServerMessageImpl + private static class Reference, T extends StorableMessageMetaData> + implements MessageReference { - private final AtomicBoolean _released = new AtomicBoolean(false); + private static final AtomicIntegerFieldUpdater _releasedUpdater = + AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released"); + + private AbstractServerMessageImpl _message; + private volatile int _released; - private Reference() + private Reference(final AbstractServerMessageImpl message) { - incrementReference(); + _message = message; + _message.incrementReference(); } public X getMessage() { - return (X) AbstractServerMessageImpl.this; + return (X) _message; } - public void release() + public synchronized void release() { - if(!_released.getAndSet(true)) + if(_releasedUpdater.compareAndSet(this,0,1)) { - decrementReference(); + _message.decrementReference(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java index 1d7b8627f4..af9a252077 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java @@ -92,5 +92,7 @@ public interface InstanceProperties return _props.get(prop); } } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 83e2cc06ec..49d3fea8fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -1339,6 +1339,14 @@ public abstract class AbstractConfiguredObject> im return converter.convert("${" + propertyName + "}", this); } + @Override + public Set getContextKeys() + { + Map inheritedContext = new HashMap<>(); + generateInheritedContext(getModel(), this, inheritedContext); + return Collections.unmodifiableSet(inheritedContext.keySet()); + } + private OwnAttributeResolver getOwnAttributeResolver() { return _attributeResolver; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 2301f23773..01ca2fa646 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.model; import java.security.AccessControlException; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -78,6 +79,8 @@ public interface ConfiguredObject> T getContextValue(Class clazz, String propertyName); + Set getContextKeys(); + @DerivedAttribute( persist = true ) String getLastUpdatedBy(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 6fa7801608..e6cde6c934 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import java.util.EnumMap; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -80,13 +79,17 @@ public abstract class QueueEntryImpl implements QueueEntry private volatile long _entryId; - private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties(); + private static int REDELIVERED_FLAG = 1; + private static int PERSISTENT_FLAG = 2; + private static int MANDATORY_FLAG = 4; + private static int IMMEDIATE_FLAG = 8; + private int _flags; + private long _expiration; /** Number of times this message has been delivered */ - private volatile int _deliveryCount = 0; + private volatile int _deliveryCount = -1; private static final AtomicIntegerFieldUpdater _deliveryCountUpdater = AtomicIntegerFieldUpdater .newUpdater(QueueEntryImpl.class, "_deliveryCount"); - private boolean _deliveredToConsumer; public QueueEntryImpl(QueueEntryList queueEntryList) @@ -117,14 +120,17 @@ public abstract class QueueEntryImpl implements QueueEntry { if(_message != null) { - _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent()); - _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration()); + if(_message.getMessage().isPersistent()) + { + setPersistent(); + } + _expiration = _message.getMessage().getExpiration(); } } public InstanceProperties getInstanceProperties() { - return _instanceProperties; + return new EntryInstanceProperties(); } protected void setEntryId(long entryId) @@ -154,21 +160,17 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean getDeliveredToConsumer() { - return _deliveredToConsumer; + return _deliveryCountUpdater.get(this) != -1; } public boolean expired() { - ServerMessage message = getMessage(); - if(message != null) + long expiration = _expiration; + if (expiration != 0L) { - long expiration = message.getExpiration(); - if (expiration != 0L) - { - long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); - return (now > expiration); - } + return (now > expiration); } return false; @@ -206,7 +208,7 @@ public abstract class QueueEntryImpl implements QueueEntry final boolean acquired = acquire(((QueueConsumer)sub).getOwningState()); if(acquired) { - _deliveredToConsumer = true; + _deliveryCountUpdater.compareAndSet(this,-1,0); } return acquired; } @@ -253,15 +255,6 @@ public abstract class QueueEntryImpl implements QueueEntry } - public void setRedelivered() - { - _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE); - } - - public boolean isRedelivered() - { - return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED)); - } public QueueConsumer getDeliveredConsumer() { @@ -459,7 +452,7 @@ public abstract class QueueEntryImpl implements QueueEntry public int getDeliveryCount() { - return _deliveryCount; + return _deliveryCount == -1 ? 0 : _deliveryCount; } @Override @@ -470,6 +463,7 @@ public abstract class QueueEntryImpl implements QueueEntry public void incrementDeliveryCount() { + _deliveryCountUpdater.compareAndSet(this,-1,0); _deliveryCountUpdater.incrementAndGet(this); } @@ -509,20 +503,45 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue(); } - private static class EntryInstanceProperties implements InstanceProperties + public void setRedelivered() + { + _flags |= REDELIVERED_FLAG; + } + + private void setPersistent() + { + _flags |= PERSISTENT_FLAG; + } + + public boolean isRedelivered() + { + return (_flags & REDELIVERED_FLAG) != 0; + } + + private class EntryInstanceProperties implements InstanceProperties { - private final EnumMap _properties = new EnumMap(Property.class); @Override public Object getProperty(final Property prop) { - return _properties.get(prop); - } + switch(prop) + { - private void setProperty(Property prop, Object value) - { - _properties.put(prop, value); + case REDELIVERED: + return (_flags & REDELIVERED_FLAG) != 0; + case PERSISTENT: + return (_flags & PERSISTENT_FLAG) != 0; + case MANDATORY: + return (_flags & MANDATORY_FLAG) != 0; + case IMMEDIATE: + return (_flags & IMMEDIATE_FLAG) != 0; + case EXPIRATION: + return _expiration; + default: + throw new IllegalArgumentException("Unknown property " + prop); + } } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index f2f85e1387..1a1085339d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -716,8 +716,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } - private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, - Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException + private List recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, + Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException { Connection conn = connWrapper.getConnection(); @@ -738,6 +738,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore stmt.close(); } + List postActions = new ArrayList<>(); + for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + { + StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); + if(storedMessage instanceof StoredJDBCMessage) + { + postActions.add(((StoredJDBCMessage) storedMessage).store(conn)); + } + } + + stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS); try @@ -773,7 +784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { stmt.close(); } - + return postActions; } catch (SQLException e) { @@ -1105,6 +1116,47 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } + + private byte[] getAllContent(long messageId) + { + Connection conn = null; + PreparedStatement stmt = null; + + try + { + conn = newAutoCommitConnection(); + + stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + ResultSet rs = stmt.executeQuery(); + + int written = 0; + + if (rs.next()) + { + + byte[] dataAsBytes = getBlobAsBytes(rs, 1); + return dataAsBytes; + } + + throw new StoreException("No such message, id: " + messageId); + + } + catch (SQLException e) + { + throw new StoreException("Error retrieving content for message " + messageId + ": " + e.getMessage(), e); + } + finally + { + JdbcUtils.closePreparedStatement(stmt, getLogger()); + JdbcUtils.closeConnection(conn, getLogger()); + } + + + } + + + @Override public boolean isPersistent() { @@ -1116,7 +1168,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { private final ConnectionWrapper _connWrapper; private int _storeSizeIncrease; - private final List _onCommitActions = new ArrayList<>(); + private final List _preCommitActions = new ArrayList<>(); + private final List _postCommitActions = new ArrayList<>(); protected JDBCTransaction() { @@ -1138,19 +1191,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { - _onCommitActions.add(new Runnable() + _preCommitActions.add(new Runnable() { @Override public void run() { try { - ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); + _postCommitActions.add(((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection())); _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); } catch (SQLException e) { - throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); + throw new StoreException("Exception on enqueuing message into message store" + _messageId, + e); } } }); @@ -1174,6 +1228,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore doPreCommitActions(); AbstractJDBCMessageStore.this.commitTran(_connWrapper); storedSizeChange(_storeSizeIncrease); + doPostCommitActions(); } @Override @@ -1183,23 +1238,33 @@ public abstract class AbstractJDBCMessageStore implements MessageStore doPreCommitActions(); StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); + doPostCommitActions(); return storeFuture; } private void doPreCommitActions() { - for(Runnable action : _onCommitActions) + for(Runnable action : _preCommitActions) + { + action.run(); + } + _preCommitActions.clear(); + } + + private void doPostCommitActions() + { + for(Runnable action : _postCommitActions) { action.run(); } - _onCommitActions.clear(); + _postCommitActions.clear(); } @Override public void abortTran() { checkMessageStoreOpen(); - _onCommitActions.clear(); + _preCommitActions.clear(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } @@ -1216,56 +1281,171 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { checkMessageStoreOpen(); - AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues); + _postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues)); } } - private class StoredJDBCMessage implements StoredMessage + + static interface MessageDataRef { + T getMetaData(); + byte[] getData(); + void setData(byte[] data); + boolean isHardRef(); + } - private final long _messageId; - private final boolean _isRecovered; - private StorableMessageMetaData _metaData; - private volatile SoftReference _metaDataRef; + private static final class MessageDataHardRef implements MessageDataRef + { + private final T _metaData; private byte[] _data; - private volatile SoftReference _dataRef; + + private MessageDataHardRef(final T metaData) + { + _metaData = metaData; + } + + @Override + public T getMetaData() + { + return _metaData; + } + + @Override + public byte[] getData() + { + return _data; + } + + @Override + public void setData(final byte[] data) + { + _data = data; + } + + @Override + public boolean isHardRef() + { + return true; + } + } + + private static final class MessageData + { + private T _metaData; + private SoftReference _data; + + private MessageData(final T metaData, final byte[] data) + { + _metaData = metaData; + + if(data != null) + { + _data = new SoftReference<>(data); + } + } + + public T getMetaData() + { + return _metaData; + } + + public byte[] getData() + { + return _data == null ? null : _data.get(); + } + + public void setData(final byte[] data) + { + _data = new SoftReference<>(data); + } + + + } + private static final class MessageDataSoftRef extends SoftReference> implements MessageDataRef + { + + public MessageDataSoftRef(final T metadata, byte[] data) + { + super(new MessageData(metadata, data)); + } + + @Override + public T getMetaData() + { + MessageData ref = get(); + return ref == null ? null : ref.getMetaData(); + } + + @Override + public byte[] getData() + { + MessageData ref = get(); + + return ref == null ? null : ref.getData(); + } + + @Override + public void setData(final byte[] data) + { + MessageData ref = get(); + if(ref != null) + { + ref.setData(data); + } + } + + @Override + public boolean isHardRef() + { + return false; + } + } + + private class StoredJDBCMessage implements StoredMessage + { + + private final long _messageId; + + private volatile MessageDataRef _messageDataRef; - StoredJDBCMessage(long messageId, StorableMessageMetaData metaData) + StoredJDBCMessage(long messageId, T metaData) { this(messageId, metaData, false); } StoredJDBCMessage(long messageId, - StorableMessageMetaData metaData, boolean isRecovered) + T metaData, boolean isRecovered) { _messageId = messageId; - _isRecovered = isRecovered; - if(!_isRecovered) + if(!isRecovered) { - _metaData = metaData; + _messageDataRef = new MessageDataHardRef<>(metaData); + } + else + { + _messageDataRef = new MessageDataSoftRef<>(metaData, null); } - _metaDataRef = new SoftReference(metaData); } @Override - public StorableMessageMetaData getMetaData() + public T getMetaData() { - StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData; + T metaData = _messageDataRef.getMetaData(); if(metaData == null) { checkMessageStoreOpen(); try { - metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId); + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData,null); } catch (SQLException e) { throw new StoreException(e); } - _metaDataRef = new SoftReference(metaData); } return metaData; @@ -1281,21 +1461,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public void addContent(int offsetInMessage, ByteBuffer src) { src = src.slice(); + byte[] data = _messageDataRef.getData(); - if(_data == null) + if(data == null) { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference(_data); - src.duplicate().get(_data); + data = new byte[src.remaining()]; + src.duplicate().get(data); + _messageDataRef.setData(data); } else { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference(_data); + byte[] oldData = data; + data = new byte[oldData.length + src.remaining()]; + + System.arraycopy(oldData,0,data,0,oldData.length); + src.duplicate().get(data, oldData.length, src.remaining()); - System.arraycopy(oldData,0,_data,0,oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); + _messageDataRef.setData(data); } } @@ -1303,34 +1485,90 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public int getContent(int offsetInMessage, ByteBuffer dst) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; - } - else + byte[] data = _messageDataRef.getData(); + + if(data == null) { - checkMessageStoreOpen(); - return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst); + if(stored()) + { + checkMessageStoreOpen(); + getLogger().debug("GET CONTENT for message id " + _messageId); + data = AbstractJDBCMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + try + { + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + catch (SQLException e) + { + throw new StoreException(e); + } + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } + + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; + } @Override public ByteBuffer getContent(int offsetInMessage, int size) { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.position(0); - buf.limit(length); - return buf; + byte[] data = _messageDataRef.getData(); + + if(data == null) + { + + if(stored()) + { + checkMessageStoreOpen(); + + data = AbstractJDBCMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + try + { + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + catch (SQLException e) + { + throw new StoreException(e); + } + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } + } + return ByteBuffer.wrap(data,offsetInMessage,size); + } @Override public void remove() { + getLogger().debug("REMOVE called on message: " + _messageId); checkMessageStoreOpen(); int delta = getMetaData().getContentSize(); @@ -1338,32 +1576,69 @@ public abstract class AbstractJDBCMessageStore implements MessageStore storedSizeChange(-delta); } - private synchronized void store(final Connection conn) throws SQLException + private synchronized Runnable store(final Connection conn) throws SQLException { if (!stored()) { - try + getLogger().debug("STORING message id " + _messageId); + storeMetaData(conn, _messageId, _messageDataRef.getMetaData()); + AbstractJDBCMessageStore.this.addContent(conn, _messageId, + _messageDataRef.getData() == null + ? ByteBuffer.allocate(0) + : ByteBuffer.wrap(_messageDataRef.getData())); + + + if(getLogger().isDebugEnabled()) { - storeMetaData(conn, _messageId, _metaData); - AbstractJDBCMessageStore.this.addContent(conn, _messageId, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + getLogger().debug("Storing message " + _messageId + " to store"); } - finally + + MessageDataRef hardRef = _messageDataRef; + MessageDataSoftRef messageDataSoftRef; + MessageData ref; + do { - _metaData = null; - _data = null; + messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData()); + ref = messageDataSoftRef.get(); } + while (ref == null); - if(getLogger().isDebugEnabled()) + _messageDataRef = messageDataSoftRef; + + class Pointer implements Runnable { - getLogger().debug("Storing message " + _messageId + " to store"); + private MessageData _ref; + + Pointer(final MessageData ref) + { + getLogger().debug("POST COMMIT for message id " + _messageId); + _ref = ref; + } + + @Override + public void run() + { + _ref = null; + } } + return new Pointer(ref); + } + else + { + return new Runnable() + { + + @Override + public void run() + { + } + }; } } private boolean stored() { - return _metaData == null || _isRecovered; + return !_messageDataRef.isHardRef(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index fbe402c4a1..df9ffda3a3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -90,7 +90,7 @@ public abstract class AbstractStandardVirtualHostNode> final BasicContentHeaderProperties properties = incomingMessage.getContentHeader().getProperties(); - long expiration = properties.getExpiration(); - message.setExpiration(expiration); return message; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index 0ed63daf7c..869de2f3a5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -29,8 +29,6 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage; -import java.nio.ByteBuffer; - /** * A deliverable message. */ @@ -39,10 +37,6 @@ public class AMQMessage extends AbstractServerMessageImpl handle) @@ -56,11 +50,6 @@ public class AMQMessage extends AbstractServerMessageImpl providerAttributes = new HashMap(_parent.getContext()); - providerAttributes.keySet().retainAll(connectionProviderFactory.getProviderAttributeNames()); + Map providerAttributes = new HashMap<>(); + Set providerAttributeNames = connectionProviderFactory.getProviderAttributeNames(); + providerAttributeNames.retainAll(parent.getContextKeys()); + for(String attr : providerAttributeNames) + { + providerAttributes.put(attr, parent.getContextValue(String.class, attr)); + } _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, settings.getUsername(), diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java index 4fde0a44c7..3304d01d86 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java @@ -28,6 +28,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.Set; + +import javax.security.auth.Subject; import org.apache.log4j.Logger; @@ -36,8 +39,6 @@ import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.StoreException; -import javax.security.auth.Subject; - /** * Implementation of a MessageStore backed by a Generic JDBC Database. */ @@ -60,7 +61,7 @@ public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore JDBCSettings settings = (JDBCSettings)parent; _connectionURL = settings.getConnectionUrl(); - JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent.getContext()); + JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent); if (!details.isKnownVendor() && getLogger().isInfoEnabled()) { @@ -90,9 +91,13 @@ public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore try { - Map providerAttributes = new HashMap(parent.getContext()); - providerAttributes.keySet().retainAll(connectionProviderFactory.getProviderAttributeNames()); - + Map providerAttributes = new HashMap<>(); + Set providerAttributeNames = connectionProviderFactory.getProviderAttributeNames(); + providerAttributeNames.retainAll(parent.getContextKeys()); + for(String attr : providerAttributeNames) + { + providerAttributes.put(attr, parent.getContextValue(String.class, attr)); + } _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, settings.getUsername(), diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java index a74f852dfa..8cd4996033 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java @@ -19,9 +19,15 @@ package org.apache.qpid.server.store.jdbc; +import java.util.AbstractMap; +import java.util.AbstractSet; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.Set; + +import org.apache.qpid.server.model.ConfiguredObject; public abstract class JDBCDetails { @@ -216,7 +222,75 @@ public abstract class JDBCDetails return result; } - + public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final ConfiguredObject object) + { + final Set contextKeys = object.getContextKeys(); + Map mapConversion = new AbstractMap() + { + @Override + public Set> entrySet() + { + return new AbstractSet>() + { + @Override + public Iterator> iterator() + { + final Iterator underlying = contextKeys.iterator(); + return new Iterator>() + { + @Override + public boolean hasNext() + { + return underlying.hasNext(); + } + + @Override + public Entry next() + { + final String key = underlying.next(); + final String value = object.getContextValue(String.class, key); + return new Entry() + { + + @Override + public String getKey() + { + return key; + } + + @Override + public String getValue() + { + return value; + } + + @Override + public String setValue(final String value) + { + throw new UnsupportedOperationException(); + } + }; + + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public int size() + { + return contextKeys.size(); + } + }; + } + }; + return getDetailsForJdbcUrl(jdbcUrl, mapConversion); + } public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final Map contextMap) { String[] components = jdbcUrl.split(":", 3); -- cgit v1.2.1