summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-21 10:14:04 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-21 10:14:04 +0000
commit8ae98ae258f94067bd91595ecbcccdf32165657f (patch)
tree370282cbf5086de029bc6d2d8d6f861f6c1b7965 /qpid/java/common
parent534a44b256ed41b8797f73c5990a12604153b7b6 (diff)
downloadqpid-python-8ae98ae258f94067bd91595ecbcccdf32165657f.tar.gz
QPID-3532: make the 0-10 client hold the failover mutex during the failover. Alter the Address resolution code to allow resolving addresses after failover. Add some more failover tests (inc ADDR based ones). Make the failover process notify any waiters in the session to abort and let failover proceed.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1187279 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java58
2 files changed, 57 insertions, 14 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 1c521244d0..06c5c83031 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -526,10 +526,6 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
- for (Session ssn : channels.values())
- {
- ssn.closeCode(close);
- }
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
{
@@ -705,4 +701,13 @@ public class Connection extends ConnectionInvoker
{
return sessions.containsKey(new Binary(name.getBytes()));
}
+
+ public void notifyFailoverRequired()
+ {
+ List<Session> values = new ArrayList<Session>(channels.values());
+ for (Session ssn : values)
+ {
+ ssn.notifyFailoverRequired();
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index b732191707..321e5256b2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Session
@@ -125,6 +126,8 @@ public class Session extends SessionInvoker
private SessionDetachCode detachCode;
private final Object stateLock = new Object();
+ private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -257,6 +260,7 @@ public class Session extends SessionInvoker
void resume()
{
+ _failoverRequired.set(false);
synchronized (commands)
{
attach();
@@ -459,7 +463,7 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED || state == CLOSING)
+ if (state == DETACHED || state == CLOSING || state == CLOSED)
{
return;
}
@@ -595,11 +599,12 @@ public class Session extends SessionInvoker
if (state != OPEN && state != CLOSED && state != CLOSING)
{
Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (!current.equals(resumer) )
{
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && (state != OPEN && state != CLOSED))
{
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
@@ -668,6 +673,7 @@ public class Session extends SessionInvoker
}
}
}
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
@@ -762,6 +768,14 @@ public class Session extends SessionInvoker
}
}
+ private void checkFailoverRequired(String message)
+ {
+ if (_failoverRequired.get())
+ {
+ throw new SessionException(message);
+ }
+ }
+
protected boolean shouldIssueFlush(int next)
{
return (next % 65536) == 0;
@@ -787,6 +801,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
+ checkFailoverRequired("Session sync was interrupted by failover.");
log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
w.await();
}
@@ -847,13 +862,6 @@ public class Session extends SessionInvoker
}
}
- private ConnectionClose close = null;
-
- void closeCode(ConnectionClose close)
- {
- this.close = close;
- }
-
ExecutionException getException()
{
synchronized (results)
@@ -904,6 +912,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(this, timeout);
while (w.hasTime() && state != CLOSED && !isDone())
{
+ checkFailoverRequired("Operation was interrupted by failover.");
log.debug("%s waiting for result: %s", Session.this, this);
w.await();
}
@@ -915,7 +924,12 @@ public class Session extends SessionInvoker
}
else if (state == CLOSED)
{
- throw new SessionException(getException());
+ ExecutionException ex = getException();
+ if(ex == null)
+ {
+ throw new SessionClosedException();
+ }
+ throw new SessionException(ex);
}
else
{
@@ -995,6 +1009,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED)
{
+ checkFailoverRequired("close() was interrupted by failover.");
w.await();
}
@@ -1089,6 +1104,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(stateLock, timeout);
while (w.hasTime() && state == NEW)
{
+ checkFailoverRequired("Session opening was interrupted by failover.");
w.await();
}
}
@@ -1111,4 +1127,26 @@ public class Session extends SessionInvoker
{
return stateLock;
}
+
+ protected void notifyFailoverRequired()
+ {
+ //ensure any operations waiting are aborted to
+ //prevent them waiting for timeout for 60 seconds
+ //and possibly preventing failover proceeding
+ _failoverRequired.set(true);
+ synchronized (commands)
+ {
+ commands.notifyAll();
+ }
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ synchronized(result)
+ {
+ result.notifyAll();
+ }
+ }
+ }
+ }
}