diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-23 01:21:22 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-23 01:21:22 +0000 |
| commit | 3a07c0d1d6f665ae0d5b308fa8a3342d2bf4b1c2 (patch) | |
| tree | 7082cdbe3e0833f29de69423ec9f0b99eb44aa61 /qpid/java/common/src/main | |
| parent | 386fc2f8dc103ae078c98e3fe5bcdfb7842f27de (diff) | |
| download | qpid-python-3a07c0d1d6f665ae0d5b308fa8a3342d2bf4b1c2.tar.gz | |
QPID-1339: support for low level session resume
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707241 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
9 files changed, 212 insertions, 136 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 71027e3256..f4dc4408be 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -53,10 +53,10 @@ public class Connection extends ConnectionInvoker implements Receiver<ProtocolEvent>, Sender<ProtocolEvent> { - enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } - private static final Logger log = Logger.get(Connection.class); + enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + class DefaultConnectionListener implements ConnectionListener { public void opened(Connection conn) {} @@ -202,9 +202,9 @@ public class Connection extends ConnectionInvoker return createSession(0); } - public Session createSession(long timeout) + public Session createSession(long expiry) { - return createSession(UUID.randomUUID().toString(), timeout); + return createSession(UUID.randomUUID().toString(), expiry); } public Session createSession(String name) @@ -212,25 +212,24 @@ public class Connection extends ConnectionInvoker return createSession(name, 0); } - public Session createSession(String name, long timeout) + public Session createSession(String name, long expiry) { - return createSession(Strings.toUTF8(name), timeout); + return createSession(Strings.toUTF8(name), expiry); } - public Session createSession(byte[] name, long timeout) + public Session createSession(byte[] name, long expiry) { - return createSession(new Binary(name), timeout); + return createSession(new Binary(name), expiry); } - public Session createSession(Binary name, long timeout) + public Session createSession(Binary name, long expiry) { synchronized (lock) { - Session ssn = new Session(this, name); + Session ssn = new Session(this, name, expiry); sessions.put(name, ssn); map(ssn); - ssn.sessionAttach(name.getBytes()); - ssn.sessionRequestTimeout(timeout); + ssn.attach(); return ssn; } } @@ -349,6 +348,19 @@ public class Connection extends ConnectionInvoker } } + public void resume() + { + synchronized (lock) + { + for (Session ssn : sessions.values()) + { + map(ssn); + ssn.attach(); + ssn.resume(); + } + } + } + public void exception(ConnectionException e) { synchronized (lock) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java index dcf05d9f72..c1031c9a1c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -39,8 +39,9 @@ public class Echo implements SessionListener public void message(Session ssn, MessageTransfer xfr) { + int id = xfr.getId(); ssn.invoke(xfr); - ssn.processed(xfr); + ssn.processed(id); } public void exception(Session ssn, SessionException exc) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index be1ea54c93..e4b8ade285 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -140,7 +140,7 @@ public class ServerDelegate extends ConnectionDelegate public Session getSession(Connection conn, SessionAttach atc) { - return new Session(conn, new Binary(atc.getName())); + return new Session(conn, new Binary(atc.getName()), 0); } @Override public void sessionAttach(Connection conn, SessionAttach atc) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 125f000543..e96aaf1b99 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -24,6 +24,7 @@ package org.apache.qpid.transport; import org.apache.qpid.transport.network.Frame; import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -34,6 +35,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.transport.Option.*; +import static org.apache.qpid.transport.Session.State.*; import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.util.Serial.*; import static org.apache.qpid.util.Strings.*; @@ -49,6 +51,8 @@ public class Session extends SessionInvoker private static final Logger log = Logger.get(Session.class); + enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED } + class DefaultSessionListener implements SessionListener { @@ -69,49 +73,43 @@ public class Session extends SessionInvoker public static final int UNLIMITED_CREDIT = 0xFFFFFFFF; - private static boolean ENABLE_REPLAY = false; - - static - { - String enableReplay = "enable_command_replay"; - try - { - ENABLE_REPLAY = new Boolean(System.getProperties().getProperty(enableReplay, "false")); - } - catch (Exception e) - { - ENABLE_REPLAY = false; - } - } - private Connection connection; private Binary name; + private long expiry; private int channel; private SessionDelegate delegate = new SessionDelegate(); private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; + private boolean incomingInit; // incoming command count - int commandsIn = 0; + private int commandsIn; // completed incoming commands private final Object processedLock = new Object(); - private RangeSet processed = new RangeSet(); - private int maxProcessed = commandsIn - 1; - private int syncPoint = maxProcessed; + private RangeSet processed; + private int maxProcessed; + private int syncPoint; // outgoing command count private int commandsOut = 0; - private Map<Integer,Method> commands = new HashMap<Integer,Method>(); + private Method[] commands = new Method[64*1024]; private int maxComplete = commandsOut - 1; private boolean needSync = false; - private AtomicBoolean closed = new AtomicBoolean(false); + private State state = NEW; - Session(Connection connection, Binary name) + Session(Connection connection, Binary name, long expiry) { this.connection = connection; this.name = name; + this.expiry = expiry; + initReceiver(); + } + + public Connection getConnection() + { + return connection; } public Binary getName() @@ -119,6 +117,11 @@ public class Session extends SessionInvoker return name; } + void setExpiry(long expiry) + { + this.expiry = expiry; + } + int getChannel() { return channel; @@ -154,9 +157,63 @@ public class Session extends SessionInvoker } } - public Map<Integer,Method> getOutstandingCommands() + private void initReceiver() + { + synchronized (processedLock) + { + incomingInit = false; + processed = new RangeSet(); + } + } + + void attach() { - return commands; + initReceiver(); + sessionAttach(name.getBytes()); + sessionRequestTimeout(expiry); + } + + void resume() + { + synchronized (commands) + { + for (int i = maxComplete + 1; lt(i, commandsOut); i++) + { + Method m = commands[mod(i, commands.length)]; + if (m != null) + { + sessionCommandPoint(m.getId(), 0); + send(m); + } + } + } + } + + void dump() + { + synchronized (commands) + { + for (Method m : commands) + { + if (m != null) + { + System.out.println(m); + } + } + } + } + + final void commandPoint(int id) + { + synchronized (processedLock) + { + this.commandsIn = id; + if (!incomingInit) + { + maxProcessed = commandsIn - 1; + syncPoint = maxProcessed; + } + } } public int getCommandsOut() @@ -209,11 +266,12 @@ public class Session extends SessionInvoker public void processed(Range range) { - log.debug("%s processed(%s)", this, range); + log.debug("%s processed(%s) %s %s", this, range, syncPoint, maxProcessed); boolean flush; synchronized (processedLock) { + log.debug("%s", processed); processed.add(range); Range first = processed.getFirst(); int lower = first.getLower(); @@ -281,14 +339,6 @@ public class Session extends SessionInvoker } } - public Method getCommand(int id) - { - synchronized (commands) - { - return commands.get(id); - } - } - boolean complete(int lower, int upper) { //avoid autoboxing @@ -301,13 +351,13 @@ public class Session extends SessionInvoker int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - commands.remove(id); + commands[mod(id, commands.length)] = null; } if (le(lower, maxComplete + 1)) { maxComplete = max(maxComplete, upper); } - log.debug("%s commands remaining: %s", this, commands); + log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); commands.notifyAll(); return gt(maxComplete, old); } @@ -329,38 +379,47 @@ public class Session extends SessionInvoker } } - public void invoke(Method m) + final private boolean isFull(int id) { - if (closed.get()) - { - ExecutionException exc = getException(); - if (exc != null) - { - throw new SessionException(exc); - } - else if (close != null) - { - throw new ConnectionException(close); - } - else - { - throw new SessionClosedException(); - } - } + return id - maxComplete >= commands.length; + } + public void invoke(Method m) + { if (m.getEncodedTrack() == Frame.L4) { synchronized (commands) { + if (state == CLOSED) + { + throw new SessionClosedException(); + } + int next = commandsOut++; m.setId(next); + + if (isFull(next)) + { + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && isFull(next)) + { + sessionFlush(COMPLETED); + w.await(); + } + } + + if (isFull(next)) + { + throw new SessionException("timed out waiting for completion"); + } + if (next == 0) { sessionCommandPoint(0, 0); } - if (ENABLE_REPLAY) + if (expiry > 0) { - commands.put(next, m); + commands[mod(next, commands.length)] = m; } if (autoSync) { @@ -404,31 +463,23 @@ public class Session extends SessionInvoker executionSync(SYNC); } - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed.get() && elapsed < timeout && lt(maxComplete, point)) + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - try { - log.debug("%s waiting for[%d]: %d, %s", this, point, - maxComplete, commands); - commands.wait(timeout - elapsed); - elapsed = System.currentTimeMillis() - start; - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + log.debug("%s waiting for[%d]: %d, %s", this, point, + maxComplete, commands); + w.await(); } if (lt(maxComplete, point)) { - if (closed.get()) + if (state == CLOSED) { throw new SessionException(getException()); } else { - throw new RuntimeException + throw new SessionException (String.format ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point)); } @@ -518,20 +569,11 @@ public class Session extends SessionInvoker { synchronized (this) { - long start = System.currentTimeMillis(); - long elapsed = 0; - while (!closed.get() && timeout - elapsed > 0 && !isDone()) + Waiter w = new Waiter(this, timeout); + while (w.hasTime() && state != CLOSED && !isDone()) { - try - { - log.debug("%s waiting for result: %s", Session.this, this); - wait(timeout - elapsed); - elapsed = System.currentTimeMillis() - start; - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + log.debug("%s waiting for result: %s", Session.this, this); + w.await(); } } @@ -539,13 +581,15 @@ public class Session extends SessionInvoker { return result; } - else if (closed.get()) + else if (state == CLOSED) { throw new SessionException(getException()); } else { - return null; + throw new SessionException + (String.format("%s timed out waiting for result: %s", + Session.this, this)); } } @@ -588,32 +632,24 @@ public class Session extends SessionInvoker public void close() { - sessionRequestTimeout(0); - sessionDetach(name.getBytes()); synchronized (commands) { - long start = System.currentTimeMillis(); - long elapsed = 0; - try + state = CLOSING; + sessionRequestTimeout(0); + sessionDetach(name.getBytes()); + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && state != CLOSED) { - while (!closed.get() && elapsed < timeout) - { - commands.wait(timeout - elapsed); - elapsed = System.currentTimeMillis() - start; - } - - if (!closed.get()) - { - throw new SessionException("close() timed out"); - } + w.await(); } - catch (InterruptedException e) + + if (state != CLOSED) { - throw new RuntimeException(e); + throw new SessionException("close() timed out"); } - } - connection.removeSession(this); + connection.removeSession(this); + } } public void exception(Throwable t) @@ -623,18 +659,27 @@ public class Session extends SessionInvoker public void closed() { - closed.set(true); synchronized (commands) { + if (expiry == 0) + { + state = CLOSED; + } + else + { + state = DETACHED; + } + commands.notifyAll(); - } - synchronized (results) - { - for (ResultFuture<?> result : results.values()) + + synchronized (results) { - synchronized(result) + for (ResultFuture<?> result : results.values()) { - result.notifyAll(); + synchronized(result) + { + result.notifyAll(); + } } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index f6a1735b68..e2b6980dd4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -57,7 +57,10 @@ public class SessionDelegate log.warn("UNHANDLED: [%s] %s", ssn, method); } - @Override public void sessionTimeout(Session ssn, SessionTimeout t) {} + @Override public void sessionTimeout(Session ssn, SessionTimeout t) + { + ssn.setExpiry(t.getTimeout()); + } @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) { @@ -113,7 +116,7 @@ public class SessionDelegate @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp) { - ssn.commandsIn = scp.getCommandId(); + ssn.commandPoint(scp.getCommandId()); } @Override public void executionSync(Session ssn, ExecutionSync sync) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index bb7d2506e3..7908700cbe 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -208,11 +208,14 @@ public final class Disassembler implements Sender<ProtocolEvent>, if (payload) { final Header hdr = method.getHeader(); - final Struct[] structs = hdr.getStructs(); - - for (Struct st : structs) + if (hdr != null) { - enc.writeStruct32(st); + final Struct[] structs = hdr.getStructs(); + + for (Struct st : structs) + { + enc.writeStruct32(st); + } } headerSeg = enc.segment(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index f25b16d71a..73ff039be5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -88,12 +88,6 @@ public final class IoSender extends Thread implements Sender<ByteBuffer> return result; } - private static final int mod(int n, int m) - { - int r = n % m; - return r < 0 ? m + r : r; - } - public void send(ByteBuffer buf) { if (closed.get()) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java index 2c6984e302..c220694b50 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java @@ -34,6 +34,12 @@ import static java.lang.Math.*; public class Functions { + public static final int mod(int n, int m) + { + int r = n % m; + return r < 0 ? m + r : r; + } + public static final byte lsb(int i) { return (byte) (0xFF & i); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java index 4b199bafe6..a0bbbb22de 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -79,4 +79,16 @@ public final class Strings } } + public static final String fromUTF8(byte[] bytes) + { + try + { + return new String(bytes, "UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + } |
