From 1b9c3bf37aca0e0d5114c2863b4f2b8070e42569 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 30 Apr 2007 14:37:23 +0000 Subject: QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java client. Updated to allow the use of durable subscriptions but it will not be as clean as with the extensions. Selectors are also now disabled. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@533764 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 86 ++++++- .../org/apache/qpid/client/QueueSenderAdapter.java | 286 ++++++++++++--------- 2 files changed, 232 insertions(+), 140 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ceca43b785..10101976eb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -209,6 +209,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final boolean _strictAMQP; + /** System property to enable strickt AMQP compliance */ + public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; + /** Strickt AMQP default */ + public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; + + private final boolean _strictAMQPFATAL; /** System property to enable immediate message prefetching */ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; @@ -436,6 +442,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); + _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)); _connection = con; @@ -924,7 +931,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue - _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); + _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else { @@ -1212,13 +1219,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final int prefetchLow, final boolean noLocal, final boolean exclusive, - final String selector, + String selector, final FieldTable rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); + final String messageSelector; + + if (_strictAMQP && !(selector == null || selector.equals(""))) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); + } + else + { + messageSelector = null; + } + } + else + { + messageSelector = selector; + } return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() { @@ -1229,6 +1253,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQDestination amqd = (AMQDestination) destination; final AMQProtocolHandler protocolHandler = getProtocolHandler(); + // TODO: Define selectors in AMQP // TODO: construct the rawSelector from the selector string if rawSelector == null final FieldTable ft = FieldTableFactory.newFieldTable(); //if (rawSelector != null) @@ -1237,7 +1262,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { ft.addAll(rawSelector); } - BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, + + BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal, _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); @@ -1630,6 +1656,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + + checkNotClosed(); AMQTopic origTopic = checkValidTopic(topic); AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); @@ -1657,13 +1685,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { topicName = new AMQShortString(topic.getTopicName()); } - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && - !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + + if (_strictAMQP) { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); + } + deleteQueue(dest.getAMQQueueName()); } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && + !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } } subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); @@ -1761,13 +1807,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) + if (_strictAMQP) { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + + " Requesting queue deletion regardless."); + } + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } else { - throw new InvalidDestinationException("Unknown subscription exchange:" + name); + + if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) + { + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + } + else + { + throw new InvalidDestinationException("Unknown subscription exchange:" + name); + } } } } @@ -1779,10 +1843,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { - if (isStrictAMQP()) - { - throw new UnsupportedOperationException(); - } // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index e0c4b61333..a219f7d655 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -9,119 +9,141 @@ import javax.jms.Queue; import javax.jms.QueueSender; import javax.jms.InvalidDestinationException; -public class QueueSenderAdapter implements QueueSender { - - private BasicMessageProducer _delegate; - private Queue _queue; - private boolean closed = false; - - public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){ - _delegate = msgProducer; - _queue = queue; - } - - public Queue getQueue() throws JMSException { - checkPreConditions(); - return _queue; - } - - public void send(Message msg) throws JMSException { - checkPreConditions(); - _delegate.send(msg); - } - - public void send(Queue queue, Message msg) throws JMSException { - checkPreConditions(queue); - _delegate.send(queue, msg); - } - - public void publish(Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { - checkPreConditions(); - _delegate.send(msg, deliveryMode,priority,timeToLive); - } - - public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { - checkPreConditions(queue); - _delegate.send(queue,msg, deliveryMode,priority,timeToLive); - } - - public void close() throws JMSException { - _delegate.close(); - closed = true; - } - - public int getDeliveryMode() throws JMSException { - checkPreConditions(); - return _delegate.getDeliveryMode(); - } - - public Destination getDestination() throws JMSException { - checkPreConditions(); - return _delegate.getDestination(); - } - - public boolean getDisableMessageID() throws JMSException { - checkPreConditions(); - return _delegate.getDisableMessageID(); - } - - public boolean getDisableMessageTimestamp() throws JMSException { - checkPreConditions(); - return _delegate.getDisableMessageTimestamp(); - } - - public int getPriority() throws JMSException { - checkPreConditions(); - return _delegate.getPriority(); - } - - public long getTimeToLive() throws JMSException { - checkPreConditions(); - return _delegate.getTimeToLive(); - } - - public void send(Destination dest, Message msg) throws JMSException { - checkPreConditions((Queue)dest); - _delegate.send(dest,msg); - } - - public void send(Message msg, int deliveryMode, int priority, long timeToLive) - throws JMSException { - checkPreConditions(); - _delegate.send(msg, deliveryMode,priority,timeToLive); - } - - public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException { - checkPreConditions((Queue)dest); - _delegate.send(dest,msg, deliveryMode,priority,timeToLive); - } - - public void setDeliveryMode(int deliveryMode) throws JMSException { - checkPreConditions(); - _delegate.setDeliveryMode(deliveryMode); - } - - public void setDisableMessageID(boolean disableMessageID) throws JMSException { - checkPreConditions(); - _delegate.setDisableMessageID(disableMessageID); - } - - public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { - checkPreConditions(); - _delegate.setDisableMessageTimestamp(disableMessageTimestamp); - } - - public void setPriority(int priority) throws JMSException { - checkPreConditions(); - _delegate.setPriority(priority); - } - - public void setTimeToLive(long timeToLive) throws JMSException { - checkPreConditions(); - _delegate.setTimeToLive(timeToLive); - } +public class QueueSenderAdapter implements QueueSender +{ + + private BasicMessageProducer _delegate; + private Queue _queue; + private boolean closed = false; + + public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue) + { + _delegate = msgProducer; + _queue = queue; + } + + public Queue getQueue() throws JMSException + { + checkPreConditions(); + return _queue; + } + + public void send(Message msg) throws JMSException + { + checkPreConditions(); + _delegate.send(msg); + } + + public void send(Queue queue, Message msg) throws JMSException + { + checkPreConditions(queue); + _delegate.send(queue, msg); + } + + public void publish(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(); + _delegate.send(msg, deliveryMode, priority, timeToLive); + } + + public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(queue); + _delegate.send(queue, msg, deliveryMode, priority, timeToLive); + } + + public void close() throws JMSException + { + _delegate.close(); + closed = true; + } + + public int getDeliveryMode() throws JMSException + { + checkPreConditions(); + return _delegate.getDeliveryMode(); + } + + public Destination getDestination() throws JMSException + { + checkPreConditions(); + return _delegate.getDestination(); + } + + public boolean getDisableMessageID() throws JMSException + { + checkPreConditions(); + return _delegate.getDisableMessageID(); + } + + public boolean getDisableMessageTimestamp() throws JMSException + { + checkPreConditions(); + return _delegate.getDisableMessageTimestamp(); + } + + public int getPriority() throws JMSException + { + checkPreConditions(); + return _delegate.getPriority(); + } + + public long getTimeToLive() throws JMSException + { + checkPreConditions(); + return _delegate.getTimeToLive(); + } + + public void send(Destination dest, Message msg) throws JMSException + { + checkPreConditions((Queue) dest); + _delegate.send(dest, msg); + } + + public void send(Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException + { + checkPreConditions(); + _delegate.send(msg, deliveryMode, priority, timeToLive); + } + + public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException + { + checkPreConditions((Queue) dest); + _delegate.send(dest, msg, deliveryMode, priority, timeToLive); + } + + public void setDeliveryMode(int deliveryMode) throws JMSException + { + checkPreConditions(); + _delegate.setDeliveryMode(deliveryMode); + } + + public void setDisableMessageID(boolean disableMessageID) throws JMSException + { + checkPreConditions(); + _delegate.setDisableMessageID(disableMessageID); + } + + public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException + { + checkPreConditions(); + _delegate.setDisableMessageTimestamp(disableMessageTimestamp); + } + + public void setPriority(int priority) throws JMSException + { + checkPreConditions(); + _delegate.setPriority(priority); + } + + public void setTimeToLive(long timeToLive) throws JMSException + { + checkPreConditions(); + _delegate.setTimeToLive(timeToLive); + } private void checkPreConditions() throws JMSException { @@ -130,31 +152,41 @@ public class QueueSenderAdapter implements QueueSender { private void checkPreConditions(Queue queue) throws JMSException { - if (closed){ - throw new javax.jms.IllegalStateException("Publisher is closed"); - } - - AMQSession session = ((BasicMessageProducer) _delegate).getSession(); - - if(session == null || session.isClosed()){ - throw new javax.jms.IllegalStateException("Invalid Session"); - } - - if(!(queue instanceof AMQDestination)) + if (closed) + { + throw new javax.jms.IllegalStateException("Publisher is closed"); + } + + AMQSession session = ((BasicMessageProducer) _delegate).getSession(); + + if (session == null || session.isClosed()) + { + throw new javax.jms.IllegalStateException("Invalid Session"); + } + + if (!(queue instanceof AMQDestination)) { throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue"); } AMQDestination destination = (AMQDestination) queue; - if(!destination.isValidated() && checkQueueBeforePublish()) + if (!destination.isValidated() && checkQueueBeforePublish()) { - if (_delegate.isBound(destination)) + if (_delegate.getSession().isStrictAMQP()) { + _delegate._logger.warn("AMQP does not support destination validation before publish, "); destination.setValidated(true); } else { - throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server"); + if (_delegate.isBound(destination)) + { + destination.setValidated(true); + } + else + { + throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server"); + } } } } -- cgit v1.2.1