From 5b5e8fc7d7ed98f91e57db5c408d91796302da45 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Sat, 19 Feb 2011 13:51:04 +0000 Subject: QPID-2935: merge latest trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1072330 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/AMQConnectionDelegate_0_10.java | 14 ++++++++---- .../org/apache/qpid/client/AMQSession_0_10.java | 9 +------- .../client/message/AMQMessageDelegate_0_10.java | 25 ++++++++++------------ 3 files changed, 22 insertions(+), 26 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 4b4417b6ef..b0bd8f8e97 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void resubscribeSessions() throws JMSException, AMQException, FailoverException { List sessions = new ArrayList(_conn.getSessions().values()); - _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size())); for (AMQSession s : sessions) { - ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; s.resubscribe(); } } - public void closeConnection(long timeout) throws JMSException, AMQException { try @@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec ConnectionClose close = exc.getClose(); if (close == null) { + _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); + try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { _conn.failoverPrep(); - _qpidConnection.resume(); + _conn.resubscribeSessions(); _conn.fireFailoverComplete(); return; } @@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _logger.error("error during failover", e); } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); + } } ExceptionListener listener = _conn._exceptionListener; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1eaccf53fc..517a7a5ce8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQSession _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + private static final Map> _destinationCache = Collections.synchronizedMap(new HashMap>()); public static final String JMS_TYPE = "x-jms-type"; @@ -229,22 +229,19 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } else { - Destination dest = _destinationCache.get(replyTo); + Destination dest = null; + SoftReference ref = _destinationCache.get(replyTo); + if (ref != null) + { + dest = ref.get(); + } if (dest == null) { String exchange = replyTo.getExchange(); String routingKey = replyTo.getRoutingKey(); - dest = generateDestination(exchange == null ? new AMQShortString("") : - new AMQShortString(exchange), - routingKey == null ? new AMQShortString(""): - new AMQShortString(routingKey)); - - - - - - _destinationCache.put(replyTo, dest); + dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + _destinationCache.put(replyTo, new SoftReference(dest)); } return dest; @@ -291,7 +288,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); - _destinationCache.put(replyTo, destination); + _destinationCache.put(replyTo, new SoftReference(destination)); _messageProps.setReplyTo(replyTo); } -- cgit v1.2.1