From c86c9dc322e71901e88295210b371425204e918b Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 15 Feb 2007 23:16:38 +0000 Subject: added support for reference case git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508233 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/BasicMessageConsumer.java | 2 +- .../qpid/client/handler/MessageAppendMethodHandler.java | 1 + .../qpid/client/handler/MessageCloseMethodHandler.java | 2 ++ .../qpid/client/handler/MessageOpenMethodHandler.java | 7 ++++++- .../client/handler/MessageTransferMethodHandler.java | 2 +- .../apache/qpid/client/message/UnprocessedMessage.java | 10 +++++++++- .../apache/qpid/client/protocol/AMQProtocolSession.java | 10 ++++++++-- .../qpid/test/unit/basic/PubSubTwoConnectionTest.java | 17 +++++++++++++++-- 8 files changed, 43 insertions(+), 8 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 63d1746c8d..5667b6880c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -328,7 +328,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } public int getPrefetchHigh() - { + { return _prefetchHigh; } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java index 361b63230f..4178f48cf5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java @@ -47,6 +47,7 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener try { protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod()); + System.out.println("Message.appened()-->Appending message content to body"); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java index dfe15d3360..284b7444d2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java @@ -46,6 +46,8 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener { MessageCloseBody body = (MessageCloseBody)evt.getMethod(); String referenceId = new String(body.getReference()); + System.out.println("Message.closing()-->Handing message to session"); + protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId); _logger.debug("Method Close Body received, notify session to accept unprocessed message"); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java index 21bcee9066..06d9c9ff99 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; @@ -44,7 +45,11 @@ public class MessageOpenMethodHandler implements StateAwareMethodListener public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { - // TODO + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), null, false); + String referenceId = new String(((MessageOpenBody)evt.getMethod()).getReference()); + protocolSession.unprocessedMessageReceived(referenceId, msg); + + System.out.println("Message.open()-->Adding message to map with ref"); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java index 49c1184119..173b79a320 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -78,7 +78,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener else { String referenceId = new String(transferBody.getBody().getContentAsByteArray()); - protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId); + protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, messageHeaders,transferBody.getRedelivered()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index b3ea03efe3..f242812d3b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -56,7 +56,7 @@ public class UnprocessedMessage this.redeliveredFlag = redeliveredFlag; addContent(content); } - + public void addContent(byte[] content) { contents.add(content); @@ -97,4 +97,12 @@ public class UnprocessedMessage { return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + new String(contents.get(0)); } + + public void setMessageHeaders(MessageHeaders messageHeaders) { + this.messageHeaders = messageHeaders; + } + + public void setRedeliveredFlag(boolean redeliveredFlag) { + this.redeliveredFlag = redeliveredFlag; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 77ecd1f4c0..06e378550e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -36,7 +36,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.message.MessageHeaders; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQRequestBody; @@ -52,9 +54,7 @@ import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.framing.VersionSpecificRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; /** * Wrapper for protocol session that provides type-safe access to session attributes. @@ -282,6 +282,12 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP msg.addContent(appendBody.bytes); } + public void messageTransferBodyReceivedForReferenceCase(String referenceId,MessageHeaders messageHeaders,boolean redilivered){ + UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId); + msg.setMessageHeaders(messageHeaders); + msg.setRedeliveredFlag(redilivered); + } + public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception { if (_logger.isDebugEnabled()) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java index 937944e340..5c7f249107 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -68,8 +68,21 @@ public class PubSubTwoConnectionTest extends TestCase public static void main(String[] args){ PubSubTwoConnectionTest test = new PubSubTwoConnectionTest(); try { - test.setUp(); - test.testTwoConnections(); + //test.setUp(); + //test.testTwoConnections(); + int a = 5; + + System.out.println(a++); + System.out.println(a); + System.out.println(++a); + + int b = ++a; + int c = a++; + + System.out.println(b); + System.out.println(c); + System.out.println(a); + } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); -- cgit v1.2.1