summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-09 16:53:51 +0000
commit30d213dc1e6d743f2f0abb44c8bc91868d5126b1 (patch)
treef3d16257ed0a431f2f4c43166f4df84ccb877a6c /qpid/java/amqp-1-0-common
parentb165cf52a4ef16ac5a5ee181d4da2db351f7882d (diff)
downloadqpid-python-30d213dc1e6d743f2f0abb44c8bc91868d5126b1.tar.gz
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-common')
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java15
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java81
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java (renamed from qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java)4
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java12
4 files changed, 67 insertions, 45 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
index b846b16722..b96e1ab47b 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
@@ -66,20 +66,7 @@ public class SymbolTypeConstructor extends VariableWidthTypeConstructor
if(symbolVal == null)
{
ByteBuffer dup = in.duplicate();
- try
- {
- dup.limit(in.position()+size);
- }
- catch (IllegalArgumentException e)
- {
- System.err.println("in.position(): " + in.position());
- System.err.println("size: " + size);
- System.err.println("dup.position(): " + dup.position());
- System.err.println("dup.capacity(): " + dup.capacity());
- System.err.println("dup.limit(): " + dup.limit());
- throw e;
-
- }
+ dup.limit(in.position()+size);
CharBuffer charBuf = ASCII.decode(dup);
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
index 119dd6bf3a..54a4f22d48 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
@@ -65,6 +65,12 @@ public class ConnectionHandler
public boolean parse(ByteBuffer in)
{
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ Binary b = new Binary(in.array(),in.arrayOffset()+in.position(),in.remaining());
+ RAW_LOGGER.fine("RECV [" + _connection.getRemoteAddress() + "] : " + b.toString());
+ }
+
while(in.hasRemaining() && !isDone())
{
_delegate = _delegate.parse(in);
@@ -376,6 +382,47 @@ public class ConnectionHandler
}
+ public static class SequentialFrameSource implements FrameSource
+ {
+ private Queue<FrameSource> _sources = new LinkedList<FrameSource>();
+
+ public SequentialFrameSource(FrameSource... sources)
+ {
+ _sources.addAll(Arrays.asList(sources));
+ }
+
+ public synchronized void addSource(FrameSource source)
+ {
+ _sources.add(source);
+ }
+
+ @Override
+ public synchronized AMQFrame getNextFrame(final boolean wait)
+ {
+ FrameSource src = _sources.peek();
+ while (src != null && src.closed())
+ {
+ _sources.poll();
+ src = _sources.peek();
+ }
+
+ if(src != null)
+ {
+ return src.getNextFrame(wait);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean closed()
+ {
+ return _sources.isEmpty();
+ }
+ }
+
+
public static class BytesOutputHandler implements Runnable, BytesProcessor
{
@@ -383,28 +430,28 @@ public class ConnectionHandler
private BytesSource _bytesSource;
private boolean _closed;
private ConnectionEndpoint _conn;
- private SocketExceptionHandler _exceptionHandler;
-
- public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
- {
- _outputStream = outputStream;
- _bytesSource = source;
- _conn = conn;
- _exceptionHandler = exceptionHandler;
- }
+ private ExceptionHandler _exceptionHandler;
- public void run()
- {
+ public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, ExceptionHandler exceptionHandler)
+ {
+ _outputStream = outputStream;
+ _bytesSource = source;
+ _conn = conn;
+ _exceptionHandler = exceptionHandler;
+ }
- final BytesSource bytesSource = _bytesSource;
+ public void run()
+ {
- while(!(_closed || bytesSource.closed()))
- {
- _bytesSource.getBytes(this, true);
- }
+ final BytesSource bytesSource = _bytesSource;
+ while(!(_closed || bytesSource.closed()))
+ {
+ _bytesSource.getBytes(this, true);
}
+ }
+
public void processBytes(final ByteBuffer buf)
{
try
@@ -423,7 +470,7 @@ public class ConnectionHandler
catch (IOException e)
{
_closed = true;
- _exceptionHandler.processSocketException(e);
+ _exceptionHandler.handleException(e);
}
}
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java
index 540aee0f8d..3adf3c0b18 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java
@@ -23,9 +23,9 @@ package org.apache.qpid.amqp_1_0.framing;
/**
* Callback interface for processing socket exceptions.
*/
-public interface SocketExceptionHandler
+public interface ExceptionHandler
{
- public void processSocketException(Exception exception);
+ public void handleException(Exception exception);
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 34ca851978..c37c52c6ea 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -580,19 +580,7 @@ public class SessionEndpoint
if(payload != null && payloadSent < payload.remaining())
{
payload = payload.duplicate();
-try
-{
payload.position(payload.position()+payloadSent);
-}
-catch(IllegalArgumentException e)
-{
- System.err.println("UNEXPECTED");
- System.err.println("Payload Position: " + payload.position());
- System.err.println("Payload Sent: " + payloadSent);
- System.err.println("Payload Remaining: " + payload.remaining());
- throw e;
-
-}
Transfer secondTransfer = new Transfer();