summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-17 21:33:03 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-17 21:33:03 +0000
commit2d7fa6fc88b7247e7e5c4218cd8aa0373aa80f13 (patch)
treeb15e853861db567b0fbddfd222e28d6128efb5a3 /java/client/src
parentddef18423e3b574c52a58b4b2233a3dfb59160a6 (diff)
downloadqpid-python-2d7fa6fc88b7247e7e5c4218cd8aa0373aa80f13.tar.gz
Solved remaining compile problems in client except for missing line in UnprocessedMessage.java file.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497179 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java71
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java1
4 files changed, 74 insertions, 43 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 55cdd39931..17fb7a0672 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -797,7 +797,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_inRecovery = inRecovery;
}
- public void acknowledge() throws JMSException
+ public void acknowledge() throws JMSException, AMQException
{
if (isClosed())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 1f038d05ec..fb50fc1ca9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -239,14 +239,21 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (messageListener != null)
{
- //handle case where connection has already been started, and the dispatcher is blocked
- //doing a put on the _synchronousQueue
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
- if (jmsMsg != null)
+ try
{
- preApplicationProcessing(jmsMsg);
- messageListener.onMessage(jmsMsg);
- postDeliver(jmsMsg);
+ //handle case where connection has already been started, and the dispatcher is blocked
+ //doing a put on the _synchronousQueue
+ AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
+ if (jmsMsg != null)
+ {
+ preApplicationProcessing(jmsMsg);
+ messageListener.onMessage(jmsMsg);
+ postDeliver(jmsMsg);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
}
}
}
@@ -361,6 +368,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_logger.warn("Interrupted: " + e);
return null;
}
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
finally
{
releaseReceiving();
@@ -402,6 +413,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return m;
}
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
finally
{
releaseReceiving();
@@ -501,12 +516,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (_logger.isDebugEnabled())
{
- _logger.debug("notifyMessage called with message number " + messageFrame.content.getDestination());
+ _logger.debug("notifyMessage called with message number " + messageFrame.deliveryTag);
}
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.contentHeader.getDestination(),
- messageFrame.contentHeader.getr,
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliveryTag,
+ false,
messageFrame.contentHeader,
messageFrame.content);
@@ -541,7 +556,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private void preDeliver(AbstractJMSMessage msg)
+ private void preDeliver(AbstractJMSMessage msg) throws AMQException
{
switch (_acknowledgeMode)
{
@@ -556,7 +571,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private void postDeliver(AbstractJMSMessage msg) throws JMSException
+ private void postDeliver(AbstractJMSMessage msg) throws JMSException, AMQException
{
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
@@ -608,7 +623,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* Acknowledge up to last message delivered (if any). Used when commiting.
*/
- void acknowledgeLastDelivered()
+ void acknowledgeLastDelivered() throws AMQException
{
if (_lastDeliveryTag > 0)
{
@@ -666,7 +681,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- public void acknowledge() throws JMSException
+ public void acknowledge() throws JMSException, AMQException
{
if(!isClosed())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 391ce87ae4..9e2beaa964 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -106,6 +106,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
+ throws AMQException
{
_connection = connection;
_destination = destination;
@@ -131,7 +132,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
- private void declareDestination(AMQDestination destination)
+ private void declareDestination(AMQDestination destination) throws AMQException
{
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
@@ -344,7 +345,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
public void send(Destination destination, Message message, int deliveryMode,
int priority, long timeToLive, boolean mandatory,
boolean immediate, boolean waitUntilSent)
- throws JMSException
+ throws JMSException, AMQException
{
checkPreConditions();
checkDestination(destination);
@@ -494,7 +495,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
throw new JMSException("Unsupported destination class: " +
(destination != null ? destination.getClass() : null));
}
- declareDestination((AMQDestination) destination);
+ try
+ {
+ declareDestination((AMQDestination) destination);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
}
protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
@@ -560,32 +568,39 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
for (int i = 0; i < content.length; i++)
{
- AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- content[i], // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)deliveryMode, // short deliveryMode
- messageHeaders.getDestination(), // String destination
- destination.getExchangeName(), // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- messageHeaders.getMessageId(), // String messageId
- (short)priority, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- destination.getRoutingKey(), // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- timeToLive, // long ttl
- messageHeaders.getUserId()); // String userId
+ try
+ {
+ AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+ (byte)0, (byte)9, // AMQP version (major, minor)
+ messageHeaders.getAppId(), // String appId
+ messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+ content[i], // Content body
+ messageHeaders.getEncoding(), // String contentEncoding
+ messageHeaders.getContentType(), // String contentType
+ messageHeaders.getCorrelationId(), // String correlationId
+ (short)deliveryMode, // short deliveryMode
+ messageHeaders.getDestination(), // String destination
+ destination.getExchangeName(), // String exchange
+ messageHeaders.getExpiration(), // long expiration
+ immediate, // boolean immediate
+ messageHeaders.getMessageId(), // String messageId
+ (short)priority, // short priority
+ message.getJMSRedelivered(), // boolean redelivered
+ messageHeaders.getReplyTo(), // String replyTo
+ destination.getRoutingKey(), // String routingKey
+ new String("abc123").getBytes(), // byte[] securityToken
+ 0, // int ticket
+ messageHeaders.getTimestamp(), // long timestamp
+ messageHeaders.getTransactionId(), // String transactionId
+ timeToLive, // long ttl
+ messageHeaders.getUserId()); // String userId
- _protocolHandler.writeRequest(_channelId, methodBody);
+ _protocolHandler.writeRequest(_channelId, methodBody);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
index 71252dca8c..055fe6e940 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
@@ -52,6 +52,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener
MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
msg.content = transferBody.getBody();
msg.channelId = evt.getChannelId();
+ msg.deliveryTag = evt.getRequestId();
_logger.debug("New JmsDeliver method received");
MessageHeaders messageHeaders = new MessageHeaders();