diff options
Diffstat (limited to 'qpid/java/common')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java | 13 | ||||
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 58 |
2 files changed, 57 insertions, 14 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 1c521244d0..06c5c83031 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -526,10 +526,6 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - for (Session ssn : channels.values()) - { - ssn.closeCode(close); - } ConnectionCloseCode code = close.getReplyCode(); if (code != ConnectionCloseCode.NORMAL) { @@ -705,4 +701,13 @@ public class Connection extends ConnectionInvoker { return sessions.containsKey(new Binary(name.getBytes())); } + + public void notifyFailoverRequired() + { + List<Session> values = new ArrayList<Session>(channels.values()); + for (Session ssn : values) + { + ssn.notifyFailoverRequired(); + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index b732191707..321e5256b2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Session @@ -125,6 +126,8 @@ public class Session extends SessionInvoker private SessionDetachCode detachCode; private final Object stateLock = new Object(); + private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); @@ -257,6 +260,7 @@ public class Session extends SessionInvoker void resume() { + _failoverRequired.set(false); synchronized (commands) { attach(); @@ -459,7 +463,7 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED || state == CLOSING) + if (state == DETACHED || state == CLOSING || state == CLOSED) { return; } @@ -595,11 +599,12 @@ public class Session extends SessionInvoker if (state != OPEN && state != CLOSED && state != CLOSING) { Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + if (!current.equals(resumer) ) { Waiter w = new Waiter(commands, timeout); while (w.hasTime() && (state != OPEN && state != CLOSED)) { + checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } @@ -668,6 +673,7 @@ public class Session extends SessionInvoker } } } + checkFailoverRequired("Command was interrupted because of failover, before being sent"); w.await(); } } @@ -762,6 +768,14 @@ public class Session extends SessionInvoker } } + private void checkFailoverRequired(String message) + { + if (_failoverRequired.get()) + { + throw new SessionException(message); + } + } + protected boolean shouldIssueFlush(int next) { return (next % 65536) == 0; @@ -787,6 +801,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { + checkFailoverRequired("Session sync was interrupted by failover."); log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); w.await(); } @@ -847,13 +862,6 @@ public class Session extends SessionInvoker } } - private ConnectionClose close = null; - - void closeCode(ConnectionClose close) - { - this.close = close; - } - ExecutionException getException() { synchronized (results) @@ -904,6 +912,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(this, timeout); while (w.hasTime() && state != CLOSED && !isDone()) { + checkFailoverRequired("Operation was interrupted by failover."); log.debug("%s waiting for result: %s", Session.this, this); w.await(); } @@ -915,7 +924,12 @@ public class Session extends SessionInvoker } else if (state == CLOSED) { - throw new SessionException(getException()); + ExecutionException ex = getException(); + if(ex == null) + { + throw new SessionClosedException(); + } + throw new SessionException(ex); } else { @@ -995,6 +1009,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && state != CLOSED) { + checkFailoverRequired("close() was interrupted by failover."); w.await(); } @@ -1089,6 +1104,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(stateLock, timeout); while (w.hasTime() && state == NEW) { + checkFailoverRequired("Session opening was interrupted by failover."); w.await(); } } @@ -1111,4 +1127,26 @@ public class Session extends SessionInvoker { return stateLock; } + + protected void notifyFailoverRequired() + { + //ensure any operations waiting are aborted to + //prevent them waiting for timeout for 60 seconds + //and possibly preventing failover proceeding + _failoverRequired.set(true); + synchronized (commands) + { + commands.notifyAll(); + } + synchronized (results) + { + for (ResultFuture<?> result : results.values()) + { + synchronized(result) + { + result.notifyAll(); + } + } + } + } } |
