summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-16 16:02:26 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-16 16:02:26 +0000
commit72de13352dc9c42acfe95a1d76f049c507eb5cfd (patch)
tree7d0a55ab52398bb6de7139cf77ea085b8f6f3edc /java/client
parent40eba25f51c024d7e10cf1b5e1d9f0110feddcdf (diff)
downloadqpid-python-72de13352dc9c42acfe95a1d76f049c507eb5cfd.tar.gz
Additions to allow refs to be sent from broker to client. Also some tidy-up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508460 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java17
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java36
5 files changed, 57 insertions, 30 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
index 4178f48cf5..235ddaacb9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
@@ -24,7 +24,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.log4j.Logger;
@@ -47,7 +49,12 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener
try
{
protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
- System.out.println("Message.appened()-->Appending message content to body");
+
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+ protocolSession.getProtocolMajorVersion(), // AMQP major version
+ protocolSession.getProtocolMinorVersion()); // AMQP minor version
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
index 284b7444d2..62fde4d806 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
@@ -24,7 +24,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.log4j.Logger;
@@ -46,10 +48,15 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener
{
MessageCloseBody body = (MessageCloseBody)evt.getMethod();
String referenceId = new String(body.getReference());
- System.out.println("Message.closing()-->Handing message to session");
protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
_logger.debug("Method Close Body received, notify session to accept unprocessed message");
+
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+ protocolSession.getProtocolMajorVersion(), // AMQP major version
+ protocolSession.getProtocolMinorVersion()); // AMQP minor version
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
index 06d9c9ff99..8de85accdf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
@@ -25,7 +25,9 @@ import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
//import org.apache.log4j.Logger;
@@ -45,11 +47,15 @@ public class MessageOpenMethodHandler implements StateAwareMethodListener
public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), null, false);
- String referenceId = new String(((MessageOpenBody)evt.getMethod()).getReference());
- protocolSession.unprocessedMessageReceived(referenceId, msg);
-
- System.out.println("Message.open()-->Adding message to map with ref");
+ byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference();
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId);
+ protocolSession.unprocessedMessageReceived(new String(referenceId), msg);
+
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+ protocolSession.getProtocolMajorVersion(), // AMQP major version
+ protocolSession.getProtocolMinorVersion()); // AMQP minor version
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index f242812d3b..3e5efd9068 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -35,11 +35,19 @@ public class UnprocessedMessage
{
private int bytesReceived = 0;
private int channelId;
+ private byte[] referenceId;
private List<byte[]> contents = new LinkedList();
private long deliveryTag;
private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
+ public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.referenceId = referenceId;
+ }
+
public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
{
this.channelId = channelId;
@@ -73,6 +81,11 @@ public class UnprocessedMessage
return channelId;
}
+ public byte[] getReferenceId()
+ {
+ return referenceId;
+ }
+
public List<byte[]> getContents()
{
return contents;
@@ -95,7 +108,9 @@ public class UnprocessedMessage
public String toString()
{
- return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + new String(contents.get(0));
+ return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" +
+ deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" +
+ new String(contents.get(0));
}
public void setMessageHeaders(MessageHeaders messageHeaders) {
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
index 5c7f249107..27a312f3df 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
@@ -65,27 +65,19 @@ public class PubSubTwoConnectionTest extends TestCase
assertEquals("Hello", tm1.getText());
}
- public static void main(String[] args){
- PubSubTwoConnectionTest test = new PubSubTwoConnectionTest();
- try {
- //test.setUp();
- //test.testTwoConnections();
- int a = 5;
-
- System.out.println(a++);
- System.out.println(a);
- System.out.println(++a);
-
- int b = ++a;
- int c = a++;
-
- System.out.println(b);
- System.out.println(c);
- System.out.println(a);
-
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ public static void main(String[] args)
+ {
+ PubSubTwoConnectionTest test = new PubSubTwoConnectionTest();
+ try
+ {
+ test.setUp();
+ test.testTwoConnections();
+ test.tearDown();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
}