diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 20:22:14 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 20:22:14 +0000 |
| commit | bae5b4dac83c2cc28badf10f2fde659066ec27fe (patch) | |
| tree | 3b67473acaee2cd4c7348d9a7453320b64fff9cf /java | |
| parent | 06a8d7b5dbbacf9eaff5bb1a788aa48a06df8b8e (diff) | |
| download | qpid-python-bae5b4dac83c2cc28badf10f2fde659066ec27fe.tar.gz | |
Fixed the various Ref modes so that the new MessageRefTest passes all tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
14 files changed, 433 insertions, 144 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3ab20e74bf..3f0fb26a65 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -140,11 +140,11 @@ public class AMQChannel private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); private Set<Long> _browsedAcks = new HashSet<Long>(); - + /** * Used in creating unique references. */ - private byte _refCounter; + private static AtomicLong _refIdCounter = new AtomicLong(); // XXX: clean up arguments public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener) @@ -290,32 +290,44 @@ public class AMQChannel public void addMessageOpen(MessageOpenBody open) throws AMQException { - try { + try + { createReference(open.reference); - } catch (IllegalArgumentException e) { + } + catch (IllegalArgumentException e) + { throw open.getConnectionException(503, "Reference is already open"); } } public void addMessageAppend(MessageAppendBody append) throws AMQException { - try { + try + { AMQReference ref = getReference(append.reference); - ref.appendContent(ByteBuffer.wrap(append.bytes)); - } catch (IllegalArgumentException e) { + if (append.bytes != null) // sending an empty string results in a null + { + ref.appendContent(ByteBuffer.wrap(append.bytes)); + } + } + catch (IllegalArgumentException e) + { throw append.getConnectionException(503, "Reference is not open"); } } public void addMessageClose(MessageCloseBody close) throws AMQException { - try { + try + { AMQReference ref = removeReference(close.reference); for (AMQMessage msg : ref.getMessageList()) { routeCurrentMessage(msg); } - } catch (IllegalArgumentException e) { + } + catch (IllegalArgumentException e) + { throw close.getConnectionException(503, "Reference is not open"); } } @@ -392,8 +404,11 @@ public class AMQChannel _session.writeRequest(_channelId, mtb, listener); } - private synchronized byte[] nextRefId() { - return new byte[]{_refCounter++}; + private synchronized byte[] nextRefId() + { + // clumsy + return String.valueOf(_refIdCounter.incrementAndGet()).getBytes(); + //return new byte[]{_refIdCounter.getAndIncrement()}; } public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener) @@ -471,7 +486,7 @@ public class AMQChannel { throw new ConsumerTagNotUniqueException(); } - + acks = acks; queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); return tag; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f20a3f7d1f..7d1615bc0c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -177,10 +177,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { + this(broker, username, password, clientName, virtualHost, null); + } + public AMQConnection(String broker, String username, String password, + String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException + { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'")); + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), params); } public AMQConnection(String host, int port, String username, String password, @@ -192,6 +197,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { + this(host, port, useSSL, username, password, clientName, virtualHost, null); + } + + public AMQConnection(String host, int port, boolean useSSL, String username, String password, + String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException + { this(new AMQConnectionURL(useSSL ? ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + @@ -203,16 +214,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'" - )); + ), params); } public AMQConnection(String connection) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection)); + this(new AMQConnectionURL(connection), null); + } + + public AMQConnection(String connection, ConnectionTuneParameters params) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(connection), params); } public AMQConnection(ConnectionURL connectionURL) throws AMQException { + this(connectionURL, null); + } + public AMQConnection(ConnectionURL connectionURL, ConnectionTuneParameters params) throws AMQException + { _logger.info("Connection:" + connectionURL); _ConnectionId.incrementAndGet(); if (connectionURL == null) @@ -229,7 +249,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _failoverPolicy = new FailoverPolicy(connectionURL); - _protocolHandler = new AMQProtocolHandler(this); + _protocolHandler = new AMQProtocolHandler(this, params); // We are not currently connected _connected = false; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9027c1b29c..f4d588ca9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1645,10 +1645,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_startedAtLeastOnce.getAndSet(true)) { - try{ + try + { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); - }catch(AMQException e){ + } + catch(AMQException e) + { _logger.error("Error Un Suspending Channel", e); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 7063ad62d1..50a2f5af99 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -36,6 +36,7 @@ import java.util.Enumeration; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -106,6 +107,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _waitUntilSent; private static final Content[] NO_CONTENT = new Content[0]; + + private static AtomicLong _refIdCounter; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, @@ -126,6 +129,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _immediate = immediate; _mandatory = mandatory; _waitUntilSent = waitUntilSent; + _refIdCounter = new AtomicLong(); } void resubscribe() throws AMQException @@ -256,19 +260,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - public void sendRef(Message message) throws JMSException - { - checkPreConditions(); - checkInitialDestination(); - - - synchronized (_connection.getFailoverMutex()) - { - sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); - } - } - public void send(Message message, int deliveryMode) throws JMSException { checkPreConditions(); @@ -373,6 +364,44 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } + // Send entire message as a ref + public void sendAsRef(Message message) throws JMSException + { + checkPreConditions(); + checkInitialDestination(); + + + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive, + _mandatory, _immediate); + } + } + + // Test methods for sending a ref + public String openRef() throws JMSException + { + String referenceId = generateReferenceId(); + doMessageOpen(referenceId); + return referenceId; + } + + public void transferRef(String referenceId, MessageHeaders messageHeaders) throws JMSException + { + Content content = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); + doMessageTransfer(messageHeaders, _destination, content, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate, false); + } + + public void appendRef(String referenceId, byte[] content) throws JMSException + { + doMessageAppend(referenceId, content); + } + + public void closeRef(String referenceId) throws JMSException + { + doMessageClose(referenceId); + } + private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException { @@ -526,7 +555,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j Content data = new Content(Content.TypeEnum.INLINE_T, payload); - doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate); + doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered()); } else { @@ -547,8 +576,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j doMessageOpen(referenceId); // Message.Transfer - Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); - doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate); + Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); + doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered()); //Message.Append for(Iterator it = content.iterator(); it.hasNext();) @@ -572,8 +601,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } private void doMessageTransfer(MessageHeaders messageHeaders, AMQDestination destination, Content content, - AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, + boolean redelivered) throws JMSException { try { @@ -583,7 +612,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _protocolHandler.getProtocolMinorVersion(), // AMQP minor version messageHeaders.getAppId(), // String appId messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders - content, // Content body + content, // Content body messageHeaders.getEncoding(), // String contentEncoding messageHeaders.getContentType(), // String contentType messageHeaders.getCorrelationId(), // String correlationId @@ -595,7 +624,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j mandatory, // boolean mandatory messageHeaders.getMessageId(), // String messageId (short)priority, // short priority - message.getJMSRedelivered(), // boolean redelivered + redelivered, // boolean redelivered messageHeaders.getReplyTo(), // String replyTo destination.getRoutingKey(), // String routingKey new String("abc123").getBytes(), // byte[] securityToken @@ -665,8 +694,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - private String generateReferenceId(){ - return String.valueOf(System.currentTimeMillis()); + private String generateReferenceId() + { + return String.valueOf(_refIdCounter.incrementAndGet()); } private void checkTemporaryDestination(AMQDestination destination) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index fd7c48b89f..da430cd002 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -59,9 +59,20 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener params = new ConnectionTuneParameters(); } - params.setFrameMax(frame.frameMax); - params.setChannelMax(frame.channelMax); - params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); + // Set frame and channel max to smaller of client or broker size (if client size is set) + if (frame.getFrameMax() < params.getFrameMax() || params.getFrameMax() == 0) + { + params.setFrameMax(frame.getFrameMax()); + } + if (frame.getChannelMax() < params.getChannelMax() || params.getChannelMax() == 0) + { + params.setChannelMax(frame.getChannelMax()); + } + // Set heartbeat delay to lowest value + if (Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat) < params.getHeartbeat() || params.getHeartbeat() == 0) + { + params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); + } protocolSession.setConnectionTuneParameters(params); stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java index 235ddaacb9..508c25904d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java @@ -50,11 +50,13 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener { protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod()); +// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS +// Acknowledgement mode is appropriate. // Be aware of possible changes to parameter order as versions change. - final AMQMethodBody methodBody = MessageOkBody.createMethodBody( - protocolSession.getProtocolMajorVersion(), // AMQP major version - protocolSession.getProtocolMinorVersion()); // AMQP minor version - protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); +// final AMQMethodBody methodBody = MessageOkBody.createMethodBody( +// protocolSession.getProtocolMajorVersion(), // AMQP major version +// protocolSession.getProtocolMinorVersion()); // AMQP minor version +// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java index 62fde4d806..de089bde2f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java @@ -52,11 +52,13 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId); _logger.debug("Method Close Body received, notify session to accept unprocessed message"); +// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS +// Acknowledgement mode is appropriate. // Be aware of possible changes to parameter order as versions change. - final AMQMethodBody methodBody = MessageOkBody.createMethodBody( - protocolSession.getProtocolMajorVersion(), // AMQP major version - protocolSession.getProtocolMinorVersion()); // AMQP minor version - protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); +// final AMQMethodBody methodBody = MessageOkBody.createMethodBody( +// protocolSession.getProtocolMajorVersion(), // AMQP major version +// protocolSession.getProtocolMinorVersion()); // AMQP minor version +// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java index 8de85accdf..266c8a8f13 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java @@ -48,14 +48,16 @@ public class MessageOpenMethodHandler implements StateAwareMethodListener public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference(); - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), referenceId); protocolSession.unprocessedMessageReceived(new String(referenceId), msg); +// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS +// Acknowledgement mode is appropriate. // Be aware of possible changes to parameter order as versions change. - final AMQMethodBody methodBody = MessageOkBody.createMethodBody( - protocolSession.getProtocolMajorVersion(), // AMQP major version - protocolSession.getProtocolMinorVersion()); // AMQP minor version - protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); +// final AMQMethodBody methodBody = MessageOkBody.createMethodBody( +// protocolSession.getProtocolMajorVersion(), // AMQP major version +// protocolSession.getProtocolMinorVersion()); // AMQP minor version +// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java index 173b79a320..c274facad5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -78,7 +78,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener else { String referenceId = new String(transferBody.getBody().getContentAsByteArray()); - protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, messageHeaders,transferBody.getRedelivered()); + protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, evt.getRequestId(), messageHeaders, transferBody.getRedelivered()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 3e5efd9068..5dc63561e8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -41,10 +41,9 @@ public class UnprocessedMessage private boolean redeliveredFlag; private MessageHeaders messageHeaders; - public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId) + public UnprocessedMessage(int channelId, byte[] referenceId) { this.channelId = channelId; - this.deliveryTag = deliveryTag; this.referenceId = referenceId; } @@ -113,11 +112,18 @@ public class UnprocessedMessage new String(contents.get(0)); } - public void setMessageHeaders(MessageHeaders messageHeaders) { + public void setDeliveryTag(long deliveryTag) + { + this.deliveryTag = deliveryTag; + } + + public void setMessageHeaders(MessageHeaders messageHeaders) + { this.messageHeaders = messageHeaders; } - public void setRedeliveredFlag(boolean redeliveredFlag) { + public void setRedeliveredFlag(boolean redeliveredFlag) + { this.redeliveredFlag = redeliveredFlag; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c38f925b1e..b4a15a70a7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.ConnectionTuneParameters; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; @@ -70,6 +71,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * mapping between connection instances and protocol handler instances. */ private AMQConnection _connection; + private ConnectionTuneParameters _params; /** * Used only when determining whether to add the SSL filter or not. This should be made more @@ -104,9 +106,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; - public AMQProtocolHandler(AMQConnection con) + public AMQProtocolHandler(AMQConnection con, ConnectionTuneParameters params) { _connection = con; + _params = params; } public boolean isUseSSL() @@ -156,6 +159,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); + if (_params != null) + _protocolSession.setConnectionTuneParameters(_params); _protocolSession.init(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 06e378550e..b94647ea8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -282,8 +282,10 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP msg.addContent(appendBody.bytes); } - public void messageTransferBodyReceivedForReferenceCase(String referenceId,MessageHeaders messageHeaders,boolean redilivered){ + public void messageTransferBodyReceivedForReferenceCase(String referenceId, long deliveryTag, MessageHeaders messageHeaders, boolean redilivered) + { UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId); + msg.setDeliveryTag(deliveryTag); msg.setMessageHeaders(messageHeaders); msg.setRedeliveredFlag(redilivered); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java new file mode 100644 index 0000000000..c3c6cf6c00 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MessageRefTest.java @@ -0,0 +1,272 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.message.JMSTextMessage; + +import javax.jms.*; + +/** + * @author Apache Software Foundation + */ +public class MessageRefTest extends TestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + public void testOneWayRef() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); + AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test"); + Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + + producer.sendAsRef(session1.createTextMessage("Hello ref")); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("Hello ref", tm1.getText()); + + con2.close(); + con1.close(); + } + + public void testOneWayRefAppend() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); + AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test"); + Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + + String refId = producer.openRef(); + producer.transferRef(refId, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders()); + producer.appendRef(refId, new String("ABC").getBytes()); + producer.appendRef(refId, new String("123").getBytes()); + producer.appendRef(refId, new String("").getBytes()); + producer.appendRef(refId, new String("DEF").getBytes()); + producer.appendRef(refId, new String("456").getBytes()); + producer.closeRef(refId); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("ABC123DEF456", tm1.getText()); + + con2.close(); + con1.close(); + } + + public void testTwoWayRef() throws Exception + { + // Set frame size to 1000 and send message of 2500 + ConnectionTuneParameters tp = new ConnectionTuneParameters(); + tp.setFrameMax(1000L); + tp.setChannelMax(32767); + tp.setHeartbeat(600); + String message = createMessage(2500); + + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp); + AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp); + Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + + producer.send(session1.createTextMessage(message)); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals(message, tm1.getText()); + + con2.close(); + con1.close(); + } + + public void testUpSmallDownBig() throws Exception + { + ConnectionTuneParameters tp1 = new ConnectionTuneParameters(); + tp1.setFrameMax(1000L); + tp1.setChannelMax(32767); + tp1.setHeartbeat(600); + ConnectionTuneParameters tp2 = new ConnectionTuneParameters(); + tp2.setFrameMax(2000L); + tp2.setChannelMax(32767); + tp2.setHeartbeat(600); + String message = createMessage(2500); + + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp1); + AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp2); + Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + + producer.send(session1.createTextMessage(message)); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals(message, tm1.getText()); + + con2.close(); + con1.close(); + } + + //*** Uncomment this test when the rechunking code has been included in AMQChannel.deliver() *** + /* public void testUpBigDownSmall() throws Exception + { + ConnectionTuneParameters tp1 = new ConnectionTuneParameters(); + tp1.setFrameMax(2000L); + tp1.setChannelMax(32767); + tp1.setHeartbeat(600); + ConnectionTuneParameters tp2 = new ConnectionTuneParameters(); + tp2.setFrameMax(1000L); + tp2.setChannelMax(32767); + tp2.setHeartbeat(600); + String message = createMessage(2500); + + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test", tp1); + AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp2); + Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + + producer.send(session1.createTextMessage(message)); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals(message, tm1.getText()); + + con2.close(); + con1.close(); + } */ + + public void testInterleavedRefs() throws Exception + { + ConnectionTuneParameters tp = new ConnectionTuneParameters(); + tp.setFrameMax(1000L); + tp.setChannelMax(32767); + tp.setHeartbeat(600); + String message = createMessage(500); + + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); + AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test", tp); + Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + + String refId1 = producer.openRef(); + String refId2 = producer.openRef(); + producer.transferRef(refId1, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders()); + producer.transferRef(refId2, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders()); + producer.appendRef(refId1, message.getBytes()); + producer.appendRef(refId2, message.getBytes()); + String refId3 = producer.openRef(); + producer.appendRef(refId1, message.getBytes()); + producer.transferRef(refId3, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders()); + producer.appendRef(refId3, message.getBytes()); + producer.appendRef(refId3, message.getBytes()); + producer.appendRef(refId1, message.getBytes()); + producer.closeRef(refId1); + producer.appendRef(refId3, message.getBytes()); + producer.appendRef(refId3, message.getBytes()); + producer.closeRef(refId2); + producer.appendRef(refId3, message.getBytes()); + producer.closeRef(refId3); + + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals(message + message + message, tm1.getText()); + TextMessage tm2 = (TextMessage) consumer.receive(2000); + assertNotNull(tm2); + assertEquals(message, tm2.getText()); + TextMessage tm3 = (TextMessage) consumer.receive(2000); + assertNotNull(tm3); + assertEquals(message + message + message + message + message, tm3.getText()); + + con2.close(); + con1.close(); + } + + public void testEmptyContentRef() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); + AMQSession session1 = con1.createAMQSession(false, AMQSession.AUTO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test"); + Session session2 = con2.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + + String refId = producer.openRef(); + producer.transferRef(refId, ((JMSTextMessage)session1.createTextMessage()).getMessageHeaders()); + producer.closeRef(refId); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("", tm1.getText()); + + con2.close(); + con1.close(); + } + + // Utility to create message "012345678901234567890..." for length len chars. + private String createMessage(int len) + { + StringBuffer sb = new StringBuffer(len); + for (int i=0; i<len; i++) + sb.append(i%10); + return sb.toString(); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java deleted file mode 100644 index c54ea8c6e6..0000000000 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.basic; - -import junit.framework.TestCase; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.BasicMessageProducer; - -import javax.jms.*; - -/** - * @author Apache Software Foundation - */ -public class PubSubTwoConnectionRefTest extends TestCase -{ - protected void setUp() throws Exception - { - super.setUp(); - TransportConnection.createVMBroker(1); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - /** - * This tests that a consumer is set up synchronously - * @throws Exception - */ - public void testTwoConnections() throws Exception - { - AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); - AMQSession session1 = con1.createAMQSession(false, AMQSession.NO_ACKNOWLEDGE); - BasicMessageProducer producer = session1.createBasicProducer(topic); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test"); - Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE); - MessageConsumer consumer = session2.createConsumer(topic); - con2.start(); - producer.sendRef(session1.createTextMessage("Hello ref")); -// producer.sendRef(session1.createTextMessage("Goodbye ref")); - TextMessage tm1 = (TextMessage) consumer.receive(2000); - assertNotNull(tm1); - assertEquals("Hello ref", tm1.getText()); -// assertEquals("Goodbye ref", tm1.getText()); - } - - public static void main(String[] args){ - PubSubTwoConnectionRefTest test = new PubSubTwoConnectionRefTest(); - try { - test.setUp(); - test.testTwoConnections(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } -} |
