summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-02-15 23:16:38 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-02-15 23:16:38 +0000
commitc86c9dc322e71901e88295210b371425204e918b (patch)
tree468cbeb90143f34f9be564928b8e16375c23b86d /java/client/src
parent8ad9152ef4db0a43b969023259c3cb2a7b9ea142 (diff)
downloadqpid-python-c86c9dc322e71901e88295210b371425204e918b.tar.gz
added support for reference case
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508233 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java17
8 files changed, 43 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 63d1746c8d..5667b6880c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -328,7 +328,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
public int getPrefetchHigh()
- {
+ {
return _prefetchHigh;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
index 361b63230f..4178f48cf5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
@@ -47,6 +47,7 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener
try
{
protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
+ System.out.println("Message.appened()-->Appending message content to body");
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
index dfe15d3360..284b7444d2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
@@ -46,6 +46,8 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener
{
MessageCloseBody body = (MessageCloseBody)evt.getMethod();
String referenceId = new String(body.getReference());
+ System.out.println("Message.closing()-->Handing message to session");
+
protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
_logger.debug("Method Close Body received, notify session to accept unprocessed message");
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
index 21bcee9066..06d9c9ff99 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -44,7 +45,11 @@ public class MessageOpenMethodHandler implements StateAwareMethodListener
public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- // TODO
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), null, false);
+ String referenceId = new String(((MessageOpenBody)evt.getMethod()).getReference());
+ protocolSession.unprocessedMessageReceived(referenceId, msg);
+
+ System.out.println("Message.open()-->Adding message to map with ref");
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
index 49c1184119..173b79a320 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
@@ -78,7 +78,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener
else
{
String referenceId = new String(transferBody.getBody().getContentAsByteArray());
- protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
+ protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, messageHeaders,transferBody.getRedelivered());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index b3ea03efe3..f242812d3b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -56,7 +56,7 @@ public class UnprocessedMessage
this.redeliveredFlag = redeliveredFlag;
addContent(content);
}
-
+
public void addContent(byte[] content)
{
contents.add(content);
@@ -97,4 +97,12 @@ public class UnprocessedMessage
{
return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + new String(contents.get(0));
}
+
+ public void setMessageHeaders(MessageHeaders messageHeaders) {
+ this.messageHeaders = messageHeaders;
+ }
+
+ public void setRedeliveredFlag(boolean redeliveredFlag) {
+ this.redeliveredFlag = redeliveredFlag;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 77ecd1f4c0..06e378550e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -36,7 +36,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.message.MessageHeaders;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
@@ -52,9 +54,7 @@ import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
/**
* Wrapper for protocol session that provides type-safe access to session attributes.
@@ -282,6 +282,12 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
msg.addContent(appendBody.bytes);
}
+ public void messageTransferBodyReceivedForReferenceCase(String referenceId,MessageHeaders messageHeaders,boolean redilivered){
+ UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+ msg.setMessageHeaders(messageHeaders);
+ msg.setRedeliveredFlag(redilivered);
+ }
+
public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
{
if (_logger.isDebugEnabled())
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
index 937944e340..5c7f249107 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
@@ -68,8 +68,21 @@ public class PubSubTwoConnectionTest extends TestCase
public static void main(String[] args){
PubSubTwoConnectionTest test = new PubSubTwoConnectionTest();
try {
- test.setUp();
- test.testTwoConnections();
+ //test.setUp();
+ //test.testTwoConnections();
+ int a = 5;
+
+ System.out.println(a++);
+ System.out.println(a);
+ System.out.println(++a);
+
+ int b = ++a;
+ int c = a++;
+
+ System.out.println(b);
+ System.out.println(c);
+ System.out.println(a);
+
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();