From 2d7fa6fc88b7247e7e5c4218cd8aa0373aa80f13 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Wed, 17 Jan 2007 21:33:03 +0000 Subject: Solved remaining compile problems in client except for missing line in UnprocessedMessage.java file. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497179 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 2 +- .../apache/qpid/client/BasicMessageConsumer.java | 43 ++++++++----- .../apache/qpid/client/BasicMessageProducer.java | 71 +++++++++++++--------- .../handler/MessageTransferMethodHandler.java | 1 + 4 files changed, 74 insertions(+), 43 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 55cdd39931..17fb7a0672 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -797,7 +797,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _inRecovery = inRecovery; } - public void acknowledge() throws JMSException + public void acknowledge() throws JMSException, AMQException { if (isClosed()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1f038d05ec..fb50fc1ca9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -239,14 +239,21 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (messageListener != null) { - //handle case where connection has already been started, and the dispatcher is blocked - //doing a put on the _synchronousQueue - AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); - if (jmsMsg != null) + try { - preApplicationProcessing(jmsMsg); - messageListener.onMessage(jmsMsg); - postDeliver(jmsMsg); + //handle case where connection has already been started, and the dispatcher is blocked + //doing a put on the _synchronousQueue + AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); + if (jmsMsg != null) + { + preApplicationProcessing(jmsMsg); + messageListener.onMessage(jmsMsg); + postDeliver(jmsMsg); + } + } + catch (AMQException e) + { + throw new JMSException(e.toString()); } } } @@ -361,6 +368,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _logger.warn("Interrupted: " + e); return null; } + catch (AMQException e) + { + throw new JMSException(e.toString()); + } finally { releaseReceiving(); @@ -402,6 +413,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return m; } + catch (AMQException e) + { + throw new JMSException(e.toString()); + } finally { releaseReceiving(); @@ -501,12 +516,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (_logger.isDebugEnabled()) { - _logger.debug("notifyMessage called with message number " + messageFrame.content.getDestination()); + _logger.debug("notifyMessage called with message number " + messageFrame.deliveryTag); } try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.contentHeader.getDestination(), - messageFrame.contentHeader.getr, + AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliveryTag, + false, messageFrame.contentHeader, messageFrame.content); @@ -541,7 +556,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - private void preDeliver(AbstractJMSMessage msg) + private void preDeliver(AbstractJMSMessage msg) throws AMQException { switch (_acknowledgeMode) { @@ -556,7 +571,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - private void postDeliver(AbstractJMSMessage msg) throws JMSException + private void postDeliver(AbstractJMSMessage msg) throws JMSException, AMQException { msg.setJMSDestination(_destination); switch (_acknowledgeMode) @@ -608,7 +623,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** * Acknowledge up to last message delivered (if any). Used when commiting. */ - void acknowledgeLastDelivered() + void acknowledgeLastDelivered() throws AMQException { if (_lastDeliveryTag > 0) { @@ -666,7 +681,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() throws JMSException + public void acknowledge() throws JMSException, AMQException { if(!isClosed()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 391ce87ae4..9e2beaa964 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -106,6 +106,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent) + throws AMQException { _connection = connection; _destination = destination; @@ -131,7 +132,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - private void declareDestination(AMQDestination destination) + private void declareDestination(AMQDestination destination) throws AMQException { // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false @@ -344,7 +345,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate, boolean waitUntilSent) - throws JMSException + throws JMSException, AMQException { checkPreConditions(); checkDestination(destination); @@ -494,7 +495,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j throw new JMSException("Unsupported destination class: " + (destination != null ? destination.getClass() : null)); } - declareDestination((AMQDestination) destination); + try + { + declareDestination((AMQDestination) destination); + } + catch (AMQException e) + { + throw new JMSException(e.toString()); + } } protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, @@ -560,32 +568,39 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } for (int i = 0; i < content.length; i++) { - AMQMethodBody methodBody = MessageTransferBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - messageHeaders.getAppId(), // String appId - messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders - content[i], // Content body - messageHeaders.getEncoding(), // String contentEncoding - messageHeaders.getContentType(), // String contentType - messageHeaders.getCorrelationId(), // String correlationId - (short)deliveryMode, // short deliveryMode - messageHeaders.getDestination(), // String destination - destination.getExchangeName(), // String exchange - messageHeaders.getExpiration(), // long expiration - immediate, // boolean immediate - messageHeaders.getMessageId(), // String messageId - (short)priority, // short priority - false, // boolean redelivered - messageHeaders.getReplyTo(), // String replyTo - destination.getRoutingKey(), // String routingKey - new String("abc123").getBytes(), // byte[] securityToken - 0, // int ticket - messageHeaders.getTimestamp(), // long timestamp - messageHeaders.getTransactionId(), // String transactionId - timeToLive, // long ttl - messageHeaders.getUserId()); // String userId + try + { + AMQMethodBody methodBody = MessageTransferBody.createMethodBody( + (byte)0, (byte)9, // AMQP version (major, minor) + messageHeaders.getAppId(), // String appId + messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders + content[i], // Content body + messageHeaders.getEncoding(), // String contentEncoding + messageHeaders.getContentType(), // String contentType + messageHeaders.getCorrelationId(), // String correlationId + (short)deliveryMode, // short deliveryMode + messageHeaders.getDestination(), // String destination + destination.getExchangeName(), // String exchange + messageHeaders.getExpiration(), // long expiration + immediate, // boolean immediate + messageHeaders.getMessageId(), // String messageId + (short)priority, // short priority + message.getJMSRedelivered(), // boolean redelivered + messageHeaders.getReplyTo(), // String replyTo + destination.getRoutingKey(), // String routingKey + new String("abc123").getBytes(), // byte[] securityToken + 0, // int ticket + messageHeaders.getTimestamp(), // long timestamp + messageHeaders.getTransactionId(), // String transactionId + timeToLive, // long ttl + messageHeaders.getUserId()); // String userId - _protocolHandler.writeRequest(_channelId, methodBody); + _protocolHandler.writeRequest(_channelId, methodBody); + } + catch (AMQException e) + { + throw new JMSException(e.toString()); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java index 71252dca8c..055fe6e940 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java @@ -52,6 +52,7 @@ public class MessageTransferMethodHandler implements StateAwareMethodListener MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod(); msg.content = transferBody.getBody(); msg.channelId = evt.getChannelId(); + msg.deliveryTag = evt.getRequestId(); _logger.debug("New JmsDeliver method received"); MessageHeaders messageHeaders = new MessageHeaders(); -- cgit v1.2.1