diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-14 18:50:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-14 18:50:07 +0000 |
| commit | b13c1f50dc34854d0b348aed1fcd24c63e58c938 (patch) | |
| tree | 648d523ac88ec704efa8ffd11e9ceed497d875bf /qpid/java | |
| parent | 763396a3a8c18a9b42e2cf611333e61d4c6444de (diff) | |
| download | qpid-python-b13c1f50dc34854d0b348aed1fcd24c63e58c938.tar.gz | |
QPID-5552 : [Java Broker] Propertly handle transport exceptions encountered in methods invoked in AMQP 1.0 from the IOReceiver thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1568452 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java | 227 |
1 files changed, 121 insertions, 106 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 58c3597a5c..fe214eb899 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -41,7 +41,6 @@ import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.Container; import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.ErrorCondition; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.transport.*; @@ -52,12 +51,12 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler @@ -258,107 +257,113 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut public synchronized void received(ByteBuffer msg) { - _lastReadTime = System.currentTimeMillis(); - if(RAW_LOGGER.isLoggable(Level.FINE)) + try { - ByteBuffer dup = msg.duplicate(); - byte[] data = new byte[dup.remaining()]; - dup.get(data); - Binary bin = new Binary(data); - RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString()); - } - _readBytes += msg.remaining(); - switch(_state) - { - case A: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - break; - } - case M: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.M; - break; - } - - case Q: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.Q; - break; - } - case P: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.P; - break; - } - case PROTOCOL: - if(msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.PROTOCOL; - break; - } - case MAJOR: - if(msg.hasRemaining()) - { - _major = msg.get(); - } - else - { - _state = State.MAJOR; - break; - } - case MINOR: - if(msg.hasRemaining()) - { - _minor = msg.get(); - } - else - { - _state = State.MINOR; - break; - } - case REVISION: - if(msg.hasRemaining()) - { - _revision = msg.get(); - - _state = State.FRAME; - } - else - { - _state = State.REVISION; - break; - } - case FRAME: - if(msg.hasRemaining()) - { + _lastReadTime = System.currentTimeMillis(); + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + ByteBuffer dup = msg.duplicate(); + byte[] data = new byte[dup.remaining()]; + dup.get(data); + Binary bin = new Binary(data); + RAW_LOGGER.fine("RECV[" + getRemoteAddress() + "] : " + bin.toString()); + } + _readBytes += msg.remaining(); + switch(_state) + { + case A: + if (msg.hasRemaining()) + { + msg.get(); + } + else + { + break; + } + case M: + if (msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.M; + break; + } + + case Q: + if (msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.Q; + break; + } + case P: + if (msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.P; + break; + } + case PROTOCOL: + if (msg.hasRemaining()) + { + msg.get(); + } + else + { + _state = State.PROTOCOL; + break; + } + case MAJOR: + if (msg.hasRemaining()) + { + _major = msg.get(); + } + else + { + _state = State.MAJOR; + break; + } + case MINOR: + if (msg.hasRemaining()) + { + _minor = msg.get(); + } + else + { + _state = State.MINOR; + break; + } + case REVISION: + if (msg.hasRemaining()) + { + _revision = msg.get(); + + _state = State.FRAME; + } + else + { + _state = State.REVISION; + break; + } + case FRAME: + if (msg.hasRemaining()) + { _frameHandler = _frameHandler.parse(msg); - } - } - + } + } + } + catch(RuntimeException e) + { + exception(e); + } } public void exception(Throwable throwable) @@ -378,6 +383,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _conn.close(err); close(); } + catch(TransportException e) + { + _logger.info("Error when handling exception",e); + } finally { if(throwable instanceof java.lang.Error) @@ -394,13 +403,19 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut public void closed() { - // todo - _conn.inputClosed(); - if (_conn != null && _conn.getConnectionEventListener() != null) + try + { + // todo + _conn.inputClosed(); + if (_conn != null && _conn.getConnectionEventListener() != null) + { + ((Connection_1_0) _conn.getConnectionEventListener()).closed(); + } + } + catch(RuntimeException e) { - ((Connection_1_0) _conn.getConnectionEventListener()).closed(); + exception(e); } - } public long getCreateTime() |
