summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-20 20:22:14 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-20 20:22:14 +0000
commitbae5b4dac83c2cc28badf10f2fde659066ec27fe (patch)
tree3b67473acaee2cd4c7348d9a7453320b64fff9cf /java/client/src/main
parent06a8d7b5dbbacf9eaff5bb1a788aa48a06df8b8e (diff)
downloadqpid-python-bae5b4dac83c2cc28badf10f2fde659066ec27fe.tar.gz
Fixed the various Ref modes so that the new MessageRefTest passes all tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java74
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java4
11 files changed, 134 insertions, 51 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index f20a3f7d1f..7d1615bc0c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -177,10 +177,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(String broker, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
+ this(broker, username, password, clientName, virtualHost, null);
+ }
+ public AMQConnection(String broker, String username, String password,
+ String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+ {
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
(clientName == null ? "" : clientName) + "/" +
- virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
+ virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"), params);
}
public AMQConnection(String host, int port, String username, String password,
@@ -192,6 +197,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(String host, int port, boolean useSSL, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
+ this(host, port, useSSL, username, password, clientName, virtualHost, null);
+ }
+
+ public AMQConnection(String host, int port, boolean useSSL, String username, String password,
+ String clientName, String virtualHost, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+ {
this(new AMQConnectionURL(useSSL ?
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
@@ -203,16 +214,25 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
(clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='false'"
- ));
+ ), params);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection));
+ this(new AMQConnectionURL(connection), null);
+ }
+
+ public AMQConnection(String connection, ConnectionTuneParameters params) throws AMQException, URLSyntaxException
+ {
+ this(new AMQConnectionURL(connection), params);
}
public AMQConnection(ConnectionURL connectionURL) throws AMQException
{
+ this(connectionURL, null);
+ }
+ public AMQConnection(ConnectionURL connectionURL, ConnectionTuneParameters params) throws AMQException
+ {
_logger.info("Connection:" + connectionURL);
_ConnectionId.incrementAndGet();
if (connectionURL == null)
@@ -229,7 +249,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_failoverPolicy = new FailoverPolicy(connectionURL);
- _protocolHandler = new AMQProtocolHandler(this);
+ _protocolHandler = new AMQProtocolHandler(this, params);
// We are not currently connected
_connected = false;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9027c1b29c..f4d588ca9b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1645,10 +1645,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_startedAtLeastOnce.getAndSet(true))
{
- try{
+ try
+ {
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
- }catch(AMQException e){
+ }
+ catch(AMQException e)
+ {
_logger.error("Error Un Suspending Channel", e);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 7063ad62d1..50a2f5af99 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -36,6 +36,7 @@ import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -106,6 +107,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private final boolean _waitUntilSent;
private static final Content[] NO_CONTENT = new Content[0];
+
+ private static AtomicLong _refIdCounter;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -126,6 +129,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_immediate = immediate;
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
+ _refIdCounter = new AtomicLong();
}
void resubscribe() throws AMQException
@@ -256,19 +260,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
- public void sendRef(Message message) throws JMSException
- {
- checkPreConditions();
- checkInitialDestination();
-
-
- synchronized (_connection.getFailoverMutex())
- {
- sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
- }
- }
-
public void send(Message message, int deliveryMode) throws JMSException
{
checkPreConditions();
@@ -373,6 +364,44 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
+ // Send entire message as a ref
+ public void sendAsRef(Message message) throws JMSException
+ {
+ checkPreConditions();
+ checkInitialDestination();
+
+
+ synchronized (_connection.getFailoverMutex())
+ {
+ sendImpl(_destination, message, true, _deliveryMode, _messagePriority, _timeToLive,
+ _mandatory, _immediate);
+ }
+ }
+
+ // Test methods for sending a ref
+ public String openRef() throws JMSException
+ {
+ String referenceId = generateReferenceId();
+ doMessageOpen(referenceId);
+ return referenceId;
+ }
+
+ public void transferRef(String referenceId, MessageHeaders messageHeaders) throws JMSException
+ {
+ Content content = new Content(Content.TypeEnum.REF_T, referenceId.getBytes());
+ doMessageTransfer(messageHeaders, _destination, content, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate, false);
+ }
+
+ public void appendRef(String referenceId, byte[] content) throws JMSException
+ {
+ doMessageAppend(referenceId, content);
+ }
+
+ public void closeRef(String referenceId) throws JMSException
+ {
+ doMessageClose(referenceId);
+ }
+
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
@@ -526,7 +555,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
Content data = new Content(Content.TypeEnum.INLINE_T, payload);
- doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate);
+ doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered());
}
else
{
@@ -547,8 +576,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
doMessageOpen(referenceId);
// Message.Transfer
- Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes());
- doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,mandatory,immediate);
+ Content data = new Content(Content.TypeEnum.REF_T, referenceId.getBytes());
+ doMessageTransfer(messageHeaders, destination, data, deliveryMode, priority, timeToLive, mandatory, immediate, message.getJMSRedelivered());
//Message.Append
for(Iterator it = content.iterator(); it.hasNext();)
@@ -572,8 +601,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
private void doMessageTransfer(MessageHeaders messageHeaders, AMQDestination destination, Content content,
- AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
+ boolean redelivered) throws JMSException
{
try
{
@@ -583,7 +612,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
_protocolHandler.getProtocolMinorVersion(), // AMQP minor version
messageHeaders.getAppId(), // String appId
messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- content, // Content body
+ content, // Content body
messageHeaders.getEncoding(), // String contentEncoding
messageHeaders.getContentType(), // String contentType
messageHeaders.getCorrelationId(), // String correlationId
@@ -595,7 +624,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
mandatory, // boolean mandatory
messageHeaders.getMessageId(), // String messageId
(short)priority, // short priority
- message.getJMSRedelivered(), // boolean redelivered
+ redelivered, // boolean redelivered
messageHeaders.getReplyTo(), // String replyTo
destination.getRoutingKey(), // String routingKey
new String("abc123").getBytes(), // byte[] securityToken
@@ -665,8 +694,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
- private String generateReferenceId(){
- return String.valueOf(System.currentTimeMillis());
+ private String generateReferenceId()
+ {
+ return String.valueOf(_refIdCounter.incrementAndGet());
}
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index fd7c48b89f..da430cd002 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -59,9 +59,20 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
params = new ConnectionTuneParameters();
}
- params.setFrameMax(frame.frameMax);
- params.setChannelMax(frame.channelMax);
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+ // Set frame and channel max to smaller of client or broker size (if client size is set)
+ if (frame.getFrameMax() < params.getFrameMax() || params.getFrameMax() == 0)
+ {
+ params.setFrameMax(frame.getFrameMax());
+ }
+ if (frame.getChannelMax() < params.getChannelMax() || params.getChannelMax() == 0)
+ {
+ params.setChannelMax(frame.getChannelMax());
+ }
+ // Set heartbeat delay to lowest value
+ if (Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat) < params.getHeartbeat() || params.getHeartbeat() == 0)
+ {
+ params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+ }
protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
index 235ddaacb9..508c25904d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
@@ -50,11 +50,13 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener
{
protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
+// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS
+// Acknowledgement mode is appropriate.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
- protocolSession.getProtocolMajorVersion(), // AMQP major version
- protocolSession.getProtocolMinorVersion()); // AMQP minor version
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+// final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+// protocolSession.getProtocolMajorVersion(), // AMQP major version
+// protocolSession.getProtocolMinorVersion()); // AMQP minor version
+// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
catch (Exception e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
index 62fde4d806..de089bde2f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
@@ -52,11 +52,13 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener
protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId);
_logger.debug("Method Close Body received, notify session to accept unprocessed message");
+// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS
+// Acknowledgement mode is appropriate.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
- protocolSession.getProtocolMajorVersion(), // AMQP major version
- protocolSession.getProtocolMinorVersion()); // AMQP minor version
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+// final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+// protocolSession.getProtocolMajorVersion(), // AMQP major version
+// protocolSession.getProtocolMinorVersion()); // AMQP minor version
+// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
index 8de85accdf..266c8a8f13 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
@@ -48,14 +48,16 @@ public class MessageOpenMethodHandler implements StateAwareMethodListener
public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference();
- final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId);
+ final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), referenceId);
protocolSession.unprocessedMessageReceived(new String(referenceId), msg);
+// TODO: Fix this - the MethodOks are never being sent, find a way to send them when the JMS
+// Acknowledgement mode is appropriate.
// Be aware of possible changes to parameter order as versions change.
- final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
- protocolSession.getProtocolMajorVersion(), // AMQP major version
- protocolSession.getProtocolMinorVersion()); // AMQP minor version
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
+// final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
+// protocolSession.getProtocolMajorVersion(), // AMQP major version
+// protocolSession.getProtocolMinorVersion()); // AMQP minor version
+// protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
index 173b79a320..c274facad5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
@@ -78,7 +78,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener
else
{
String referenceId = new String(transferBody.getBody().getContentAsByteArray());
- protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, messageHeaders,transferBody.getRedelivered());
+ protocolSession.messageTransferBodyReceivedForReferenceCase(referenceId, evt.getRequestId(), messageHeaders, transferBody.getRedelivered());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 3e5efd9068..5dc63561e8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -41,10 +41,9 @@ public class UnprocessedMessage
private boolean redeliveredFlag;
private MessageHeaders messageHeaders;
- public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId)
+ public UnprocessedMessage(int channelId, byte[] referenceId)
{
this.channelId = channelId;
- this.deliveryTag = deliveryTag;
this.referenceId = referenceId;
}
@@ -113,11 +112,18 @@ public class UnprocessedMessage
new String(contents.get(0));
}
- public void setMessageHeaders(MessageHeaders messageHeaders) {
+ public void setDeliveryTag(long deliveryTag)
+ {
+ this.deliveryTag = deliveryTag;
+ }
+
+ public void setMessageHeaders(MessageHeaders messageHeaders)
+ {
this.messageHeaders = messageHeaders;
}
- public void setRedeliveredFlag(boolean redeliveredFlag) {
+ public void setRedeliveredFlag(boolean redeliveredFlag)
+ {
this.redeliveredFlag = redeliveredFlag;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c38f925b1e..b4a15a70a7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
@@ -70,6 +71,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* mapping between connection instances and protocol handler instances.
*/
private AMQConnection _connection;
+ private ConnectionTuneParameters _params;
/**
* Used only when determining whether to add the SSL filter or not. This should be made more
@@ -104,9 +106,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
- public AMQProtocolHandler(AMQConnection con)
+ public AMQProtocolHandler(AMQConnection con, ConnectionTuneParameters params)
{
_connection = con;
+ _params = params;
}
public boolean isUseSSL()
@@ -156,6 +159,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+ if (_params != null)
+ _protocolSession.setConnectionTuneParameters(_params);
_protocolSession.init();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 06e378550e..b94647ea8c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -282,8 +282,10 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP
msg.addContent(appendBody.bytes);
}
- public void messageTransferBodyReceivedForReferenceCase(String referenceId,MessageHeaders messageHeaders,boolean redilivered){
+ public void messageTransferBodyReceivedForReferenceCase(String referenceId, long deliveryTag, MessageHeaders messageHeaders, boolean redilivered)
+ {
UnprocessedMessage msg = (UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+ msg.setDeliveryTag(deliveryTag);
msg.setMessageHeaders(messageHeaders);
msg.setRedeliveredFlag(redilivered);
}