diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-12-10 14:45:16 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-12-10 14:45:16 +0000 |
commit | 86915ff094ab2fea35b642e75e0a5cf5804328f1 (patch) | |
tree | b7e8eb71fcfbe8f012de7a581995cafb6a9825f1 | |
parent | 75b1fbd3a04b61bd2f42f37058f80d177bc718ba (diff) | |
download | qpid-python-86915ff094ab2fea35b642e75e0a5cf5804328f1.tar.gz |
Merged r1549670 to 0.26 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1549852 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 57 insertions, 15 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java index c98ebadaa6..83cb36246a 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java @@ -272,13 +272,14 @@ public class FrameHandler implements ProtocolHandler { _connection.handleError(frameParsingError); } - return this; } catch(RuntimeException e) { + // This exception is unexpected. The up layer should handle error condition gracefully + _connection.handleError(this.createError(ConnectionError.CONNECTION_FORCED, e.toString())); e.printStackTrace(); - throw e; } + return this; } private static String toHex(ByteBuffer in) diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 5775eb4c2e..a3c4ad7b5a 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -24,6 +24,7 @@ package org.apache.qpid.amqp_1_0.transport; import java.util.HashSet; import java.util.Map; import java.util.Set; + import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry; import org.apache.qpid.amqp_1_0.codec.ValueWriter; import org.apache.qpid.amqp_1_0.framing.AMQFrame; @@ -39,11 +40,11 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; - import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import javax.security.sasl.SaslServerFactory; + import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -290,8 +291,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private SessionEndpoint getSession(final short channel) { - // TODO assert existence, check channel state - return _receivingSessions[channel]; + SessionEndpoint session = _receivingSessions[channel]; + if (session == null) + { + Error error = new Error(); + error.setCondition(ConnectionError.FRAMING_ERROR); + error.setDescription("Frame received on channel " + channel + " which is not known as a begun session."); + this.handleError(error); + } + + return session; } @@ -470,6 +479,11 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour endpoint.setReceivingChannel(channel); endpoint.setNextIncomingId(begin.getNextOutgoingId()); endpoint.setOutgoingSessionCredit(begin.getIncomingWindow()); + + if (endpoint.getState() == SessionState.END_SENT) + { + _sendingSessions[myChannelId] = null; + } } else { @@ -551,41 +565,59 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour } - public synchronized void sendEnd(short channel, End end) + public synchronized void sendEnd(short channel, End end, boolean remove) { send(channel, end); - _sendingSessions[channel] = null; + if (remove) + { + _sendingSessions[channel] = null; + } } public synchronized void receiveAttach(short channel, Attach attach) { SessionEndpoint endPoint = getSession(channel); - endPoint.receiveAttach(attach); + if (endPoint != null) + { + endPoint.receiveAttach(attach); + } } public synchronized void receiveDetach(short channel, Detach detach) { SessionEndpoint endPoint = getSession(channel); - endPoint.receiveDetach(detach); + if (endPoint != null) + { + endPoint.receiveDetach(detach); + } } public synchronized void receiveTransfer(short channel, Transfer transfer) { SessionEndpoint endPoint = getSession(channel); - endPoint.receiveTransfer(transfer); + if (endPoint != null) + { + endPoint.receiveTransfer(transfer); + } } public synchronized void receiveDisposition(short channel, Disposition disposition) { SessionEndpoint endPoint = getSession(channel); - endPoint.receiveDisposition(disposition); + if (endPoint != null) + { + endPoint.receiveDisposition(disposition); + } } public synchronized void receiveFlow(short channel, Flow flow) { SessionEndpoint endPoint = getSession(channel); - endPoint.receiveFlow(flow); + if (endPoint != null) + { + endPoint.receiveFlow(flow); + } } @@ -667,8 +699,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour Close close = new Close(); close.setError(error); send((short) 0, close); + + this.setClosedForOutput(true); } - _closedForInput = true; } private final Logger _logger = Logger.getLogger("FRM"); diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index f82ca7ee57..f7a3cd3800 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -118,6 +118,9 @@ public class SessionEndpoint case BEGIN_SENT: _state = SessionState.ACTIVE; break; + case END_PIPE: + _state = SessionState.END_SENT; + break; default: // TODO error @@ -158,6 +161,10 @@ public class SessionEndpoint { switch(_state) { + case BEGIN_SENT: + _connection.sendEnd(getSendingChannel(), new End(), false); + _state = SessionState.END_PIPE; + break; case END_SENT: _state = SessionState.ENDED; break; @@ -165,7 +172,7 @@ public class SessionEndpoint detachLinks(); _sessionEventListener.remoteEnd(end); short sendChannel = getSendingChannel(); - _connection.sendEnd(sendChannel, new End()); + _connection.sendEnd(sendChannel, new End(), true); _state = end == null ? SessionState.END_SENT : SessionState.ENDED; break; default: @@ -175,7 +182,7 @@ public class SessionEndpoint error.setCondition(AmqpError.ILLEGAL_STATE); error.setDescription("END called on Session which has not been opened"); reply.setError(error); - _connection.sendEnd(sendChannel, reply); + _connection.sendEnd(sendChannel, reply, true); break; diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java index 9c4c3532bd..76508a58f1 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java @@ -26,6 +26,7 @@ public enum SessionState ACTIVE, INACTIVE, BEGIN_SENT, + END_PIPE, BEGIN_RECVD, END_SENT, END_RECVD, |