From 7b08633eec54ecd50855cf07b34267b1630379b6 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 16:17:16 +0000 Subject: QPID-6378 : Applying patch from Xin Chen git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658750 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java | 2 +- .../src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index 69b4939070..cd31974e7f 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -179,7 +179,7 @@ public class Connection implements ExceptionHandler boolean ssl, int channelMax) throws ConnectionException { - this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container, + this(ssl?"amqps":"amqp",address,port,username,password,maxFrameSize,container, remoteHostname, getSslContext(ssl), null, diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java index 3e9dca683e..38f28667d6 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java @@ -225,8 +225,8 @@ public class FrameHandler implements ProtocolHandler // PARSE HERE try { - Object val = _typeHandler.parse(in); - + Object val = in.hasRemaining() ? _typeHandler.parse(in) : null; + if(in.hasRemaining()) { if(val instanceof Transfer) -- cgit v1.2.1 From e4d9cbe7b63b862914696d605303c5401049bb23 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 16:23:27 +0000 Subject: QPID-6352 : Always check TCP transports without looking for service loader git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658752 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index cd31974e7f..a4f9ac5a3a 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -291,6 +291,12 @@ public class Connection implements ExceptionHandler private TransportProvider getTransportProvider(final String protocol) throws ConnectionException { + TCPTransportProviderFactory tcpTransportProviderFactory = new TCPTransportProviderFactory(); + if(tcpTransportProviderFactory.getSupportedTransports().contains(protocol)) + { + return tcpTransportProviderFactory.getProvider(protocol); + } + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); ServiceLoader providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader); -- cgit v1.2.1 From 77208f71328e3a62ab970401eae083980e94ab44 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 23:52:08 +0000 Subject: QPID-6380 : close()ing a durable subscription should detach rather than close a link git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658843 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index b89025a27b..4b53cfa795 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -131,6 +131,13 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub protected void closeUnderlyingReceiver(Receiver receiver) { - receiver.close(); + if(isDurable()) + { + receiver.detach(); + } + else + { + receiver.close(); + } } } -- cgit v1.2.1 From 93024d74d2711c3c3cdab6e98f7158ca730abbe1 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 10 Feb 2015 23:57:54 +0000 Subject: QPID-6381 : if detach with close=true is received, then actually destroy the link git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658845 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 123d6ac2fb..cdaf5f0ed6 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -408,7 +408,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { //TODO // if not durable or close - if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) + if(Boolean.TRUE.equals(detach.getClosed()) || !TerminusDurability.UNSETTLED_STATE.equals(_durability)) { while(!_consumer.trySendLock()) { -- cgit v1.2.1 From 35f8db0065335d4da24de4459cf228b077218138 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 01:04:08 +0000 Subject: QPID-6384 : fix various issues with durable links git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658849 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/amqp_1_0/transport/LinkEndpoint.java | 21 +++++++++++++++++---- .../qpid/amqp_1_0/transport/SessionEndpoint.java | 16 ++++++++++++---- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 8 +++++++- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 3 ++- 4 files changed, 38 insertions(+), 10 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java index 434f939a21..246d43d3de 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java @@ -21,15 +21,28 @@ package org.apache.qpid.amqp_1_0.transport; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.transport.*; -import org.apache.qpid.amqp_1_0.type.transport.Error; - import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.transport.Attach; +import org.apache.qpid.amqp_1_0.type.transport.Detach; +import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.amqp_1_0.type.transport.Flow; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Role; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + public abstract class LinkEndpoint { diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 0f37518773..5a28ddcb60 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -98,6 +98,12 @@ public class SessionEndpoint private int _availableOutgoingCredit; private UnsignedInteger _lastSentIncomingLimit; + private final Error _sessionEndedLinkError = + new Error(LinkError.DETACH_FORCED, + "Force detach the link because the session is remotely ended."); + + + public SessionEndpoint(final ConnectionEndpoint connectionEndpoint) { this(connectionEndpoint, UnsignedInteger.valueOf(0)); @@ -240,19 +246,21 @@ public class SessionEndpoint private void detachLinks() { Collection handles = new ArrayList(_remoteLinkEndpoints.keySet()); - Error error = new Error(); - error.setCondition(LinkError.DETACH_FORCED); - error.setDescription("Force detach the link because the session is remotely ended."); for(UnsignedInteger handle : handles) { Detach detach = new Detach(); detach.setClosed(false); detach.setHandle(handle); - detach.setError(error); + detach.setError(_sessionEndedLinkError); detach(handle, detach); } } + public boolean isSyntheticError(Error error) + { + return error == _sessionEndedLinkError; + } + public short getSendingChannel() { return _sendingChannel; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 598fce03b9..f19ce6b1be 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -283,7 +284,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { //TODO getEndpoint().setSource(null); - getEndpoint().detach(); + getEndpoint().close(); + + final LinkRegistry linkReg = getSession().getConnection() + .getVirtualHost() + .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId()); + linkReg.unregisterSendingLink(getEndpoint().getName()); } public boolean allocateCredit(final ServerMessage msg) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index cdaf5f0ed6..e3994005d6 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -464,7 +464,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _consumer.releaseSendLock(); } } - else if(detach == null || detach.getError() != null) + else if(detach.getError() != null + && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) { _linkAttachment = null; _target.flowStateChanged(); -- cgit v1.2.1 From 83393913d1a7c9bced3c6681423b55fe1d6717e9 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 11:06:23 +0000 Subject: QPID-6383 : don't auto delete queues because the children are being removed due to broker closure git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658923 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/queue/AbstractQueue.java | 14 +++++++++++++- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 16 +++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index f905558f13..8f4c3d6df0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -245,6 +245,7 @@ public abstract class AbstractQueue> private final ConcurrentLinkedQueue _postRecoveryQueue = new ConcurrentLinkedQueue<>(); private final QueueRunner _queueRunner = new QueueRunner(this); + private boolean _closing; protected AbstractQueue(Map attributes, VirtualHostImpl virtualHost) { @@ -753,6 +754,15 @@ public abstract class AbstractQueue> } + @Override + protected void beforeClose() + { + _closing = true; + super.beforeClose(); + } + + + synchronized void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) @@ -793,7 +803,8 @@ public abstract class AbstractQueue> if(!consumer.isTransient() && ( getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS ) - && getConsumerCount() == 0) + && getConsumerCount() == 0 + && !(consumer.isDurable() && _closing)) { if (_logger.isInfoEnabled()) @@ -1797,6 +1808,7 @@ public abstract class AbstractQueue> { ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } + _closing = false; } public void checkCapacity(AMQSessionModel channel) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index e3994005d6..d1d1227818 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -292,15 +292,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS actualFilters.put(entry.getKey(), entry.getValue()); } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) + catch (ParseException | SelectorParsingException e) { Error error = new Error(); error.setCondition(AmqpError.INVALID_FIELD); @@ -364,6 +356,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS options.add(ConsumerImpl.Option.NO_LOCAL); } + if(_durability == TerminusDurability.CONFIGURATION || + _durability == TerminusDurability.UNSETTLED_STATE ) + { + options.add(ConsumerImpl.Option.DURABLE); + } + try { final String name; -- cgit v1.2.1 From bcfed28760d8fe968c71de9752296b9a5f34d2c2 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 11:20:33 +0000 Subject: QPID-6383 : Check in missing file from last commit git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658927 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index c85e4058a1..6b02a84e83 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -153,7 +153,7 @@ class QueueConsumerImpl attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE)); attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL)); attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY"); - attributes.put(DURABLE,false); + attributes.put(DURABLE,optionSet.contains(Option.DURABLE)); attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END); if(filters != null) { -- cgit v1.2.1 From cba338185d3c3f9bdc2e0b490df20d07ffade454 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 12:04:30 +0000 Subject: QPID-6383 : Add missing file - really this time git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658941 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index b15b01ede5..c0db72d498 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -37,7 +37,8 @@ public interface ConsumerImpl SEES_REQUEUES, TRANSIENT, EXCLUSIVE, - NO_LOCAL + NO_LOCAL, + DURABLE } long getBytesOut(); -- cgit v1.2.1 From e31d29d9127d2861b818978e324104d2cca64133 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 18:26:39 +0000 Subject: QPID-6381 : don't delete link endpoints which are detached and have terminus durability CONFIGURATION git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659037 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index d1d1227818..85a0b559c9 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -406,7 +406,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { //TODO // if not durable or close - if(Boolean.TRUE.equals(detach.getClosed()) || !TerminusDurability.UNSETTLED_STATE.equals(_durability)) + if(Boolean.TRUE.equals(detach.getClosed()) + || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability))) { while(!_consumer.trySendLock()) { -- cgit v1.2.1 From 647d21b7ce2bae2174a22019c6cc2e2fd0a2e13d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 18:48:22 +0000 Subject: QPID-6240 : Rollback of transactions with settled acks should release the queue entries git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659039 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index f19ce6b1be..829b3bf336 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -426,7 +426,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); - _queueEntry.unlockAcquisition(); + _queueEntry.release(); } } }); -- cgit v1.2.1 From 08f5f85f8e306c4dc20e75d976270c59753f54a4 Mon Sep 17 00:00:00 2001 From: Justin Ross Date: Wed, 11 Feb 2015 20:43:53 +0000 Subject: QPID-6347: Remove the now obsolete queue_event_generation option; this is a patch from Irina Boverman git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659063 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/messaging/address/QpidQueueOptions.java | 13 ------------- 1 file changed, 13 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java index 5b6c027f4a..24295a0832 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java @@ -30,7 +30,6 @@ public class QpidQueueOptions extends HashMap public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse"; - public static final String QPID_QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"; public void validatePolicyType(String type) { @@ -83,16 +82,4 @@ public class QpidQueueOptions extends HashMap this.put(QPID_LVQ_KEY, key); } - public void setQueueEvents(String value) - { - if (value != null && (value.equals("1") || value.equals("2"))) - { - this.put(QPID_QUEUE_EVENT_GENERATION, value); - } - else - { - throw new IllegalArgumentException("Invalid value for " + - QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}"); - } - } } -- cgit v1.2.1 From 90fcef0d551f0defd22a60b447446856cc39e750 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 11 Feb 2015 22:27:52 +0000 Subject: QPID-6387: [Java Client] Remove array optimisation from session/consumer maps git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659103 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 92 ++------------------- .../org/apache/qpid/client/AMQSession_0_10.java | 6 +- .../org/apache/qpid/client/AMQSession_0_8.java | 7 +- .../apache/qpid/client/ChannelToSessionMap.java | 93 +++------------------- .../org/apache/qpid/client/XAConnectionImpl.java | 2 +- 5 files changed, 27 insertions(+), 173 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 12e9285af8..86e1bb0a8b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -133,7 +133,7 @@ public abstract class AMQSession _producers = new ConcurrentHashMap(); + private final Map _producers = new ConcurrentHashMap(); /** * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume @@ -195,7 +195,7 @@ public abstract class AMQSession _consumers = new IdToConsumerMap(); + private final Map _consumers = new ConcurrentHashMap<>(); /** * Contains a list of consumers which have been removed but which might still have @@ -294,12 +294,11 @@ public abstract class AMQSession getConsumers() + protected Collection getConsumers() { - return _consumers; + return new ArrayList(_consumers.values()); } protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) @@ -317,83 +316,6 @@ public abstract class AMQSession - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentMap _slowAccessConsumers = new ConcurrentHashMap(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } - - public Collection values() - { - ArrayList values = new ArrayList(); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); - - return values; - } - - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } - /** * Creates a new session on a connection. * @@ -2490,7 +2412,7 @@ public abstract class AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 143de271a1..5fb9329af7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -27,6 +27,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -330,10 +331,9 @@ public class AMQSession_0_8 extends AMQSession consumersToCheck = new ArrayList(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; - for(BasicMessageConsumer_0_8 consumer : consumersToCheck) + for(BasicMessageConsumer_0_8 consumer : getConsumers()) { if (consumer.isMessageListenerSet()) { @@ -344,7 +344,6 @@ public class AMQSession_0_8 extends AMQSession _slowAccessSessions = new LinkedHashMap(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private final Map _sessionMap = new ConcurrentHashMap<>(); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; public AMQSession get(int channelId) { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } + return _sessionMap.get(channelId); } - public AMQSession put(int channelId, AMQSession session) + public void put(int channelId, AMQSession session) { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - + _sessionMap.put(channelId, session); } - public AMQSession remove(int channelId) + public void remove(int channelId) { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - + _sessionMap.remove(channelId); } public Collection values() { - ArrayList values = new ArrayList(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; + return new ArrayList<>(_sessionMap.values()); } public int size() { - return _size; + return _sessionMap.size(); } public void clear() { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } + _sessionMap.clear(); } /* @@ -141,14 +80,8 @@ public final class ChannelToSessionMap //go back to the start _idFactory.set(_minChannelID); } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } + + done = (!_sessionMap.keySet().contains(id)); } return id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index d9514338ce..d625a9ae69 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -29,7 +29,7 @@ import javax.jms.XATopicConnection; import javax.jms.XATopicSession; /** - * This class implements the javax.njms.XAConnection interface + * This class implements the javax.jms.XAConnection interface */ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection { -- cgit v1.2.1 From a3c5d96d61fdaf76f4cf9dd4e3f543fb87ee95d6 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 12 Feb 2015 11:42:03 +0000 Subject: QPID-6240: increment the delivery count when applying the new state and releasing the queue entry git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659229 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java | 1 + 1 file changed, 1 insertion(+) (limited to 'qpid/java') diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 829b3bf336..3b9521866c 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -426,6 +426,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); + _queueEntry.incrementDeliveryCount(); _queueEntry.release(); } } -- cgit v1.2.1 From 822ef8b38a63e3b351b30c382bfc77de39904c77 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 12 Feb 2015 17:57:11 +0000 Subject: QPID-6374 : avoid taking a lock when not modifying a value git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659341 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 27 ++++++++++++---------- 1 file changed, 15 insertions(+), 12 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 86e1bb0a8b..8f5e9524b6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -224,7 +224,7 @@ public abstract class AMQSession Date: Thu, 12 Feb 2015 18:24:35 +0000 Subject: QPID-6386 : [AMQP 1.0 Common] close the sender if the TCP connection is terminated before connection.open has occurred git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659348 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java | 1 + 1 file changed, 1 insertion(+) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 766c9705a1..17f334153d 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -457,6 +457,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour case AWAITING_OPEN: case CLOSE_SENT: _state = ConnectionState.CLOSED; + closeSender(); break; case OPEN: _state = ConnectionState.CLOSE_RECEIVED; -- cgit v1.2.1 From 8d84886a1324a42db1992a4d567487821894d691 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 12 Feb 2015 18:50:09 +0000 Subject: QPID-6388 : Treat terminus with durability of "configuration" as durable git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659359 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 1820de9d3a..b9ee0ad498 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -214,7 +214,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel Date: Fri, 13 Feb 2015 17:01:59 +0000 Subject: QPID-6374: [Java Broker] 0-10 Failover: the thread performing the failover prep now syncs the dispatch queue (avoids possibility of app level dead lock) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659605 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 47 +++++++++++++++++-- .../apache/qpid/client/AMQConnectionDelegate.java | 2 - .../qpid/client/AMQConnectionDelegate_0_10.java | 24 ++++++++-- .../qpid/client/AMQConnectionDelegate_8_0.java | 5 -- .../java/org/apache/qpid/client/AMQSession.java | 44 +++++++++++------- .../org/apache/qpid/client/AMQSession_0_10.java | 2 +- .../qpid/client/BasicMessageConsumer_0_10.java | 2 +- .../apache/qpid/client/ChannelToSessionMap.java | 7 ++- .../client/util/FlowControllingBlockingQueue.java | 53 ++++++++++++++++++---- 9 files changed, 140 insertions(+), 46 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4c596b88a0..8e7b5b90d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1216,11 +1216,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } - public void failoverPrep() - { - _delegate.failoverPrep(); - } - public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); @@ -1653,4 +1648,46 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _messageCompressionThresholdSize; } + + void doWithAllLocks(Runnable r) + { + doWithAllLocks(r, _sessions.values()); + + } + + private void doWithAllLocks(final Runnable r, final List sessions) + { + if (!sessions.isEmpty()) + { + AMQSession session = sessions.remove(0); + + final Object dispatcherLock = session.getDispatcherLock(); + if (dispatcherLock != null) + { + synchronized (dispatcherLock) + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (getFailoverMutex()) + { + r.run(); + } + } + } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 74ca1ed74f..c359fbcc84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -52,8 +52,6 @@ public interface AMQConnectionDelegate XASession createXASession(int ackMode) throws JMSException; - void failoverPrep(); - void resubscribeSessions() throws JMSException, AMQException, FailoverException; void closeConnection(long timeout) throws JMSException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index fdeab7ae70..e22a341205 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec List sessions = new ArrayList(_conn.getSessions().values()); for (AMQSession s : sessions) { - s.failoverPrep(); + ((AMQSession_0_10)s).failoverPrep(); } } @@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.notifyFailoverRequired(); - synchronized (_conn.getFailoverMutex()) + final AtomicBoolean failoverDone = new AtomicBoolean(); + + _conn.doWithAllLocks(new Runnable() { + @Override + public void run() + { try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { - _conn.failoverPrep(); + failoverPrep(); _conn.resubscribeSessions(); _conn.fireFailoverComplete(); - return; + failoverDone.set(true); } } catch (Exception e) @@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.getProtocolHandler().getFailoverLatch().countDown(); _conn.getProtocolHandler().setFailoverLatch(null); } + + } + }); + + + if (failoverDone.get()) + { + return; } + } + _conn.setClosed(); final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 35582d92b7..ae83b6ab48 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - public void failoverPrep() - { - // do nothing - } - /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f5e9524b6..3966e75423 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -169,7 +169,7 @@ public abstract class AMQSession _queue; private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); @@ -358,7 +358,7 @@ public abstract class AMQSession(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -423,7 +423,7 @@ public abstract class AMQSession(_prefetchHighMark, null); } // Add creation logging to tie in with the existing close logging @@ -1789,7 +1789,7 @@ public abstract class AMQSession messages = _queue.iterator(); if (_logger.isDebugEnabled()) { _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" @@ -3237,6 +3233,12 @@ public abstract class AMQSession _sessionMap = new ConcurrentHashMap<>(); + private final Map _sessionMap = Collections.synchronizedMap(new LinkedHashMap()); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; @@ -48,7 +51,7 @@ public final class ChannelToSessionMap _sessionMap.remove(channelId); } - public Collection values() + public List values() { return new ArrayList<>(_sessionMap.values()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index b194ac88de..df54b7066b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; *

* TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ -public class FlowControllingBlockingQueue +public class FlowControllingBlockingQueue { private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final Queue _queue = new ConcurrentLinkedQueue(); + private final Queue _queue = new ConcurrentLinkedQueue(); private final int _flowControlHighThreshold; private final int _flowControlLowThreshold; @@ -82,9 +82,44 @@ public class FlowControllingBlockingQueue } } - public Object take() throws InterruptedException + public T blockingPeek() throws InterruptedException + { + T o = _queue.peek(); + if (o == null) + { + synchronized (this) + { + while ((o = _queue.peek()) == null) + { + wait(); + } + } + } + return o; + } + + public T nonBlockingTake() throws InterruptedException + { + T o = _queue.poll(); + + if (o != null && !disableFlowControl && _listener != null) + { + synchronized (_listener) + { + if (_count-- == _flowControlLowThreshold) + { + _listener.underThreshold(_count); + } + } + + } + + return o; + } + + public T take() throws InterruptedException { - Object o = _queue.poll(); + T o = _queue.poll(); if(o == null) { synchronized(this) @@ -110,7 +145,7 @@ public class FlowControllingBlockingQueue return o; } - public void add(Object o) + public void add(T o) { synchronized(this) { @@ -130,7 +165,7 @@ public class FlowControllingBlockingQueue } } - public Iterator iterator() + public Iterator iterator() { return _queue.iterator(); } -- cgit v1.2.1