diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 20:22:14 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 20:22:14 +0000 |
| commit | bae5b4dac83c2cc28badf10f2fde659066ec27fe (patch) | |
| tree | 3b67473acaee2cd4c7348d9a7453320b64fff9cf /java/client/src/main | |
| parent | 06a8d7b5dbbacf9eaff5bb1a788aa48a06df8b8e (diff) | |
| download | qpid-python-bae5b4dac83c2cc28badf10f2fde659066ec27fe.tar.gz | |
Fixed the various Ref modes so that the new MessageRefTest passes all tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
11 files changed, 134 insertions, 51 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index f20a3f7d1f..7d1615bc0c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -177,10 +177,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { + this(broker, username, password, clientName, virtualHost, null); + } + public AMQConnection(String broker, String username, String password, + String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException + { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + (clientName == null ? "" : clientName) + "/" + - virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'")); + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), params); } public AMQConnection(String host, int port, String username, String password, @@ -192,6 +197,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { + this(host, port, useSSL, username, password, clientName, virtualHost, null); + } + + public AMQConnection(String host, int port, boolean useSSL, String username, String password, + String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException + { this(new AMQConnectionURL(useSSL ? ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + @@ -203,16 +214,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'" - )); + ), params); } public AMQConnection(String connection) throws AMQException, URLSyntaxException { - this(new AMQConnectionURL(connection)); + this(new AMQConnectionURL(connection), null); + } + + public AMQConnection(String connection, ConnectionTuneParameters params) throws AMQException, URLSyntaxException + { + this(new AMQConnectionURL(connection), params); } public AMQConnection(ConnectionURL connectionURL) throws AMQException { + this(connectionURL, null); + } + public AMQConnection(ConnectionURL connectionURL, ConnectionTuneParameters params) throws AMQException + { _logger.info("Connection:" + connectionURL); _ConnectionId.incrementAndGet(); if (connectionURL == null) @@ -229,7 +249,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _failoverPolicy = new FailoverPolicy(connectionURL); - _protocolHandler = new AMQProtocolHandler(this); + _protocolHandler = new AMQProtocolHandler(this, params); // We are not currently connected _connected = false; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9027c1b29c..f4d588ca9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1645,10 +1645,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_startedAtLeastOnce.getAndSet(true)) { - try{ + try + { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); - }catch(AMQException e){ + } + catch(AMQException e) + { _logger.error("Error Un Suspending Channel", e); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 7063ad62d1..50a2f5af99 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -36,6 +36,7 @@ import java.util.Enumeration; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -106,6 +107,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _waitUntilSent; private static final Content[] NO_CONTENT = new Content[0]; + + private static AtomicLong _refIdCounter; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, @@ -126,6 +129,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _immediate = immediate; _mandatory = mandatory; _waitUntilSent = waitUntilSent; + _refIdCounter = new AtomicLong(); } void resubscribe() throws AMQException @@ -256,19 +260,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - public void sendRef(Message message) throws JMSException - { - checkPreConditions(); - checkInitialDestination(); - - - synchronized (_connection.getFailoverMutex()) - { - sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive, - _mandatory, _immediate); - } - } - public void send(Message message, int deliveryMode) throws JMSException { checkPreConditions(); @@ -373,6 +364,44 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } + // Send entire message as a ref + public void sendAsRef(Message message) throws JMSException + { + checkPreConditions(); + checkInitialDestination(); + + + synchronized (_connection.getFailoverMutex()) + { + sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive, + _mandatory, _immediate); + } + } + + // Test methods for sending a ref + public String openRef() throws JMSException + { + String referenceId = generateReferenceId(); + doMessageOpen(referenceId); + return referenceId; + } + + public void transferRef(String referenceId, MessageHeaders messageHeaders) throws JMSException + { + Content content = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); + doMessageTransfer(messageHeaders, _destination, content, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate, false); + } + + public void appendRef(String referenceId, byte[] content) throws JMSException + { + doMessageAppend(referenceId, content); + } + + public void closeRef(String referenceId) throws JMSException + { + doMessageClose(referenceId); + } + private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException { @@ -526,7 +555,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j Content data = new Content(Content.TypeEnum.INLINE_T, payload); - doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate); + doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered()); } else { @@ -547,8 +576,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j doMessageOpen(referenceId); // Message.Transfer - Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); - doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate); + Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes()); + doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered()); //Message.Append for(Iterator it = content.iterator(); it.hasNext();) @@ -572,8 +601,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } private void doMessageTransfer(MessageHeaders messageHeaders, AMQDestination destination, Content content, - AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, + boolean redelivered) throws JMSException { try { @@ -583,7 +612,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j _protocolHandler.getProtocolMinorVersion(), // AMQP minor version messageHeaders.getAppId(), // String appId messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders - content, // Content body + content, // Content body messageHeaders.getEncoding(), // String contentEncoding messageHeaders.getContentType(), // String contentType messageHeaders.getCorrelationId(), // String correlationId @@ -595,7 +624,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j mandatory, // boolean mandatory messageHeaders.getMessageId(), // String messageId (short)priority, // short priority - message.getJMSRedelivered(), // boolean redelivered + redelivered, // boolean redelivered messageHeaders.getReplyTo(), // String replyTo destination.getRoutingKey(), // String routingKey new String("abc123").getBytes(), // byte[] securityToken @@ -665,8 +694,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - private String generateReferenceId(){ - return String.valueOf(System.currentTimeMillis()); + private String generateReferenceId() + { + return String.valueOf(_refIdCounter.incrementAndGet()); } private void checkTemporaryDestination(AMQDestination destination) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index fd7c48b89f..da430cd002 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -59,9 +59,20 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener params = new ConnectionTuneParameters(); } - params.setFrameMax(frame.frameMax); - params.setChannelMax(frame.channelMax); - params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); + // Set frame and channel max to smaller of client or broker size (if client size is set) + if (frame.getFrameMax() < params.getFrameMax() || params.getFrameMax() == 0) + { + params.setFrameMax(frame.getFrameMax()); + } + if (frame.getChannelMax() < params.getChannelMax() || params.getChannelMax() == 0) + { + params.setChannelMax(frame.getChannelMax()); + } + // Set heartbeat delay to lowest value + if (Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat) < params.getHeartbeat() || params.getHeartbeat() == 0) + { + params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); + } protocolSession.setConnectionTuneParameters(params); stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java index 235ddaacb9..508c25904d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java @@ -50,11 +50,13 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener { protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod()); +// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS +// Acknowledgement mode is appropriate. // Be aware of possible changes to parameter order as versions change. - final AMQMethodBody methodBody = MessageOkBody.createMethodBody( - protocolSession.getProtocolMajorVersion(), // AMQP major version - protocolSession.getProtocolMinorVersion()); // AMQP minor version - protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); +// final AMQMethodBody methodBody = MessageOkBody.createMethodBody( +// protocolSession.getProtocolMajorVersion(), // AMQP major version +// protocolSession.getProtocolMinorVersion()); // AMQP minor version +// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java index 62fde4d806..de089bde2f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java @@ -52,11 +52,13 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId); _logger.debug("Method Close Body received, notify session to accept unprocessed message"); +// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS +// Acknowledgement mode is appropriate. // Be aware of possible changes to parameter order as versions change. - final AMQMethodBody methodBody = MessageOkBody.createMethodBody( - protocolSession.getProtocolMajorVersion(), // AMQP major version - protocolSession.getProtocolMinorVersion()); // AMQP minor version - protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); +// final AMQMethodBody methodBody = MessageOkBody.createMethodBody( +// protocolSession.getProtocolMajorVersion(), // AMQP major version +// protocolSession.getProtocolMinorVersion()); // AMQP minor version +// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java index 8de85accdf..266c8a8f13 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java @@ -48,14 +48,16 @@ public class MessageOpenMethodHandler implements StateAwareMethodListener public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference(); - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), referenceId); protocolSession.unprocessedMessageReceived(new String(referenceId), msg); +// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS +// Acknowledgement mode is appropriate. // Be aware of possible changes to parameter order as versions change. - final AMQMethodBody methodBody = MessageOkBody.createMethodBody( - protocolSession.getProtocolMajorVersion(), // AMQP major version - protocolSession.getProtocolMinorVersion()); // AMQP minor version - protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); +// final AMQMethodBody methodBody = MessageOkBody.createMethodBody( +// protocolSession.getProtocolMajorVersion(), // AMQP major version +// protocolSession.getProtocolMinorVersion()); // AMQP minor version +// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java index 173b79a320..c274facad5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -78,7 +78,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener else { String referenceId = new String(transferBody.getBody().getContentAsByteArray()); - protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, messageHeaders,transferBody.getRedelivered()); + protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, evt.getRequestId(), messageHeaders, transferBody.getRedelivered()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 3e5efd9068..5dc63561e8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -41,10 +41,9 @@ public class UnprocessedMessage private boolean redeliveredFlag; private MessageHeaders messageHeaders; - public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId) + public UnprocessedMessage(int channelId, byte[] referenceId) { this.channelId = channelId; - this.deliveryTag = deliveryTag; this.referenceId = referenceId; } @@ -113,11 +112,18 @@ public class UnprocessedMessage new String(contents.get(0)); } - public void setMessageHeaders(MessageHeaders messageHeaders) { + public void setDeliveryTag(long deliveryTag) + { + this.deliveryTag = deliveryTag; + } + + public void setMessageHeaders(MessageHeaders messageHeaders) + { this.messageHeaders = messageHeaders; } - public void setRedeliveredFlag(boolean redeliveredFlag) { + public void setRedeliveredFlag(boolean redeliveredFlag) + { this.redeliveredFlag = redeliveredFlag; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c38f925b1e..b4a15a70a7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.ConnectionTuneParameters; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; @@ -70,6 +71,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * mapping between connection instances and protocol handler instances. */ private AMQConnection _connection; + private ConnectionTuneParameters _params; /** * Used only when determining whether to add the SSL filter or not. This should be made more @@ -104,9 +106,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; - public AMQProtocolHandler(AMQConnection con) + public AMQProtocolHandler(AMQConnection con, ConnectionTuneParameters params) { _connection = con; + _params = params; } public boolean isUseSSL() @@ -156,6 +159,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); + if (_params != null) + _protocolSession.setConnectionTuneParameters(_params); _protocolSession.init(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 06e378550e..b94647ea8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -282,8 +282,10 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP msg.addContent(appendBody.bytes); } - public void messageTransferBodyReceivedForReferenceCase(String referenceId,MessageHeaders messageHeaders,boolean redilivered){ + public void messageTransferBodyReceivedForReferenceCase(String referenceId, long deliveryTag, MessageHeaders messageHeaders, boolean redilivered) + { UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId); + msg.setDeliveryTag(deliveryTag); msg.setMessageHeaders(messageHeaders); msg.setRedeliveredFlag(redilivered); } |
