summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-14 18:50:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-14 18:50:07 +0000
commitb13c1f50dc34854d0b348aed1fcd24c63e58c938 (patch)
tree648d523ac88ec704efa8ffd11e9ceed497d875bf /qpid/java
parent763396a3a8c18a9b42e2cf611333e61d4c6444de (diff)
downloadqpid-python-b13c1f50dc34854d0b348aed1fcd24c63e58c938.tar.gz
QPID-5552 : [Java Broker] Propertly handle transport exceptions encountered in methods invoked in AMQP 1.0 from the IOReceiver thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1568452 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java227
1 files changed, 121 insertions, 106 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 58c3597a5c..fe214eb899 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -41,7 +41,6 @@ import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.ErrorCondition;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.transport.*;
@@ -52,12 +51,12 @@ import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
@@ -258,107 +257,113 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
public synchronized void received(ByteBuffer msg)
{
- _lastReadTime = System.currentTimeMillis();
- if(RAW_LOGGER.isLoggable(Level.FINE))
+ try
{
- ByteBuffer dup = msg.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
- }
- _readBytes += msg.remaining();
- switch(_state)
- {
- case A:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- break;
- }
- case M:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.M;
- break;
- }
-
- case Q:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.Q;
- break;
- }
- case P:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.P;
- break;
- }
- case PROTOCOL:
- if(msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.PROTOCOL;
- break;
- }
- case MAJOR:
- if(msg.hasRemaining())
- {
- _major = msg.get();
- }
- else
- {
- _state = State.MAJOR;
- break;
- }
- case MINOR:
- if(msg.hasRemaining())
- {
- _minor = msg.get();
- }
- else
- {
- _state = State.MINOR;
- break;
- }
- case REVISION:
- if(msg.hasRemaining())
- {
- _revision = msg.get();
-
- _state = State.FRAME;
- }
- else
- {
- _state = State.REVISION;
- break;
- }
- case FRAME:
- if(msg.hasRemaining())
- {
+ _lastReadTime = System.currentTimeMillis();
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup = msg.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString());
+ }
+ _readBytes += msg.remaining();
+ switch(_state)
+ {
+ case A:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ break;
+ }
+ case M:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.M;
+ break;
+ }
+
+ case Q:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.Q;
+ break;
+ }
+ case P:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.P;
+ break;
+ }
+ case PROTOCOL:
+ if (msg.hasRemaining())
+ {
+ msg.get();
+ }
+ else
+ {
+ _state = State.PROTOCOL;
+ break;
+ }
+ case MAJOR:
+ if (msg.hasRemaining())
+ {
+ _major = msg.get();
+ }
+ else
+ {
+ _state = State.MAJOR;
+ break;
+ }
+ case MINOR:
+ if (msg.hasRemaining())
+ {
+ _minor = msg.get();
+ }
+ else
+ {
+ _state = State.MINOR;
+ break;
+ }
+ case REVISION:
+ if (msg.hasRemaining())
+ {
+ _revision = msg.get();
+
+ _state = State.FRAME;
+ }
+ else
+ {
+ _state = State.REVISION;
+ break;
+ }
+ case FRAME:
+ if (msg.hasRemaining())
+ {
_frameHandler = _frameHandler.parse(msg);
- }
- }
-
+ }
+ }
+ }
+ catch(RuntimeException e)
+ {
+ exception(e);
+ }
}
public void exception(Throwable throwable)
@@ -378,6 +383,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_conn.close(err);
close();
}
+ catch(TransportException e)
+ {
+ _logger.info("Error when handling exception",e);
+ }
finally
{
if(throwable instanceof java.lang.Error)
@@ -394,13 +403,19 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
public void closed()
{
- // todo
- _conn.inputClosed();
- if (_conn != null && _conn.getConnectionEventListener() != null)
+ try
+ {
+ // todo
+ _conn.inputClosed();
+ if (_conn != null && _conn.getConnectionEventListener() != null)
+ {
+ ((Connection_1_0) _conn.getConnectionEventListener()).closed();
+ }
+ }
+ catch(RuntimeException e)
{
- ((Connection_1_0) _conn.getConnectionEventListener()).closed();
+ exception(e);
}
-
}
public long getCreateTime()