diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
| commit | 30d213dc1e6d743f2f0abb44c8bc91868d5126b1 (patch) | |
| tree | f3d16257ed0a431f2f4c43166f4df84ccb877a6c /qpid/java/amqp-1-0-common | |
| parent | b165cf52a4ef16ac5a5ee181d4da2db351f7882d (diff) | |
| download | qpid-python-30d213dc1e6d743f2f0abb44c8bc91868d5126b1.tar.gz | |
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-common')
| -rw-r--r-- | qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java | 15 | ||||
| -rw-r--r-- | qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java | 81 | ||||
| -rw-r--r-- | qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java (renamed from qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java) | 4 | ||||
| -rw-r--r-- | qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java | 12 |
4 files changed, 67 insertions, 45 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java index b846b16722..b96e1ab47b 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java @@ -66,20 +66,7 @@ public class SymbolTypeConstructor extends VariableWidthTypeConstructor if(symbolVal == null) { ByteBuffer dup = in.duplicate(); - try - { - dup.limit(in.position()+size); - } - catch (IllegalArgumentException e) - { - System.err.println("in.position(): " + in.position()); - System.err.println("size: " + size); - System.err.println("dup.position(): " + dup.position()); - System.err.println("dup.capacity(): " + dup.capacity()); - System.err.println("dup.limit(): " + dup.limit()); - throw e; - - } + dup.limit(in.position()+size); CharBuffer charBuf = ASCII.decode(dup); diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java index 119dd6bf3a..54a4f22d48 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java @@ -65,6 +65,12 @@ public class ConnectionHandler public boolean parse(ByteBuffer in) { + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + Binary b = new Binary(in.array(),in.arrayOffset()+in.position(),in.remaining()); + RAW_LOGGER.fine("RECV [" + _connection.getRemoteAddress() + "] : " + b.toString()); + } + while(in.hasRemaining() && !isDone()) { _delegate = _delegate.parse(in); @@ -376,6 +382,47 @@ public class ConnectionHandler } + public static class SequentialFrameSource implements FrameSource + { + private Queue<FrameSource> _sources = new LinkedList<FrameSource>(); + + public SequentialFrameSource(FrameSource... sources) + { + _sources.addAll(Arrays.asList(sources)); + } + + public synchronized void addSource(FrameSource source) + { + _sources.add(source); + } + + @Override + public synchronized AMQFrame getNextFrame(final boolean wait) + { + FrameSource src = _sources.peek(); + while (src != null && src.closed()) + { + _sources.poll(); + src = _sources.peek(); + } + + if(src != null) + { + return src.getNextFrame(wait); + } + else + { + return null; + } + } + + public boolean closed() + { + return _sources.isEmpty(); + } + } + + public static class BytesOutputHandler implements Runnable, BytesProcessor { @@ -383,28 +430,28 @@ public class ConnectionHandler private BytesSource _bytesSource; private boolean _closed; private ConnectionEndpoint _conn; - private SocketExceptionHandler _exceptionHandler; - - public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler) - { - _outputStream = outputStream; - _bytesSource = source; - _conn = conn; - _exceptionHandler = exceptionHandler; - } + private ExceptionHandler _exceptionHandler; - public void run() - { + public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, ExceptionHandler exceptionHandler) + { + _outputStream = outputStream; + _bytesSource = source; + _conn = conn; + _exceptionHandler = exceptionHandler; + } - final BytesSource bytesSource = _bytesSource; + public void run() + { - while(!(_closed || bytesSource.closed())) - { - _bytesSource.getBytes(this, true); - } + final BytesSource bytesSource = _bytesSource; + while(!(_closed || bytesSource.closed())) + { + _bytesSource.getBytes(this, true); } + } + public void processBytes(final ByteBuffer buf) { try @@ -423,7 +470,7 @@ public class ConnectionHandler catch (IOException e) { _closed = true; - _exceptionHandler.processSocketException(e); + _exceptionHandler.handleException(e); } } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java index 540aee0f8d..3adf3c0b18 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java @@ -23,9 +23,9 @@ package org.apache.qpid.amqp_1_0.framing; /** * Callback interface for processing socket exceptions. */ -public interface SocketExceptionHandler +public interface ExceptionHandler { - public void processSocketException(Exception exception); + public void handleException(Exception exception); } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 34ca851978..c37c52c6ea 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -580,19 +580,7 @@ public class SessionEndpoint if(payload != null && payloadSent < payload.remaining()) { payload = payload.duplicate(); -try -{ payload.position(payload.position()+payloadSent); -} -catch(IllegalArgumentException e) -{ - System.err.println("UNEXPECTED"); - System.err.println("Payload Position: " + payload.position()); - System.err.println("Payload Sent: " + payloadSent); - System.err.println("Payload Remaining: " + payload.remaining()); - throw e; - -} Transfer secondTransfer = new Transfer(); |
