summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-12-10 14:45:16 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-12-10 14:45:16 +0000
commit86915ff094ab2fea35b642e75e0a5cf5804328f1 (patch)
treeb7e8eb71fcfbe8f012de7a581995cafb6a9825f1
parent75b1fbd3a04b61bd2f42f37058f80d177bc718ba (diff)
downloadqpid-python-86915ff094ab2fea35b642e75e0a5cf5804328f1.tar.gz
Merged r1549670 to 0.26 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1549852 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java5
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java55
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java11
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java1
4 files changed, 57 insertions, 15 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
index c98ebadaa6..83cb36246a 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
@@ -272,13 +272,14 @@ public class FrameHandler implements ProtocolHandler
{
_connection.handleError(frameParsingError);
}
- return this;
}
catch(RuntimeException e)
{
+ // This exception is unexpected. The up layer should handle error condition gracefully
+ _connection.handleError(this.createError(ConnectionError.CONNECTION_FORCED, e.toString()));
e.printStackTrace();
- throw e;
}
+ return this;
}
private static String toHex(ByteBuffer in)
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 5775eb4c2e..a3c4ad7b5a 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -24,6 +24,7 @@ package org.apache.qpid.amqp_1_0.transport;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+
import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.amqp_1_0.codec.ValueWriter;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
@@ -39,11 +40,11 @@ import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
+
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -290,8 +291,16 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
private SessionEndpoint getSession(final short channel)
{
- // TODO assert existence, check channel state
- return _receivingSessions[channel];
+ SessionEndpoint session = _receivingSessions[channel];
+ if (session == null)
+ {
+ Error error = new Error();
+ error.setCondition(ConnectionError.FRAMING_ERROR);
+ error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
+ this.handleError(error);
+ }
+
+ return session;
}
@@ -470,6 +479,11 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
endpoint.setReceivingChannel(channel);
endpoint.setNextIncomingId(begin.getNextOutgoingId());
endpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
+
+ if (endpoint.getState() == SessionState.END_SENT)
+ {
+ _sendingSessions[myChannelId] = null;
+ }
}
else
{
@@ -551,41 +565,59 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
}
- public synchronized void sendEnd(short channel, End end)
+ public synchronized void sendEnd(short channel, End end, boolean remove)
{
send(channel, end);
- _sendingSessions[channel] = null;
+ if (remove)
+ {
+ _sendingSessions[channel] = null;
+ }
}
public synchronized void receiveAttach(short channel, Attach attach)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveAttach(attach);
+ if (endPoint != null)
+ {
+ endPoint.receiveAttach(attach);
+ }
}
public synchronized void receiveDetach(short channel, Detach detach)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveDetach(detach);
+ if (endPoint != null)
+ {
+ endPoint.receiveDetach(detach);
+ }
}
public synchronized void receiveTransfer(short channel, Transfer transfer)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveTransfer(transfer);
+ if (endPoint != null)
+ {
+ endPoint.receiveTransfer(transfer);
+ }
}
public synchronized void receiveDisposition(short channel, Disposition disposition)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveDisposition(disposition);
+ if (endPoint != null)
+ {
+ endPoint.receiveDisposition(disposition);
+ }
}
public synchronized void receiveFlow(short channel, Flow flow)
{
SessionEndpoint endPoint = getSession(channel);
- endPoint.receiveFlow(flow);
+ if (endPoint != null)
+ {
+ endPoint.receiveFlow(flow);
+ }
}
@@ -667,8 +699,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
Close close = new Close();
close.setError(error);
send((short) 0, close);
+
+ this.setClosedForOutput(true);
}
- _closedForInput = true;
}
private final Logger _logger = Logger.getLogger("FRM");
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index f82ca7ee57..f7a3cd3800 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -118,6 +118,9 @@ public class SessionEndpoint
case BEGIN_SENT:
_state = SessionState.ACTIVE;
break;
+ case END_PIPE:
+ _state = SessionState.END_SENT;
+ break;
default:
// TODO error
@@ -158,6 +161,10 @@ public class SessionEndpoint
{
switch(_state)
{
+ case BEGIN_SENT:
+ _connection.sendEnd(getSendingChannel(), new End(), false);
+ _state = SessionState.END_PIPE;
+ break;
case END_SENT:
_state = SessionState.ENDED;
break;
@@ -165,7 +172,7 @@ public class SessionEndpoint
detachLinks();
_sessionEventListener.remoteEnd(end);
short sendChannel = getSendingChannel();
- _connection.sendEnd(sendChannel, new End());
+ _connection.sendEnd(sendChannel, new End(), true);
_state = end == null ? SessionState.END_SENT : SessionState.ENDED;
break;
default:
@@ -175,7 +182,7 @@ public class SessionEndpoint
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
- _connection.sendEnd(sendChannel, reply);
+ _connection.sendEnd(sendChannel, reply, true);
break;
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
index 9c4c3532bd..76508a58f1 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionState.java
@@ -26,6 +26,7 @@ public enum SessionState
ACTIVE,
INACTIVE,
BEGIN_SENT,
+ END_PIPE,
BEGIN_RECVD,
END_SENT,
END_RECVD,