From 08f5f85f8e306c4dc20e75d976270c59753f54a4 Mon Sep 17 00:00:00 2001 From: Justin Ross Date: Wed, 11 Feb 2015 20:43:53 +0000 Subject: QPID-6347: Remove the now obsolete queue_event_generation option; this is a patch from Irina Boverman git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659063 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/messaging/address/QpidQueueOptions.java | 13 ------------- 1 file changed, 13 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java index 5b6c027f4a..24295a0832 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java @@ -30,7 +30,6 @@ public class QpidQueueOptions extends HashMap public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse"; - public static final String QPID_QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"; public void validatePolicyType(String type) { @@ -83,16 +82,4 @@ public class QpidQueueOptions extends HashMap this.put(QPID_LVQ_KEY, key); } - public void setQueueEvents(String value) - { - if (value != null && (value.equals("1") || value.equals("2"))) - { - this.put(QPID_QUEUE_EVENT_GENERATION, value); - } - else - { - throw new IllegalArgumentException("Invalid value for " + - QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}"); - } - } } -- cgit v1.2.1 From 90fcef0d551f0defd22a60b447446856cc39e750 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 11 Feb 2015 22:27:52 +0000 Subject: QPID-6387: [Java Client] Remove array optimisation from session/consumer maps git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659103 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 92 ++------------------- .../org/apache/qpid/client/AMQSession_0_10.java | 6 +- .../org/apache/qpid/client/AMQSession_0_8.java | 7 +- .../apache/qpid/client/ChannelToSessionMap.java | 93 +++------------------- .../org/apache/qpid/client/XAConnectionImpl.java | 2 +- 5 files changed, 27 insertions(+), 173 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 12e9285af8..86e1bb0a8b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -133,7 +133,7 @@ public abstract class AMQSession _producers = new ConcurrentHashMap(); + private final Map _producers = new ConcurrentHashMap(); /** * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume @@ -195,7 +195,7 @@ public abstract class AMQSession _consumers = new IdToConsumerMap(); + private final Map _consumers = new ConcurrentHashMap<>(); /** * Contains a list of consumers which have been removed but which might still have @@ -294,12 +294,11 @@ public abstract class AMQSession getConsumers() + protected Collection getConsumers() { - return _consumers; + return new ArrayList(_consumers.values()); } protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) @@ -317,83 +316,6 @@ public abstract class AMQSession - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentMap _slowAccessConsumers = new ConcurrentHashMap(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } - - public Collection values() - { - ArrayList values = new ArrayList(); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); - - return values; - } - - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } - /** * Creates a new session on a connection. * @@ -2490,7 +2412,7 @@ public abstract class AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 143de271a1..5fb9329af7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -27,6 +27,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -330,10 +331,9 @@ public class AMQSession_0_8 extends AMQSession consumersToCheck = new ArrayList(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; - for(BasicMessageConsumer_0_8 consumer : consumersToCheck) + for(BasicMessageConsumer_0_8 consumer : getConsumers()) { if (consumer.isMessageListenerSet()) { @@ -344,7 +344,6 @@ public class AMQSession_0_8 extends AMQSession _slowAccessSessions = new LinkedHashMap(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private final Map _sessionMap = new ConcurrentHashMap<>(); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; public AMQSession get(int channelId) { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } + return _sessionMap.get(channelId); } - public AMQSession put(int channelId, AMQSession session) + public void put(int channelId, AMQSession session) { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - + _sessionMap.put(channelId, session); } - public AMQSession remove(int channelId) + public void remove(int channelId) { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - + _sessionMap.remove(channelId); } public Collection values() { - ArrayList values = new ArrayList(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; + return new ArrayList<>(_sessionMap.values()); } public int size() { - return _size; + return _sessionMap.size(); } public void clear() { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } + _sessionMap.clear(); } /* @@ -141,14 +80,8 @@ public final class ChannelToSessionMap //go back to the start _idFactory.set(_minChannelID); } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } + + done = (!_sessionMap.keySet().contains(id)); } return id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index d9514338ce..d625a9ae69 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -29,7 +29,7 @@ import javax.jms.XATopicConnection; import javax.jms.XATopicSession; /** - * This class implements the javax.njms.XAConnection interface + * This class implements the javax.jms.XAConnection interface */ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection { -- cgit v1.2.1 From 822ef8b38a63e3b351b30c382bfc77de39904c77 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 12 Feb 2015 17:57:11 +0000 Subject: QPID-6374 : avoid taking a lock when not modifying a value git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659341 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 27 ++++++++++++---------- 1 file changed, 15 insertions(+), 12 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 86e1bb0a8b..8f5e9524b6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -224,7 +224,7 @@ public abstract class AMQSession Date: Fri, 13 Feb 2015 17:01:59 +0000 Subject: QPID-6374: [Java Broker] 0-10 Failover: the thread performing the failover prep now syncs the dispatch queue (avoids possibility of app level dead lock) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659605 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 47 +++++++++++++++++-- .../apache/qpid/client/AMQConnectionDelegate.java | 2 - .../qpid/client/AMQConnectionDelegate_0_10.java | 24 ++++++++-- .../qpid/client/AMQConnectionDelegate_8_0.java | 5 -- .../java/org/apache/qpid/client/AMQSession.java | 44 +++++++++++------- .../org/apache/qpid/client/AMQSession_0_10.java | 2 +- .../qpid/client/BasicMessageConsumer_0_10.java | 2 +- .../apache/qpid/client/ChannelToSessionMap.java | 7 ++- .../client/util/FlowControllingBlockingQueue.java | 53 ++++++++++++++++++---- 9 files changed, 140 insertions(+), 46 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4c596b88a0..8e7b5b90d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1216,11 +1216,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } - public void failoverPrep() - { - _delegate.failoverPrep(); - } - public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); @@ -1653,4 +1648,46 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _messageCompressionThresholdSize; } + + void doWithAllLocks(Runnable r) + { + doWithAllLocks(r, _sessions.values()); + + } + + private void doWithAllLocks(final Runnable r, final List sessions) + { + if (!sessions.isEmpty()) + { + AMQSession session = sessions.remove(0); + + final Object dispatcherLock = session.getDispatcherLock(); + if (dispatcherLock != null) + { + synchronized (dispatcherLock) + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (getFailoverMutex()) + { + r.run(); + } + } + } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 74ca1ed74f..c359fbcc84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -52,8 +52,6 @@ public interface AMQConnectionDelegate XASession createXASession(int ackMode) throws JMSException; - void failoverPrep(); - void resubscribeSessions() throws JMSException, AMQException, FailoverException; void closeConnection(long timeout) throws JMSException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index fdeab7ae70..e22a341205 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec List sessions = new ArrayList(_conn.getSessions().values()); for (AMQSession s : sessions) { - s.failoverPrep(); + ((AMQSession_0_10)s).failoverPrep(); } } @@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.notifyFailoverRequired(); - synchronized (_conn.getFailoverMutex()) + final AtomicBoolean failoverDone = new AtomicBoolean(); + + _conn.doWithAllLocks(new Runnable() { + @Override + public void run() + { try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { - _conn.failoverPrep(); + failoverPrep(); _conn.resubscribeSessions(); _conn.fireFailoverComplete(); - return; + failoverDone.set(true); } } catch (Exception e) @@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.getProtocolHandler().getFailoverLatch().countDown(); _conn.getProtocolHandler().setFailoverLatch(null); } + + } + }); + + + if (failoverDone.get()) + { + return; } + } + _conn.setClosed(); final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 35582d92b7..ae83b6ab48 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - public void failoverPrep() - { - // do nothing - } - /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f5e9524b6..3966e75423 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -169,7 +169,7 @@ public abstract class AMQSession _queue; private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); @@ -358,7 +358,7 @@ public abstract class AMQSession(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -423,7 +423,7 @@ public abstract class AMQSession(_prefetchHighMark, null); } // Add creation logging to tie in with the existing close logging @@ -1789,7 +1789,7 @@ public abstract class AMQSession messages = _queue.iterator(); if (_logger.isDebugEnabled()) { _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" @@ -3237,6 +3233,12 @@ public abstract class AMQSession _sessionMap = new ConcurrentHashMap<>(); + private final Map _sessionMap = Collections.synchronizedMap(new LinkedHashMap()); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; @@ -48,7 +51,7 @@ public final class ChannelToSessionMap _sessionMap.remove(channelId); } - public Collection values() + public List values() { return new ArrayList<>(_sessionMap.values()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index b194ac88de..df54b7066b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; *

* TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ -public class FlowControllingBlockingQueue +public class FlowControllingBlockingQueue { private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final Queue _queue = new ConcurrentLinkedQueue(); + private final Queue _queue = new ConcurrentLinkedQueue(); private final int _flowControlHighThreshold; private final int _flowControlLowThreshold; @@ -82,9 +82,44 @@ public class FlowControllingBlockingQueue } } - public Object take() throws InterruptedException + public T blockingPeek() throws InterruptedException + { + T o = _queue.peek(); + if (o == null) + { + synchronized (this) + { + while ((o = _queue.peek()) == null) + { + wait(); + } + } + } + return o; + } + + public T nonBlockingTake() throws InterruptedException + { + T o = _queue.poll(); + + if (o != null && !disableFlowControl && _listener != null) + { + synchronized (_listener) + { + if (_count-- == _flowControlLowThreshold) + { + _listener.underThreshold(_count); + } + } + + } + + return o; + } + + public T take() throws InterruptedException { - Object o = _queue.poll(); + T o = _queue.poll(); if(o == null) { synchronized(this) @@ -110,7 +145,7 @@ public class FlowControllingBlockingQueue return o; } - public void add(Object o) + public void add(T o) { synchronized(this) { @@ -130,7 +165,7 @@ public class FlowControllingBlockingQueue } } - public Iterator iterator() + public Iterator iterator() { return _queue.iterator(); } -- cgit v1.2.1